From ab525fff6a933d32b27a3acd972e11475e4b658e Mon Sep 17 00:00:00 2001 From: alteredCoder Date: Mon, 11 Apr 2022 19:02:02 +0200 Subject: [PATCH] update --- cmd/crowdsec-cli/console.go | 16 ++++++ pkg/database/alerts.go | 1 - pkg/leakybucket/bucket.go | 3 ++ pkg/leakybucket/manager_load.go | 95 +++++++++++++++++++-------------- pkg/leakybucket/overflows.go | 41 +++++++++++++- 5 files changed, 114 insertions(+), 42 deletions(-) diff --git a/cmd/crowdsec-cli/console.go b/cmd/crowdsec-cli/console.go index 3cf94db91..b1e56a6de 100644 --- a/cmd/crowdsec-cli/console.go +++ b/cmd/crowdsec-cli/console.go @@ -20,6 +20,7 @@ import ( "github.com/olekukonko/tablewriter" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "gopkg.in/yaml.v2" ) func NewConsoleCmd() *cobra.Command { @@ -295,6 +296,21 @@ Disable given information push to the central API.`, cmdLabelAdd.MarkFlagRequired("value") cmdLabel.AddCommand(cmdLabelAdd) + cmdLabelStatus := &cobra.Command{ + Use: "status", + Short: "List label to send with alerts", + DisableAutoGenTag: true, + Run: func(cmd *cobra.Command, args []string) { + dump, err := yaml.Marshal(csConfig.Crowdsec.LabelsToSend) + if err != nil { + log.Fatalf("unable to show labels status: %s", err) + } + fmt.Println(dump) + + }, + } + cmdLabel.AddCommand(cmdLabelStatus) + cmdConsole.AddCommand(cmdLabel) return cmdConsole diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go index 5304ab7e9..d40502dc3 100644 --- a/pkg/database/alerts.go +++ b/pkg/database/alerts.go @@ -392,7 +392,6 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ return []string{}, errors.Wrapf(BulkError, "creating alert events: %s", err) } } - if len(alertItem.Meta) > 0 { metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta)) for i, metaItem := range alertItem.Meta { diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index a51e9322f..b4b8447f0 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -7,6 +7,7 @@ import ( "time" //"log" + "github.com/antonmedv/expr/vm" "github.com/crowdsecurity/crowdsec/pkg/time/rate" "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/goombaio/namegenerator" @@ -71,6 +72,7 @@ type Leaky struct { wgPour *sync.WaitGroup wgDumpState *sync.WaitGroup mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races + LabelsToSend map[string][]*vm.Program } var BucketsPour = prometheus.NewCounterVec( @@ -179,6 +181,7 @@ func FromFactory(bucketFactory BucketFactory) *Leaky { wgPour: bucketFactory.wgPour, wgDumpState: bucketFactory.wgDumpState, mutex: &sync.Mutex{}, + LabelsToSend: bucketFactory.LabelsToSendCompiled, } if l.BucketConfig.Capacity > 0 && l.BucketConfig.leakspeed != time.Duration(0) { l.Duration = time.Duration(l.BucketConfig.Capacity+1) * l.BucketConfig.leakspeed diff --git a/pkg/leakybucket/manager_load.go b/pkg/leakybucket/manager_load.go index 7f595bd08..d9a51105b 100644 --- a/pkg/leakybucket/manager_load.go +++ b/pkg/leakybucket/manager_load.go @@ -32,46 +32,48 @@ import ( // BucketFactory struct holds all fields for any bucket configuration. This is to have a // generic struct for buckets. This can be seen as a bucket factory. type BucketFactory struct { - FormatVersion string `yaml:"format"` - Author string `yaml:"author"` - Description string `yaml:"description"` - References []string `yaml:"references"` - Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics - Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique - Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity - LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket - Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time - Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct - GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip - Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result) - Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically - Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow - Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration - logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well) - Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain - CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket - Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc. - OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through - ScopeType types.ScopeType `yaml:"scope,omitempty"` //to enforce a different remediation than blocking an IP. Will default this to IP - BucketName string `yaml:"-"` - Filename string `yaml:"-"` - RunTimeFilter *vm.Program `json:"-"` - ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression - RunTimeGroupBy *vm.Program `json:"-"` - Data []*types.DataSource `yaml:"data,omitempty"` - DataDir string `yaml:"-"` - CancelOnFilter string `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket - leakspeed time.Duration //internal representation of `Leakspeed` - duration time.Duration //internal representation of `Duration` - ret chan types.Event //the bucket-specific output chan for overflows - processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.) - output bool //?? - ScenarioVersion string `yaml:"version,omitempty"` - hash string `yaml:"-"` - Simulated bool `yaml:"simulated"` //Set to true if the scenario instanciating the bucket was in the exclusion list - tomb *tomb.Tomb `yaml:"-"` - wgPour *sync.WaitGroup `yaml:"-"` - wgDumpState *sync.WaitGroup `yaml:"-"` + FormatVersion string `yaml:"format"` + Author string `yaml:"author"` + Description string `yaml:"description"` + References []string `yaml:"references"` + Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics + Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique + Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity + LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket + Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time + Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct + GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip + Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result) + Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically + Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow + Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration + logger *log.Entry `yaml:"-"` //logger is bucket-specific logger (used by Debug as well) + Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain + CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket + Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc. + OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through + ScopeType types.ScopeType `yaml:"scope,omitempty"` //to enforce a different remediation than blocking an IP. Will default this to IP + BucketName string `yaml:"-"` + Filename string `yaml:"-"` + RunTimeFilter *vm.Program `json:"-"` + ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression + RunTimeGroupBy *vm.Program `json:"-"` + Data []*types.DataSource `yaml:"data,omitempty"` + DataDir string `yaml:"-"` + CancelOnFilter string `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket + leakspeed time.Duration //internal representation of `Leakspeed` + duration time.Duration //internal representation of `Duration` + ret chan types.Event //the bucket-specific output chan for overflows + processors []Processor //processors is the list of hooks for pour/overflow/create (cf. uniq, blackhole etc.) + output bool //?? + ScenarioVersion string `yaml:"version,omitempty"` + hash string `yaml:"-"` + Simulated bool `yaml:"simulated"` //Set to true if the scenario instanciating the bucket was in the exclusion list + tomb *tomb.Tomb `yaml:"-"` + wgPour *sync.WaitGroup `yaml:"-"` + wgDumpState *sync.WaitGroup `yaml:"-"` + LabelsToSend map[string][]string `yaml:"-"` + LabelsToSendCompiled map[string][]*vm.Program `yaml:"-"` } func ValidateFactory(bucketFactory *BucketFactory) error { @@ -216,6 +218,7 @@ func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb. bucketFactory.wgDumpState = buckets.wgDumpState bucketFactory.wgPour = buckets.wgPour + bucketFactory.LabelsToSend = cscfg.LabelsToSend err = LoadBucket(&bucketFactory, tomb) if err != nil { log.Errorf("Failed to load bucket %s : %v", bucketFactory.Name, err) @@ -348,6 +351,18 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error { return fmt.Errorf("invalid bucket from %s : %v", bucketFactory.Filename, err) } bucketFactory.tomb = tomb + bucketFactory.LabelsToSendCompiled = make(map[string][]*vm.Program) + for key, values := range bucketFactory.LabelsToSend { + bucketFactory.LabelsToSendCompiled[key] = make([]*vm.Program, 0) + for _, value := range values { + valueCompiled, err := expr.Compile(value, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}}))) + if err != nil { + return fmt.Errorf("compilation of '%s' failed: %v", value, err) + } + bucketFactory.LabelsToSendCompiled[key] = append(bucketFactory.LabelsToSendCompiled[key], valueCompiled) + } + } + return nil } diff --git a/pkg/leakybucket/overflows.go b/pkg/leakybucket/overflows.go index 5b308d591..80aa6e173 100644 --- a/pkg/leakybucket/overflows.go +++ b/pkg/leakybucket/overflows.go @@ -1,6 +1,7 @@ package leakybucket import ( + "encoding/json" "fmt" "net" "sort" @@ -14,6 +15,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/antonmedv/expr" + "github.com/antonmedv/expr/vm" "github.com/crowdsecurity/crowdsec/pkg/exprhelpers" ) @@ -232,7 +234,42 @@ func alertFormatSource(leaky *Leaky, queue *Queue) (map[string]models.Source, st return sources, source_type, nil } -func EventToLabel(Queue) { +func EventToLabel(labels map[string][]*vm.Program, queue *Queue) models.Meta { + meta := make([]*models.MetaItems0, 0) + for _, evt := range queue.Queue { + for key, values := range labels { + tmpMeta := models.MetaItems0{} + tmpMeta.Key = key + tmpValue := make([]string, 0) + + for _, value := range values { + var val string + output, err := expr.Run(value, exprhelpers.GetExprEnv(map[string]interface{}{"evt": evt})) + if err != nil { + log.Warningf("failed to get value of '%v': %v", value, err) + continue + } + switch out := output.(type) { + case string: + val = out + case int: + val = strconv.Itoa(out) + default: + log.Warningf("unexpected return type for label to send : %T", output) + continue + } + tmpValue = append(tmpValue, val) + } + valueBytes, err := json.Marshal(tmpValue) + if err != nil { + log.Warningf("unable to marshall label values to send: %s", err) + } + tmpMeta.Value = string(valueBytes) + meta = append(meta, &tmpMeta) + } + } + ret := models.Meta(meta) + return ret } @@ -296,6 +333,7 @@ func NewAlert(leaky *Leaky, queue *Queue) (types.RuntimeAlert, error) { *apiAlert.Message = fmt.Sprintf("%s %s performed '%s' (%d events over %s) at %s", source_scope, sourceStr, leaky.Name, leaky.Total_count, leaky.Ovflw_ts.Sub(leaky.First_ts), leaky.Last_ts) //Get the events from Leaky/Queue apiAlert.Events = EventsFromQueue(queue) + apiAlert.Meta = EventToLabel(leaky.LabelsToSend, leaky.Queue) //Loop over the Sources and generate appropriate number of ApiAlerts for _, srcValue := range sources { @@ -321,5 +359,6 @@ func NewAlert(leaky *Leaky, queue *Queue) (types.RuntimeAlert, error) { if leaky.Reprocess { runtimeAlert.Reprocess = true } + return runtimeAlert, nil }