do not create bucket if the overflow would be blackholed

Sebastien Blot 2022-08-08 12:41:55 +02:00
parent 9f72bbe725
commit 10c97e46af
8 changed files with 84 additions and 33 deletions
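
In short: blackhole tracking moves from the per-bucket hiddenKeys slice into a process-wide sync.Map (leaky.BlackholeTracking), a CleanupBlackhole goroutine expires entries on a 10-second ticker, and PourItemToHolders now skips any partition key that is currently blackholed, so no bucket is created just to have its overflow discarded. The commit also adds trace-level timing along the alert-push path (PushAlerts, CreateAlert, CreateAlertBulk) and progress logging in DeleteAlertWithFilter.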

cmd/crowdsec/crowdsec.go

@@ -11,6 +11,7 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -47,6 +48,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
inputLineChan := make(chan types.Event)
inputEventChan := make(chan types.Event)
leaky.BlackholeTracking = &sync.Map{}
//start go-routines for parsing, buckets pour and outputs.
parserWg := &sync.WaitGroup{}
parsersTomb.Go(func() error {
@@ -67,6 +70,12 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
parserWg.Wait()
bucketWg := &sync.WaitGroup{}
bucketsTomb.Go(func() error {
bucketWg.Add(1)
leakybucket.CleanupBlackhole(&bucketsTomb)
bucketWg.Done()
return nil
})
bucketsTomb.Go(func() error {
bucketWg.Add(1)
/*restore previous state as well if present*/
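
The CleanupBlackhole janitor above is registered on bucketsTomb, so it starts with the rest of the bucket machinery and stops when the tomb dies. A minimal standalone sketch of that tomb.v2 pattern (names here are illustrative, not crowdsec's):

```go
// Sketch only: a ticker-driven janitor managed by a tomb, mirroring how
// CleanupBlackhole is launched and shut down in runCrowdsec above.
package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

func main() {
	var t tomb.Tomb
	t.Go(func() error {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-t.Dying(): // closed once Kill is called
				return nil
			case <-ticker.C:
				fmt.Println("janitor pass")
			}
		}
	})

	time.Sleep(250 * time.Millisecond)
	t.Kill(nil)  // request shutdown
	_ = t.Wait() // block until the goroutine returns
}
```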

cmd/crowdsec/output.go

@@ -48,6 +48,7 @@ func PushAlerts(alerts []types.RuntimeAlert, client *apiclient.ApiClient) error
ctx := context.Background()
alertsToPush, err := dedupAlerts(alerts)
start := time.Now()
if err != nil {
return errors.Wrap(err, "failed to transform alerts for api")
}
@@ -55,6 +56,7 @@ func PushAlerts(alerts []types.RuntimeAlert, client *apiclient.ApiClient) error
if err != nil {
return errors.Wrap(err, "failed sending alert to LAPI")
}
log.Tracef("sent %d alerts in %s", len(alertsToPush), time.Since(start))
return nil
}
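
The start/time.Since pair added to PushAlerts here, like the ones added to CreateAlert and CreateAlertBulk below, only produces output when the logger runs at trace level. A standalone sketch of the pattern:

```go
// Sketch only: trace-level timing around a unit of work, as added throughout
// this commit.
package main

import (
	"time"

	log "github.com/sirupsen/logrus"
)

func main() {
	// Tracef is silent at logrus's default Info level.
	log.SetLevel(log.TraceLevel)

	start := time.Now()
	time.Sleep(50 * time.Millisecond) // stand-in for the instrumented work
	log.Tracef("work took %s", time.Since(start))
}
```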

pkg/apiserver/controllers/v1/alerts.go

@@ -115,6 +115,7 @@ func (c *Controller) sendAlertToPluginChannel(alert *models.Alert, profileID uin
// CreateAlert : write received alerts in body to the database
func (c *Controller) CreateAlert(gctx *gin.Context) {
startCreateAlert := time.Now()
var input models.AddAlertsRequest
claims := jwt.ExtractClaims(gctx)
@@ -130,6 +131,7 @@ func (c *Controller) CreateAlert(gctx *gin.Context) {
return
}
stopFlush := false
startAlertLoop := time.Now()
for _, alert := range input {
alert.MachineID = machineID
if len(alert.Decisions) != 0 {
@@ -175,12 +177,15 @@ func (c *Controller) CreateAlert(gctx *gin.Context) {
}
}
}
log.Tracef("alert loop in controller took %s", time.Since(startAlertLoop))
if stopFlush {
c.DBClient.CanFlush = false
}
startDbCall := time.Now()
alerts, err := c.DBClient.CreateAlert(machineID, input)
log.Tracef("CreateAlert : %s", time.Since(startDbCall))
c.DBClient.CanFlush = true
if err != nil {
@@ -197,6 +202,8 @@ func (c *Controller) CreateAlert(gctx *gin.Context) {
}
}
log.Tracef("%d alerts created in %s", len(input), time.Since(startCreateAlert))
gctx.JSON(http.StatusCreated, alerts)
}

pkg/database/alerts.go

@@ -306,6 +306,7 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
c.Log.Debugf("writing %d items", len(alertList))
bulk := make([]*ent.AlertCreate, 0, bulkSize)
alertDecisions := make([][]*ent.Decision, 0, bulkSize)
alertLoopStart := time.Now()
for i, alertItem := range alertList {
var decisions []*ent.Decision
var metas []*ent.Meta
@@ -524,12 +525,16 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
}
}
}
c.Log.Tracef("alert loop took %s for %d elements", time.Since(alertLoopStart), len(alertList))
createBulkStart := time.Now()
alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX)
if err != nil {
return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err)
}
c.Log.Infof("createBulk took %s for %d elements", time.Since(createBulkStart), len(bulk))
decisionsUpdateTime := time.Now()
for alertIndex, a := range alerts {
ret = append(ret, strconv.Itoa(a.ID))
d := alertDecisions[alertIndex]
@@ -542,6 +547,8 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([
}
}
c.Log.Infof("decisions update took %s for %d elements", time.Since(decisionsUpdateTime), len(alerts))
return ret, nil
}
@@ -888,7 +895,12 @@ func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error)
return 0, errors.Wrap(DeleteFail, "alert query failed")
}
- for _, alertItem := range alertsToDelete {
+ c.Log.Debugf("Deleting %d alerts", len(alertsToDelete))
+ for p, alertItem := range alertsToDelete {
+ if p%100 == 0 {
+ c.Log.Debugf("Deleting alert %d", p)
+ }
err = c.DeleteAlertGraph(alertItem)
if err != nil {
c.Log.Warningf("DeleteAlertWithFilter : %s", err)
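
Logging only every 100th deletion bounds the debug output on large purges. The throttling in isolation (sketch, not crowdsec code):

```go
// Sketch only: modulo-throttled progress logging, as in DeleteAlertWithFilter.
package main

import "fmt"

func main() {
	alerts := make([]struct{}, 350)
	for p := range alerts {
		if p%100 == 0 { // log items 0, 100, 200, 300
			fmt.Printf("deleting alert %d\n", p)
		}
	}
}
```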

pkg/leakybucket/blackhole.go

@@ -5,6 +5,8 @@ import (
"time"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)
type HiddenKey struct {
@@ -31,38 +33,48 @@ func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
}, nil
}
func CleanupBlackhole(bucketsTomb *tomb.Tomb) error {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-bucketsTomb.Dying():
ticker.Stop()
BlackholeTracking.Range(func(key, value interface{}) bool {
BlackholeTracking.Delete(key)
return true
})
return nil
case <-ticker.C:
BlackholeTracking.Range(func(key, value interface{}) bool {
expirationDate := value.(time.Time)
if expirationDate.Before(time.Now().UTC()) {
log.Debugf("Expiring blackhole for %s", key)
BlackholeTracking.Delete(key)
}
return true
})
}
}
}
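
CleanupBlackhole leans on the fact that sync.Map.Range tolerates concurrent modification, so expired keys can be deleted mid-iteration without invalidating the sweep. A standalone sketch (illustrative keys, not crowdsec code):

```go
// Sketch only: the expiry sweep from CleanupBlackhole in isolation. Deleting
// inside Range is allowed; Range does not lock the map for the whole pass.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var tracking sync.Map
	tracking.Store("1.2.3.4", time.Now().UTC().Add(-time.Minute)) // already expired
	tracking.Store("5.6.7.8", time.Now().UTC().Add(time.Hour))    // still blackholed

	tracking.Range(func(key, value interface{}) bool {
		if value.(time.Time).Before(time.Now().UTC()) {
			fmt.Printf("expiring blackhole for %v\n", key)
			tracking.Delete(key)
		}
		return true // keep iterating
	})
}
```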
func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue) {
return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) {
- var blackholed bool = false
- var tmp []HiddenKey
- // search if we are blackholed and refresh the slice
- for _, element := range bl.hiddenKeys {
- if element.key == leaky.Mapkey {
- if element.expiration.After(leaky.Ovflw_ts) {
- leaky.logger.Debugf("Overflow discarded, still blackholed for %s", element.expiration.Sub(leaky.Ovflw_ts))
- blackholed = true
- }
- }
- if element.expiration.After(leaky.Ovflw_ts) {
- tmp = append(tmp, element)
+ if expirationDate, ok := BlackholeTracking.Load(leaky.Mapkey); ok {
+ if expirationDate.(time.Time).After(time.Now().UTC()) {
+ leaky.logger.Debugf("Blackhole already triggered for %s", leaky.Mapkey)
+ return types.RuntimeAlert{
+ Mapkey: leaky.Mapkey,
+ }, nil
} else {
- leaky.logger.Debugf("%s left blackhole %s ago", element.key, leaky.Ovflw_ts.Sub(element.expiration))
+ leaky.logger.Debugf("Blackhole expired for %s", leaky.Mapkey)
+ BlackholeTracking.Delete(leaky.Mapkey)
}
}
- bl.hiddenKeys = tmp
- if blackholed {
- leaky.logger.Tracef("Event is blackholed (%s)", leaky.First_ts)
- return types.RuntimeAlert{
- Mapkey: leaky.Mapkey,
- }, nil
- }
- bl.hiddenKeys = append(bl.hiddenKeys, HiddenKey{leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration)})
- leaky.logger.Debugf("Adding overflow to blackhole (%s)", leaky.First_ts)
+ BlackholeTracking.Store(leaky.Mapkey, time.Now().UTC().Add(bl.duration))
+ leaky.logger.Debugf("Blackhole triggered for %s", leaky.Mapkey)
return alert, queue
}
}
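
One caveat the commit does not address: the Load followed by Store above is a check-then-act sequence, so two overflows racing through OnBucketOverflow for the same Mapkey could in principle both miss the map and both emit an alert. sync.Map.LoadOrStore makes that step atomic; a standalone sketch (hypothetical names, not part of the commit):

```go
// Sketch only: collapsing the Load/Store pair into one atomic LoadOrStore.
// The first caller arms the blackhole; any later caller is guaranteed to
// observe it.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var tracking sync.Map
	duration := 4 * time.Hour

	for _, mapkey := range []string{"10.0.0.1", "10.0.0.1"} {
		expiry, loaded := tracking.LoadOrStore(mapkey, time.Now().UTC().Add(duration))
		if loaded && expiry.(time.Time).After(time.Now().UTC()) {
			fmt.Printf("%s: overflow discarded, still blackholed\n", mapkey)
			continue
		}
		fmt.Printf("%s: blackhole armed\n", mapkey)
	}
}
```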

pkg/leakybucket/bucket.go

@@ -220,7 +220,6 @@ func LeakRoutine(leaky *Leaky) error {
}
}
leaky.logger.Debugf("Leaky routine starting, lifetime : %s", leaky.Duration)
for {
select {
/*receiving an event*/

pkg/leakybucket/buckets_test.go

@@ -38,6 +38,8 @@ func TestBucket(t *testing.T) {
log.Fatalf("exprhelpers init failed: %s", err)
}
BlackholeTracking = &sync.Map{}
if envSetting != "" {
if err := testOneBucket(t, envSetting, tomb); err != nil {
t.Fatalf("Test '%s' failed : %s", envSetting, err)
@@ -64,8 +66,8 @@ func TestBucket(t *testing.T) {
}
}
- //during tests, we're likely to have only one scenario, and thus only one holder.
- //we want to avoid the death of the tomb because all existing buckets have been destroyed.
+ // during tests, we're likely to have only one scenario, and thus only one holder.
+ // we want to avoid the death of the tomb because all existing buckets have been destroyed.
func watchTomb(tomb *tomb.Tomb) {
for {
if tomb.Alive() == false {

pkg/leakybucket/manager_run.go

@@ -6,6 +6,7 @@ import (
"io/ioutil"
"math"
"os"
"sync"
"time"
"github.com/pkg/errors"
@@ -23,9 +24,13 @@ var serialized map[string]Leaky
var BucketPourCache map[string][]types.Event
var BucketPourTrack bool
- /*The leaky routines lifecycle are based on "real" time.
+ var BlackholeTracking *sync.Map
+ /*
+ The leaky routines lifecycle are based on "real" time.
But when we are running in time-machine mode, the reference time is in logs and not "real" time.
- Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/
+ Thus we need to garbage collect them to avoid a skyrocketing memory usage.
+ */
func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
buckets.wgPour.Wait()
buckets.wgDumpState.Add(1)
@@ -340,7 +345,10 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
}
}
buckey := GetKey(holders[idx], groupby)
if _, ok := BlackholeTracking.Load(buckey); ok {
holders[idx].logger.Tracef("Event is blackholed: %s", buckey)
continue
}
//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode)
if err != nil {
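
This last hunk is the point of the commit: the partition key is checked against BlackholeTracking before LoadOrStoreBucketFromHolder runs, so an event destined for a blackholed key never allocates a bucket or spins up a leak routine only to have its overflow discarded. The guard reduced to its essentials (sketch, names assumed):

```go
// Sketch only: skip work for keys that are currently blackholed, as
// PourItemToHolders now does before creating a bucket.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var blackholeTracking sync.Map
	blackholeTracking.Store("192.168.1.1", time.Now().UTC().Add(time.Hour))

	for _, buckey := range []string{"192.168.1.1", "192.168.1.2"} {
		if _, ok := blackholeTracking.Load(buckey); ok {
			fmt.Printf("event is blackholed, no bucket created: %s\n", buckey)
			continue
		}
		fmt.Printf("pouring event into bucket for %s\n", buckey)
	}
}
```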