From 25a2d528b0963745269b9f7162f2dca619329bb8 Mon Sep 17 00:00:00 2001 From: blotus Date: Tue, 26 Oct 2021 13:33:45 +0200 Subject: [PATCH] Alerts flush: Optimization of the flush mechanism (batch and limit to one job) + add `cscli alerts flush` command (#1024) - Don't allow running more than one alert flush job at a time, to prevent runaway CPU usage in some cases (fixes "High CPU after Upgrade to 1.2.0", #1022). - Add a `cscli alerts flush` command to manually flush the alerts in the database (fixes "Improvement/Manual flush mechanism", #1023). - Enable cascading deletion on alerts now that ent has been upgraded: deleting an alert in the database will automatically delete all related decisions, events and meta. - Add an index on alerts.id to try to improve flush performance on very large SQLite databases. - Alert flushing now operates in batches. --- cmd/crowdsec-cli/alerts.go | 36 ++++++++++++++++++ go.mod | 2 +- go.sum | 10 +++-- pkg/database/alerts.go | 60 ++++++++++++++++++++++-------- pkg/database/database.go | 3 +- pkg/database/ent/migrate/schema.go | 13 +++++-- pkg/database/ent/schema/alert.go | 23 ++++++++++-- 7 files changed, 121 insertions(+), 26 deletions(-) diff --git a/cmd/crowdsec-cli/alerts.go b/cmd/crowdsec-cli/alerts.go index eae1e2e45..54b9972bb 100644 --- a/cmd/crowdsec-cli/alerts.go +++ b/cmd/crowdsec-cli/alerts.go @@ -13,6 +13,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/apiclient" "github.com/crowdsecurity/crowdsec/pkg/cwversion" + "github.com/crowdsecurity/crowdsec/pkg/database" "github.com/crowdsecurity/crowdsec/pkg/models" "github.com/go-openapi/strfmt" "github.com/olekukonko/tablewriter" @@ -469,5 +470,40 @@ cscli alerts delete -s crowdsecurity/ssh-bf"`, cmdAlerts.AddCommand(cmdAlertsInspect) + var maxItems int + var maxAge string + var cmdAlertsFlush = &cobra.Command{ + Use: `flush`, + Short: `Flush alerts +/!\ This command can be used only on the same machine than the local API`, + Example: `cscli alerts flush --max-items 1000 --max-age 7d`, + 
DisableAutoGenTag: true, + Run: func(cmd *cobra.Command, args []string) { + var err error + if err := csConfig.LoadAPIServer(); err != nil || csConfig.DisableAPI { + log.Fatal("Local API is disabled, please run this command on the local API machine") + } + if err := csConfig.LoadDBConfig(); err != nil { + log.Fatalf(err.Error()) + } + dbClient, err = database.NewClient(csConfig.DbConfig) + if err != nil { + log.Fatalf("unable to create new database client: %s", err) + } + log.Info("Flushing alerts. !! This may take a long time !!") + err = dbClient.FlushAlerts(maxAge, maxItems) + if err != nil { + log.Fatalf("unable to flush alerts: %s", err) + } + log.Info("Alerts flushed") + }, + } + + cmdAlertsFlush.Flags().SortFlags = false + cmdAlertsFlush.Flags().IntVar(&maxItems, "max-items", 5000, "Maximum number of alert items to keep in the database") + cmdAlertsFlush.Flags().StringVar(&maxAge, "max-age", "7d", "Maximum age of alert items to keep in the database") + + cmdAlerts.AddCommand(cmdAlertsFlush) + return cmdAlerts } diff --git a/go.mod b/go.mod index 0901d77b7..5995d5cf3 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/enescakir/emoji v1.0.0 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.6.3 - github.com/go-co-op/gocron v0.5.1 + github.com/go-co-op/gocron v1.9.0 github.com/go-openapi/errors v0.19.9 github.com/go-openapi/strfmt v0.19.11 github.com/go-openapi/swag v0.19.12 diff --git a/go.sum b/go.sum index dd7f7c662..569a2455b 100644 --- a/go.sum +++ b/go.sum @@ -169,8 +169,8 @@ github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwv github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/go-bindata/go-bindata v1.0.1-0.20190711162640-ee3c2418e368/go.mod h1:7xCgX1lzlrXPHkfvn3EhumqHkmSlzt8at9q7v0ax19c= 
-github.com/go-co-op/gocron v0.5.1 h1:Cni1V7mt184+HnYTDYe6MH7siofCvf94PrGyIDI1v1U= -github.com/go-co-op/gocron v0.5.1/go.mod h1:6Btk4lVj3bnFAgbVfr76W8impTyhYrEi1pV5Pt4Tp/M= +github.com/go-co-op/gocron v1.9.0 h1:+V+DDenw3ryB7B+tK1bAIC5p0ruw4oX9IqAsdRnGIf0= +github.com/go-co-op/gocron v1.9.0/go.mod h1:DbJm9kdgr1sEvWpHCA7dFFs/PGHPMil9/97EXCRPr4k= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -632,6 +632,8 @@ github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShE github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -815,6 +817,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync 
v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1004,8 +1007,9 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go index 2e4445077..2a8352a2a 100644 --- a/pkg/database/alerts.go +++ b/pkg/database/alerts.go @@ -730,6 +730,24 @@ func (c *Client) QueryAlertWithFilter(filter map[string][]string) ([]*ent.Alert, return ret, nil } +func (c *Client) DeleteAlertGraphBatch(alertItems []*ent.Alert) (int, error) { + idList := make([]int, 0) + for _, alert := range alertItems { + idList = append(idList, int(alert.ID)) + } + + deleted, err := c.Ent.Alert.Delete(). 
+ Where(alert.IDIn(idList...)).Exec(c.CTX) + if err != nil { + c.Log.Warningf("DeleteAlertGraph : %s", err) + return deleted, errors.Wrapf(DeleteFail, "alert graph delete batch") + } + + c.Log.Debug("Done batch delete alerts") + + return deleted, nil +} + func (c *Client) DeleteAlertGraph(alertItem *ent.Alert) error { // delete the associated events _, err := c.Ent.Event.Delete(). @@ -810,12 +828,15 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error { var totalAlerts int var err error + c.Log.Info("Flushing orphan alerts") c.FlushOrphans() + c.Log.Info("Done flushing orphan alerts") totalAlerts, err = c.TotalAlerts() if err != nil { c.Log.Warningf("FlushAlerts (max items count) : %s", err) return errors.Wrap(err, "unable to get alerts count") } + c.Log.Infof("FlushAlerts (Total alerts): %d", totalAlerts) if MaxAge != "" { filter := map[string][]string{ "created_before": {MaxAge}, @@ -825,29 +846,38 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error { c.Log.Warningf("FlushAlerts (max age) : %s", err) return errors.Wrapf(err, "unable to flush alerts with filter until: %s", MaxAge) } + c.Log.Infof("FlushAlerts (deleted max age alerts): %d", nbDeleted) deletedByAge = nbDeleted } if MaxItems > 0 { if totalAlerts > MaxItems { nbToDelete := totalAlerts - MaxItems - alerts, err := c.QueryAlertWithFilter(map[string][]string{ - "sort": {"ASC"}, - "limit": {strconv.Itoa(nbToDelete)}, - }) // we want to delete older alerts if we reach the max number of items - if err != nil { - c.Log.Warningf("FlushAlerts (max items query) : %s", err) - return errors.Wrap(err, "unable to get all alerts") + batchSize := 500 + if batchSize > nbToDelete { + batchSize = nbToDelete } - for itemNb, alert := range alerts { - if itemNb < nbToDelete { - err := c.DeleteAlertGraph(alert) - if err != nil { - c.Log.Warningf("FlushAlerts : %s", err) - return errors.Wrap(err, "unable to flush alert") - } - deletedByNbItem++ + deleted := 0 + for deleted < nbToDelete { + 
c.Log.Infof("FlushAlerts (before query with filter) to delete: %d", nbToDelete) + alerts, err := c.QueryAlertWithFilter(map[string][]string{ + "sort": {"ASC"}, + "limit": {strconv.Itoa(batchSize)}, + }) // we want to delete older alerts if we reach the max number of items + if err != nil { + c.Log.Warningf("FlushAlerts (max items query) : %s", err) + return errors.Wrap(err, "unable to get all alerts") + } + deletedAlerts, err := c.DeleteAlertGraphBatch(alerts) + if err != nil { + c.Log.Warningf("FlushAlerts (max items query) : %s", err) + return errors.Wrap(err, "unable to delete alerts") + } + deleted += deletedAlerts + if nbToDelete-deleted < batchSize { + batchSize = nbToDelete - deleted } } + deletedByNbItem = deleted } } if deletedByNbItem > 0 { diff --git a/pkg/database/database.go b/pkg/database/database.go index 7c7108f15..a28a8fde4 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -100,7 +100,8 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched } // Init & Start cronjob every minute scheduler := gocron.NewScheduler(time.UTC) - scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems) + job, _ := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems) + job.SingletonMode() scheduler.StartAsync() return scheduler, nil diff --git a/pkg/database/ent/migrate/schema.go b/pkg/database/ent/migrate/schema.go index f97700712..94cad488e 100644 --- a/pkg/database/ent/migrate/schema.go +++ b/pkg/database/ent/migrate/schema.go @@ -48,6 +48,13 @@ var ( OnDelete: schema.SetNull, }, }, + Indexes: []*schema.Index{ + { + Name: "alert_id", + Unique: false, + Columns: []*schema.Column{AlertsColumns[0]}, + }, + }, } // BouncersColumns holds the columns for the "bouncers" table. 
BouncersColumns = []*schema.Column{ @@ -98,7 +105,7 @@ var ( Symbol: "decisions_alerts_decisions", Columns: []*schema.Column{DecisionsColumns[15]}, RefColumns: []*schema.Column{AlertsColumns[0]}, - OnDelete: schema.SetNull, + OnDelete: schema.Cascade, }, }, } @@ -121,7 +128,7 @@ var ( Symbol: "events_alerts_events", Columns: []*schema.Column{EventsColumns[5]}, RefColumns: []*schema.Column{AlertsColumns[0]}, - OnDelete: schema.SetNull, + OnDelete: schema.Cascade, }, }, } @@ -163,7 +170,7 @@ var ( Symbol: "meta_alerts_metas", Columns: []*schema.Column{MetaColumns[5]}, RefColumns: []*schema.Column{AlertsColumns[0]}, - OnDelete: schema.SetNull, + OnDelete: schema.Cascade, }, }, } diff --git a/pkg/database/ent/schema/alert.go b/pkg/database/ent/schema/alert.go index 956550dee..3afb36a62 100644 --- a/pkg/database/ent/schema/alert.go +++ b/pkg/database/ent/schema/alert.go @@ -4,8 +4,10 @@ import ( "time" "entgo.io/ent" + "entgo.io/ent/dialect/entsql" "entgo.io/ent/schema/edge" "entgo.io/ent/schema/field" + "entgo.io/ent/schema/index" ) // Alert holds the schema definition for the Alert entity. @@ -56,8 +58,23 @@ func (Alert) Edges() []ent.Edge { edge.From("owner", Machine.Type). Ref("alerts"). Unique(), - edge.To("decisions", Decision.Type), - edge.To("events", Event.Type), - edge.To("metas", Meta.Type), + edge.To("decisions", Decision.Type). + Annotations(entsql.Annotation{ + OnDelete: entsql.Cascade, + }), + edge.To("events", Event.Type). + Annotations(entsql.Annotation{ + OnDelete: entsql.Cascade, + }), + edge.To("metas", Meta.Type). + Annotations(entsql.Annotation{ + OnDelete: entsql.Cascade, + }), + } +} + +func (Alert) Indexes() []ent.Index { + return []ent.Index{ + index.Fields("id"), } }