diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index 1c508aab9..b2f97169d 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -61,16 +61,17 @@ type Leaky struct { Duration time.Duration Pour func(*Leaky, types.Event) `json:"-"` //Profiling when set to true enables profiling of bucket - Profiling bool - timedOverflow bool - logger *log.Entry - scopeType types.ScopeType - hash string - scenarioVersion string - tomb *tomb.Tomb - wgPour *sync.WaitGroup - wgDumpState *sync.WaitGroup - mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races + Profiling bool + timedOverflow bool + conditionalOverflow bool + logger *log.Entry + scopeType types.ScopeType + hash string + scenarioVersion string + tomb *tomb.Tomb + wgPour *sync.WaitGroup + wgDumpState *sync.WaitGroup + mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races } var BucketsPour = prometheus.NewCounterVec( @@ -188,6 +189,10 @@ func FromFactory(bucketFactory BucketFactory) *Leaky { l.timedOverflow = true } + if l.BucketConfig.Type == "conditional" { + l.conditionalOverflow = true + l.Duration = l.BucketConfig.leakspeed + } return l } @@ -247,6 +252,14 @@ func LeakRoutine(leaky *Leaky) error { BucketsPour.With(prometheus.Labels{"name": leaky.Name, "source": msg.Line.Src, "type": msg.Line.Module}).Inc() leaky.Pour(leaky, *msg) // glue for now + + for _, processor := range processors { + msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky) + if msg == nil { + goto End + } + } + //Clear cache on behalf of pour // if durationTicker isn't initialized, then we're pouring our first event @@ -337,7 +350,8 @@ func Pour(leaky *Leaky, msg types.Event) { leaky.First_ts = time.Now().UTC() } leaky.Last_ts = time.Now().UTC() - if leaky.Limiter.Allow() { + + if leaky.Limiter.Allow() || leaky.conditionalOverflow { leaky.Queue.Add(msg) } else { leaky.Ovflw_ts = time.Now().UTC() diff --git a/pkg/leakybucket/buckets_test.go b/pkg/leakybucket/buckets_test.go index a7eb3a087..833a4aa37 100644 --- a/pkg/leakybucket/buckets_test.go +++ b/pkg/leakybucket/buckets_test.go @@ -64,8 +64,8 @@ func TestBucket(t *testing.T) { } } -//during tests, we're likely to have only one scenario, and thus only one holder. -//we want to avoid the death of the tomb because all existing buckets have been destroyed. +// during tests, we're likely to have only one scenario, and thus only one holder. +// we want to avoid the death of the tomb because all existing buckets have been destroyed. func watchTomb(tomb *tomb.Tomb) { for { if tomb.Alive() == false { diff --git a/pkg/leakybucket/conditional.go b/pkg/leakybucket/conditional.go new file mode 100644 index 000000000..fecba9c69 --- /dev/null +++ b/pkg/leakybucket/conditional.go @@ -0,0 +1,61 @@ +package leakybucket + +import ( + "fmt" + "time" + + "github.com/antonmedv/expr" + "github.com/antonmedv/expr/vm" + "github.com/crowdsecurity/crowdsec/pkg/exprhelpers" + "github.com/crowdsecurity/crowdsec/pkg/types" +) + +type ConditionalOverflow struct { + ConditionalFilter string + ConditionalFilterRuntime *vm.Program + DumbProcessor +} + +func NewConditionalOverflow(g *BucketFactory) (*ConditionalOverflow, error) { + var err error + + c := ConditionalOverflow{} + c.ConditionalFilter = g.ConditionalOverflow + c.ConditionalFilterRuntime, err = expr.Compile(c.ConditionalFilter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{ + "queue": &Queue{}, "leaky": &Leaky{}}))) + if err != nil { + g.logger.Errorf("Unable to compile condition expression for conditional bucket : %s", err) + return nil, fmt.Errorf("unable to compile condition expression for conditional bucket : %v", err) + } + return &c, nil +} + +func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event { + return func(msg types.Event, l *Leaky) *types.Event { + var condition, ok bool + if c.ConditionalFilterRuntime != nil { + l.logger.Debugf("Running condition expression : %s", c.ConditionalFilter) + ret, err := expr.Run(c.ConditionalFilterRuntime, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &msg, "queue": l.Queue, "leaky": l})) + if err != nil { + l.logger.Errorf("unable to run conditional filter : %s", err) + return &msg + } + + l.logger.Debugf("Conditional bucket expression returned : %v", ret) + + if condition, ok = ret.(bool); !ok { + l.logger.Warningf("overflow condition, unexpected non-bool return : %T", ret) + return &msg + } + + if condition { + l.logger.Debugf("Conditional bucket overflow") + l.Ovflw_ts = time.Now().UTC() + l.Out <- l.Queue + return nil + } + } + + return &msg + } +} diff --git a/pkg/leakybucket/manager_load.go b/pkg/leakybucket/manager_load.go index a786e2f6b..462a665c0 100644 --- a/pkg/leakybucket/manager_load.go +++ b/pkg/leakybucket/manager_load.go @@ -34,49 +34,50 @@ import ( // BucketFactory struct holds all fields for any bucket configuration. This is to have a // generic struct for buckets. This can be seen as a bucket factory. type BucketFactory struct { - FormatVersion string `yaml:"format"` - Author string `yaml:"author"` - Description string `yaml:"description"` - References []string `yaml:"references"` - Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics - Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique - Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity - LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket - Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time - Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct - GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip - Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result) - Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically - Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow - Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration - logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well) - Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain - CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket - Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc. - OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through - ScopeType types.ScopeType `yaml:"scope,omitempty"` //to enforce a different remediation than blocking an IP. Will default this to IP - BucketName string `yaml:"-"` - Filename string `yaml:"-"` - RunTimeFilter *vm.Program `json:"-"` - ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression - RunTimeGroupBy *vm.Program `json:"-"` - Data []*types.DataSource `yaml:"data,omitempty"` - DataDir string `yaml:"-"` - CancelOnFilter string `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket - leakspeed time.Duration //internal representation of `Leakspeed` - duration time.Duration //internal representation of `Duration` - ret chan types.Event //the bucket-specific output chan for overflows - processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.) - output bool //?? - ScenarioVersion string `yaml:"version,omitempty"` - hash string `yaml:"-"` - Simulated bool `yaml:"simulated"` //Set to true if the scenario instantiating the bucket was in the exclusion list - tomb *tomb.Tomb `yaml:"-"` - wgPour *sync.WaitGroup `yaml:"-"` - wgDumpState *sync.WaitGroup `yaml:"-"` + FormatVersion string `yaml:"format"` + Author string `yaml:"author"` + Description string `yaml:"description"` + References []string `yaml:"references"` + Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics + Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique + Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity + LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket + Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time + Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct + GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip + Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result) + Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically + Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow + Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration + logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well) + Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain + CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket + Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc. + OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through + ConditionalOverflow string `yaml:"condition"` //condition if present, is an expression that must return true for the bucket to overflow + ScopeType types.ScopeType `yaml:"scope,omitempty"` //to enforce a different remediation than blocking an IP. Will default this to IP + BucketName string `yaml:"-"` + Filename string `yaml:"-"` + RunTimeFilter *vm.Program `json:"-"` + ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression + RunTimeGroupBy *vm.Program `json:"-"` + Data []*types.DataSource `yaml:"data,omitempty"` + DataDir string `yaml:"-"` + CancelOnFilter string `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket + leakspeed time.Duration //internal representation of `Leakspeed` + duration time.Duration //internal representation of `Duration` + ret chan types.Event //the bucket-specific output chan for overflows + processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.) + output bool //?? + ScenarioVersion string `yaml:"version,omitempty"` + hash string `yaml:"-"` + Simulated bool `yaml:"simulated"` //Set to true if the scenario instantiating the bucket was in the exclusion list + tomb *tomb.Tomb `yaml:"-"` + wgPour *sync.WaitGroup `yaml:"-"` + wgDumpState *sync.WaitGroup `yaml:"-"` } -//we use one NameGenerator for all the future buckets +// we use one NameGenerator for all the future buckets var seed namegenerator.Generator = namegenerator.NewNameGenerator(time.Now().UTC().UnixNano()) func ValidateFactory(bucketFactory *BucketFactory) error { @@ -98,7 +99,7 @@ func ValidateFactory(bucketFactory *BucketFactory) error { } } else if bucketFactory.Type == "counter" { if bucketFactory.Duration == "" { - return fmt.Errorf("duration ca't be empty for counter") + return fmt.Errorf("duration can't be empty for counter") } if bucketFactory.duration == 0 { return fmt.Errorf("bad duration for counter bucket '%d'", bucketFactory.duration) @@ -110,6 +111,19 @@ func ValidateFactory(bucketFactory *BucketFactory) error { if bucketFactory.Capacity != 0 { return fmt.Errorf("trigger bucket must have 0 capacity") } + } else if bucketFactory.Type == "conditional" { + if bucketFactory.ConditionalOverflow == "" { + return fmt.Errorf("conditional bucket must have a condition") + } + if bucketFactory.Capacity != -1 { + bucketFactory.logger.Warnf("Using a value different than -1 as capacity for conditional bucket, this may lead to unexpected overflows") + } + if bucketFactory.LeakSpeed == "" { + return fmt.Errorf("leakspeed can't be empty for conditional bucket") + } + if bucketFactory.leakspeed == 0 { + return fmt.Errorf("bad leakspeed for conditional bucket '%s'", bucketFactory.LeakSpeed) + } } else { return fmt.Errorf("unknown bucket type '%s'", bucketFactory.Type) } @@ -304,6 +318,8 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error { bucketFactory.processors = append(bucketFactory.processors, &Trigger{}) case "counter": bucketFactory.processors = append(bucketFactory.processors, &DumbProcessor{}) + case "conditional": + bucketFactory.processors = append(bucketFactory.processors, &DumbProcessor{}) default: return fmt.Errorf("invalid type '%s' in %s : %v", bucketFactory.Type, bucketFactory.Filename, err) } @@ -338,6 +354,16 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error { bucketFactory.processors = append(bucketFactory.processors, blackhole) } + if bucketFactory.ConditionalOverflow != "" { + bucketFactory.logger.Tracef("Adding conditional overflow.") + condovflw, err := NewConditionalOverflow(bucketFactory) + if err != nil { + bucketFactory.logger.Errorf("Error creating conditional overflow : %s", err) + return fmt.Errorf("error creating conditional overflow : %s", err) + } + bucketFactory.processors = append(bucketFactory.processors, condovflw) + } + if len(bucketFactory.Data) > 0 { for _, data := range bucketFactory.Data { if data.DestPath == "" { diff --git a/pkg/leakybucket/processor.go b/pkg/leakybucket/processor.go index 50693e7bc..18dc287d8 100644 --- a/pkg/leakybucket/processor.go +++ b/pkg/leakybucket/processor.go @@ -6,6 +6,8 @@ type Processor interface { OnBucketInit(Bucket *BucketFactory) error OnBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue) + + AfterBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event } type DumbProcessor struct { @@ -25,5 +27,10 @@ func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.Ru return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) { return alert, queue } - +} + +func (d *DumbProcessor) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event { + return func(msg types.Event, leaky *Leaky) *types.Event { + return &msg + } } diff --git a/pkg/leakybucket/reset_filter.go b/pkg/leakybucket/reset_filter.go index 0d50294e9..0c925afdb 100644 --- a/pkg/leakybucket/reset_filter.go +++ b/pkg/leakybucket/reset_filter.go @@ -64,6 +64,12 @@ func (u *CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) func(*Le } } +func (u *CancelOnFilter) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event { + return func(msg types.Event, leaky *Leaky) *types.Event { + return &msg + } +} + func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error { var err error var compiledExpr struct { diff --git a/pkg/leakybucket/tests/conditional-bucket/bucket.yaml b/pkg/leakybucket/tests/conditional-bucket/bucket.yaml new file mode 100644 index 000000000..59fbe141d --- /dev/null +++ b/pkg/leakybucket/tests/conditional-bucket/bucket.yaml @@ -0,0 +1,11 @@ +type: conditional +name: test/conditional +#debug: true +description: "conditional bucket" +filter: "evt.Meta.log_type == 'http_access-log'" +groupby: evt.Meta.source_ip +condition: any(queue.Queue, {.Meta.http_path == "/"}) and any(queue.Queue, {.Meta.http_path == "/foo"}) +leakspeed: 1s +capacity: -1 +labels: + type: overflow_1 \ No newline at end of file diff --git a/pkg/leakybucket/tests/conditional-bucket/scenarios.yaml b/pkg/leakybucket/tests/conditional-bucket/scenarios.yaml new file mode 100644 index 000000000..05e1557cf --- /dev/null +++ b/pkg/leakybucket/tests/conditional-bucket/scenarios.yaml @@ -0,0 +1 @@ + - filename: {{.TestDirectory}}/bucket.yaml \ No newline at end of file diff --git a/pkg/leakybucket/tests/conditional-bucket/test.json b/pkg/leakybucket/tests/conditional-bucket/test.json new file mode 100644 index 000000000..7bda73286 --- /dev/null +++ b/pkg/leakybucket/tests/conditional-bucket/test.json @@ -0,0 +1,50 @@ +{ + "lines": [ + { + "Line": { + "Labels": { + "type": "nginx" + }, + "Raw": "don't care" + }, + "MarshaledTime": "2020-01-01T10:00:00.000Z", + "Meta": { + "source_ip": "2a00:1450:4007:816::200e", + "log_type": "http_access-log", + "http_path": "/" + } + }, + { + "Line": { + "Labels": { + "type": "nginx" + }, + "Raw": "don't care" + }, + "MarshaledTime": "2020-01-01T10:00:00.000Z", + "Meta": { + "source_ip": "2a00:1450:4007:816::200e", + "log_type": "http_access-log", + "http_path": "/foo" + } + } + ], + "results": [ + { + "Type" : 1, + "Alert": { + "sources" : { + "2a00:1450:4007:816::200e": { + "ip": "2a00:1450:4007:816::200e", + "scope": "Ip", + "value": "2a00:1450:4007:816::200e" + } + }, + "Alert" : { + "scenario": "test/conditional", + "events_count": 2 + } + } + } + ] + } \ No newline at end of file diff --git a/pkg/leakybucket/uniq.go b/pkg/leakybucket/uniq.go index 1e0cddeba..2c9ddb11e 100644 --- a/pkg/leakybucket/uniq.go +++ b/pkg/leakybucket/uniq.go @@ -53,6 +53,12 @@ func (u *Uniq) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types } } +func (u *Uniq) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event { + return func(msg types.Event, leaky *Leaky) *types.Event { + return &msg + } +} + func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error { var err error var compiledExpr *vm.Program