From 10c97e46af364d267db54fab0b613e951f2e93fc Mon Sep 17 00:00:00 2001
From: Sebastien Blot
Date: Mon, 8 Aug 2022 12:41:55 +0200
Subject: [PATCH] do not create bucket if the overflow would be blackholed

---
 cmd/crowdsec/crowdsec.go               |  9 ++++
 cmd/crowdsec/output.go                 |  2 +
 pkg/apiserver/controllers/v1/alerts.go |  7 +++
 pkg/database/alerts.go                 | 14 +++++-
 pkg/leakybucket/blackhole.go           | 64 +++++++++++++++-----------
 pkg/leakybucket/bucket.go              |  1 -
 pkg/leakybucket/buckets_test.go        |  6 ++-
 pkg/leakybucket/manager_run.go         | 14 ++++--
 8 files changed, 84 insertions(+), 33 deletions(-)

diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go
index 84cf0838b..2fb7602d6 100644
--- a/cmd/crowdsec/crowdsec.go
+++ b/cmd/crowdsec/crowdsec.go
@@ -11,6 +11,7 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/acquisition"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwhub"
+	"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/parser"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -47,6 +48,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
 	inputLineChan := make(chan types.Event)
 	inputEventChan := make(chan types.Event)
 
+	leaky.BlackholeTracking = &sync.Map{}
+
 	//start go-routines for parsing, buckets pour and outputs.
 	parserWg := &sync.WaitGroup{}
 	parsersTomb.Go(func() error {
@@ -67,6 +70,12 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
 	parserWg.Wait()
 
 	bucketWg := &sync.WaitGroup{}
+	bucketsTomb.Go(func() error {
+		bucketWg.Add(1)
+		leakybucket.CleanupBlackhole(&bucketsTomb)
+		bucketWg.Done()
+		return nil
+	})
 	bucketsTomb.Go(func() error {
 		bucketWg.Add(1)
 		/*restore previous state as well if present*/
diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go
index 88e85e04b..92a1fcff3 100644
--- a/cmd/crowdsec/output.go
+++ b/cmd/crowdsec/output.go
@@ -48,6 +48,7 @@ func PushAlerts(alerts []types.RuntimeAlert, client *apiclient.ApiClient) error
 	ctx := context.Background()
 
 	alertsToPush, err := dedupAlerts(alerts)
+	start := time.Now()
 	if err != nil {
 		return errors.Wrap(err, "failed to transform alerts for api")
 	}
@@ -55,6 +56,7 @@ func PushAlerts(alerts []types.RuntimeAlert, client *apiclient.ApiClient) error
 	if err != nil {
 		return errors.Wrap(err, "failed sending alert to LAPI")
 	}
+	log.Tracef("sent %d alerts in %s", len(alertsToPush), time.Since(start))
 	return nil
 }
 
diff --git a/pkg/apiserver/controllers/v1/alerts.go b/pkg/apiserver/controllers/v1/alerts.go
index 4d79af907..7c1dafd1a 100644
--- a/pkg/apiserver/controllers/v1/alerts.go
+++ b/pkg/apiserver/controllers/v1/alerts.go
@@ -115,6 +115,7 @@ func (c *Controller) sendAlertToPluginChannel(alert *models.Alert, profileID uin
 
 // CreateAlert : write received alerts in body to the database
 func (c *Controller) CreateAlert(gctx *gin.Context) {
+	startCreateAlert := time.Now()
 	var input models.AddAlertsRequest
 
 	claims := jwt.ExtractClaims(gctx)
@@ -130,6 +131,7 @@ func (c *Controller) CreateAlert(gctx *gin.Context) {
 		return
 	}
 	stopFlush := false
+	startAlertLoop := time.Now()
 	for _, alert := range input {
 		alert.MachineID = machineID
 		if len(alert.Decisions) != 0 {
@@ -175,12 +177,15 @@ func (c *Controller) CreateAlert(gctx *gin.Context) {
 			}
 		}
 	}
+	log.Tracef("alert loop in controller took %s", time.Since(startAlertLoop))
 
 	if stopFlush {
 		c.DBClient.CanFlush = false
 	}
 
+	startDbCall := time.Now()
 	alerts, err := c.DBClient.CreateAlert(machineID, input)
+	log.Tracef("CreateAlert : %s", time.Since(startDbCall))
 	c.DBClient.CanFlush = true
 
 	if err != nil {
@@ -197,6 +202,8 @@ func (c *Controller) CreateAlert(gctx *gin.Context) {
 		}
 	}
 
+	log.Tracef("%d alerts created in %s", len(input), time.Since(startCreateAlert))
+
 	gctx.JSON(http.StatusCreated, alerts)
 }
 
diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go
index 66ae60a10..61d1a004d 100644
--- a/pkg/database/alerts.go
+++ b/pkg/database/alerts.go
@@ -306,6 +306,7 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
 	c.Log.Debugf("writing %d items", len(alertList))
 	bulk := make([]*ent.AlertCreate, 0, bulkSize)
 	alertDecisions := make([][]*ent.Decision, 0, bulkSize)
+	alertLoopStart := time.Now()
 	for i, alertItem := range alertList {
 		var decisions []*ent.Decision
 		var metas []*ent.Meta
@@ -524,12 +525,16 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
 			}
 		}
 	}
 
+	c.Log.Tracef("alert loop took %s for %d elements", time.Since(alertLoopStart), len(alertList))
+	createBulkStart := time.Now()
 	alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
 	if err != nil {
 		return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
 	}
 
+	c.Log.Infof("createBulk took %s for %d elements", time.Since(createBulkStart), len(bulk))
+	decisionsUpdateTime := time.Now()
 	for alertIndex, a := range alerts {
 		ret = append(ret, strconv.Itoa(a.ID))
 		d := alertDecisions[alertIndex]
@@ -542,6 +547,8 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
 		}
 	}
 
+	c.Log.Infof("decisions update took %s for %d elements", time.Since(decisionsUpdateTime), len(alerts))
+
 	return ret, nil
 }
 
@@ -888,7 +895,12 @@ func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error)
 		return 0, errors.Wrap(DeleteFail, "alert query failed")
 	}
 
-	for _, alertItem := range alertsToDelete {
+	c.Log.Debugf("Deleting %d alerts", len(alertsToDelete))
+
+	for p, alertItem := range alertsToDelete {
+		if p%100 == 0 {
+			c.Log.Debugf("Deleting alert %d", p)
+		}
 		err = c.DeleteAlertGraph(alertItem)
 		if err != nil {
 			c.Log.Warningf("DeleteAlertWithFilter : %s", err)
diff --git a/pkg/leakybucket/blackhole.go b/pkg/leakybucket/blackhole.go
index dd46d11ae..1803d717e 100644
--- a/pkg/leakybucket/blackhole.go
+++ b/pkg/leakybucket/blackhole.go
@@ -5,6 +5,8 @@ import (
 	"time"
 
 	"github.com/crowdsecurity/crowdsec/pkg/types"
+	log "github.com/sirupsen/logrus"
+	"gopkg.in/tomb.v2"
 )
 
 type HiddenKey struct {
@@ -31,38 +33,48 @@ func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
 	}, nil
 }
 
+func CleanupBlackhole(bucketsTomb *tomb.Tomb) error {
+	ticker := time.NewTicker(10 * time.Second)
+	for {
+		select {
+		case <-bucketsTomb.Dying():
+			ticker.Stop()
+			BlackholeTracking.Range(func(key, value interface{}) bool {
+				BlackholeTracking.Delete(key)
+				return true
+			})
+			return nil
+		case <-ticker.C:
+			BlackholeTracking.Range(func(key, value interface{}) bool {
+				expirationDate := value.(time.Time)
+				if expirationDate.Before(time.Now().UTC()) {
+					log.Debugf("Expiring blackhole for %s", key)
+					BlackholeTracking.Delete(key)
+				}
+				return true
+			})
+		}
+	}
+}
+
 func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue) {
 	return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) {
-		var blackholed bool = false
-		var tmp []HiddenKey
-		// search if we are blackholed and refresh the slice
-		for _, element := range bl.hiddenKeys {
-
-			if element.key == leaky.Mapkey {
-				if element.expiration.After(leaky.Ovflw_ts) {
-					leaky.logger.Debugf("Overflow discarded, still blackholed for %s", element.expiration.Sub(leaky.Ovflw_ts))
-					blackholed = true
-				}
-			}
-
-			if element.expiration.After(leaky.Ovflw_ts) {
-				tmp = append(tmp, element)
+		if expirationDate, ok := BlackholeTracking.Load(leaky.Mapkey); ok {
+			if expirationDate.(time.Time).After(time.Now().UTC()) {
+				leaky.logger.Debugf("Blackhole already triggered for %s", leaky.Mapkey)
+				return types.RuntimeAlert{
+					Mapkey: leaky.Mapkey,
+				}, nil
 			} else {
-				leaky.logger.Debugf("%s left blackhole %s ago", element.key, leaky.Ovflw_ts.Sub(element.expiration))
-
+				leaky.logger.Debugf("Blackhole expired for %s", leaky.Mapkey)
+				BlackholeTracking.Delete(leaky.Mapkey)
 			}
 		}
-		bl.hiddenKeys = tmp
 
-		if blackholed {
-			leaky.logger.Tracef("Event is blackholed (%s)", leaky.First_ts)
-			return types.RuntimeAlert{
-				Mapkey: leaky.Mapkey,
-			}, nil
-		}
-		bl.hiddenKeys = append(bl.hiddenKeys, HiddenKey{leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration)})
-		leaky.logger.Debugf("Adding overflow to blackhole (%s)", leaky.First_ts)
+		BlackholeTracking.Store(leaky.Mapkey, time.Now().UTC().Add(bl.duration))
+
+		leaky.logger.Debugf("Blackhole triggered for %s", leaky.Mapkey)
+
 		return alert, queue
 	}
-
 }
diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go
index f9b360241..f76e42d6b 100644
--- a/pkg/leakybucket/bucket.go
+++ b/pkg/leakybucket/bucket.go
@@ -220,7 +220,6 @@ func LeakRoutine(leaky *Leaky) error {
 		}
 	}
 
-	leaky.logger.Debugf("Leaky routine starting, lifetime : %s", leaky.Duration)
 	for {
 		select {
 		/*receiving an event*/
diff --git a/pkg/leakybucket/buckets_test.go b/pkg/leakybucket/buckets_test.go
index 23e5b1c79..6dcbf6089 100644
--- a/pkg/leakybucket/buckets_test.go
+++ b/pkg/leakybucket/buckets_test.go
@@ -38,6 +38,8 @@ func TestBucket(t *testing.T) {
 		log.Fatalf("exprhelpers init failed: %s", err)
 	}
 
+	BlackholeTracking = &sync.Map{}
+
 	if envSetting != "" {
 		if err := testOneBucket(t, envSetting, tomb); err != nil {
 			t.Fatalf("Test '%s' failed : %s", envSetting, err)
@@ -64,8 +66,8 @@ func TestBucket(t *testing.T) {
 	}
 }
 
-//during tests, we're likely to have only one scenario, and thus only one holder.
-//we want to avoid the death of the tomb because all existing buckets have been destroyed.
+// during tests, we're likely to have only one scenario, and thus only one holder.
+// we want to avoid the death of the tomb because all existing buckets have been destroyed.
 func watchTomb(tomb *tomb.Tomb) {
 	for {
 		if tomb.Alive() == false {
diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go
index 2a891957e..46e2ebb28 100644
--- a/pkg/leakybucket/manager_run.go
+++ b/pkg/leakybucket/manager_run.go
@@ -6,6 +6,7 @@ import (
 	"io/ioutil"
 	"math"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
@@ -23,9 +24,13 @@ var serialized map[string]Leaky
 var BucketPourCache map[string][]types.Event
 var BucketPourTrack bool
 
-/*The leaky routines lifecycle are based on "real" time.
+var BlackholeTracking *sync.Map
+
+/*
+The leaky routines lifecycle are based on "real" time.
 But when we are running in time-machine mode, the reference time is in logs and not "real" time.
-Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/
+Thus we need to garbage collect them to avoid a skyrocketing memory usage.
+*/
 func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
 	buckets.wgPour.Wait()
 	buckets.wgDumpState.Add(1)
@@ -340,7 +345,10 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
 			}
 		}
 		buckey := GetKey(holders[idx], groupby)
-
+		if _, ok := BlackholeTracking.Load(buckey); ok {
+			holders[idx].logger.Tracef("Event is blackholed: %s", buckey)
+			continue
+		}
 		//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
 		bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode)
 		if err != nil {
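
Note for reviewers, not part of the patch: the sketch below is a minimal, self-contained approximation of the mechanism this change introduces, useful as a map while reading the diff. The names onOverflow, shouldPour, cleanup, mapKey, blackholeDuration and the done channel are illustrative stand-ins for OnBucketOverflow, the new check in PourItemToHolders, CleanupBlackhole and the buckets tomb; they are assumptions of the sketch, not crowdsec API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// blackholeTracking plays the role of the patch's package-level BlackholeTracking:
// it maps a bucket partition key to the time its blackhole expires.
var blackholeTracking = &sync.Map{}

// onOverflow mimics the new OnBucketOverflow logic: if the key is still
// blackholed the overflow is discarded, otherwise the expiration is (re)armed.
func onOverflow(mapKey string, blackholeDuration time.Duration) bool {
	if v, ok := blackholeTracking.Load(mapKey); ok {
		if v.(time.Time).After(time.Now().UTC()) {
			return false // still blackholed: discard the overflow
		}
		blackholeTracking.Delete(mapKey) // expired entry, drop it
	}
	blackholeTracking.Store(mapKey, time.Now().UTC().Add(blackholeDuration))
	return true // overflow is emitted
}

// shouldPour mimics the new check in PourItemToHolders: events whose key is
// currently blackholed never create (or feed) a bucket at all.
func shouldPour(mapKey string) bool {
	_, blackholed := blackholeTracking.Load(mapKey)
	return !blackholed
}

// cleanup mimics CleanupBlackhole: periodically drop expired entries so the
// map does not grow forever. done stands in for the buckets tomb's Dying().
func cleanup(done <-chan struct{}, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			blackholeTracking.Range(func(key, value interface{}) bool {
				if value.(time.Time).Before(time.Now().UTC()) {
					blackholeTracking.Delete(key)
				}
				return true
			})
		}
	}
}

func main() {
	done := make(chan struct{})
	go cleanup(done, 10*time.Second)

	fmt.Println(onOverflow("src:192.0.2.1", time.Minute)) // true: first overflow is emitted
	fmt.Println(shouldPour("src:192.0.2.1"))              // false: later events are dropped before any bucket is created
	fmt.Println(onOverflow("src:192.0.2.1", time.Minute)) // false: still blackholed

	close(done)
}

The design point is that the expiration map lives outside the buckets themselves, so an event whose partition key is still blackholed is rejected in PourItemToHolders before a bucket is ever instantiated, instead of creating a bucket only to throw away its overflow later.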