From 9ac5aeda79287fb206ff87d6790436ad7a104234 Mon Sep 17 00:00:00 2001 From: Manuel Sabban Date: Thu, 20 Jul 2023 11:41:30 +0200 Subject: [PATCH] fix the ci by adding the ability to enforce event ordering (#2347) * fix the ci by adding the ability to enforce event ordering --- cmd/crowdsec/main.go | 6 ++++-- pkg/hubtest/hubtest_item.go | 16 ++++++++-------- pkg/leakybucket/bucket.go | 13 ++++++++++++- pkg/leakybucket/buckets_test.go | 2 +- pkg/leakybucket/manager_load.go | 6 +++++- pkg/leakybucket/manager_run.go | 22 ++++++++++++++++++++++ pkg/leakybucket/timemachine.go | 1 - 7 files changed, 52 insertions(+), 14 deletions(-) diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 9e73ba903..5c50884b5 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -70,6 +70,7 @@ type Flags struct { WinSvc string DisableCAPI bool Transform string + OrderEvent bool } type labelsMap map[string]string @@ -87,7 +88,7 @@ func LoadBuckets(cConfig *csconfig.Config) error { buckets = leakybucket.NewBuckets() log.Infof("Loading %d scenario files", len(files)) - holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, files, &bucketsTomb, buckets) + holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, files, &bucketsTomb, buckets, flags.OrderEvent) if err != nil { return fmt.Errorf("scenario loading failed: %v", err) @@ -110,7 +111,7 @@ func LoadAcquisition(cConfig *csconfig.Config) error { dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform) if err != nil { - return fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err) + return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN) } } else { dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec) @@ -164,6 +165,7 @@ func (f *Flags) Parse() { flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent") flag.BoolVar(&f.DisableAPI, "no-api", false, "disable local API") flag.BoolVar(&f.DisableCAPI, "no-capi", false, "disable communication with Central API") + flag.BoolVar(&f.OrderEvent, "order-event", false, "enforce event ordering with significant performance cost") if runtime.GOOS == "windows" { flag.StringVar(&f.WinSvc, "winsvc", "", "Windows service Action: Install, Remove etc..") } diff --git a/pkg/hubtest/hubtest_item.go b/pkg/hubtest/hubtest_item.go index 1ec7c5f44..475f42cf6 100644 --- a/pkg/hubtest/hubtest_item.go +++ b/pkg/hubtest/hubtest_item.go @@ -15,13 +15,13 @@ import ( ) type HubTestItemConfig struct { - Parsers []string `yaml:"parsers"` - Scenarios []string `yaml:"scenarios"` - PostOVerflows []string `yaml:"postoverflows"` - LogFile string `yaml:"log_file"` - LogType string `yaml:"log_type"` - Labels map[string]string `yaml:"labels"` - IgnoreParsers bool `yaml:"ignore_parsers"` // if we test a scenario, we don't want to assert on Parser + Parsers []string `yaml:"parsers"` + Scenarios []string `yaml:"scenarios"` + PostOVerflows []string `yaml:"postoverflows"` + LogFile string `yaml:"log_file"` + LogType string `yaml:"log_type"` + Labels map[string]string `yaml:"labels"` + IgnoreParsers bool `yaml:"ignore_parsers"` // if we test a scenario, we don't want to assert on Parser OverrideStatics []parser.ExtraField `yaml:"override_statics"` //Allow to override statics. Executed before s00 } @@ -530,7 +530,7 @@ func (t *HubTestItem) Run() error { } } - cmdArgs = []string{"-c", t.RuntimeConfigFilePath, "-type", logType, "-dsn", dsn, "-dump-data", t.ResultsPath} + cmdArgs = []string{"-c", t.RuntimeConfigFilePath, "-type", logType, "-dsn", dsn, "-dump-data", t.ResultsPath, "-order-event"} for labelKey, labelValue := range t.Config.Labels { arg := fmt.Sprintf("%s:%s", labelKey, labelValue) cmdArgs = append(cmdArgs, "-label", arg) diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index 286c51f11..4589be32a 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -70,6 +70,7 @@ type Leaky struct { wgPour *sync.WaitGroup wgDumpState *sync.WaitGroup mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races + orderEvent bool } var BucketsPour = prometheus.NewCounterVec( @@ -178,6 +179,7 @@ func FromFactory(bucketFactory BucketFactory) *Leaky { wgPour: bucketFactory.wgPour, wgDumpState: bucketFactory.wgDumpState, mutex: &sync.Mutex{}, + orderEvent: bucketFactory.orderEvent, } if l.BucketConfig.Capacity > 0 && l.BucketConfig.leakspeed != time.Duration(0) { l.Duration = time.Duration(l.BucketConfig.Capacity+1) * l.BucketConfig.leakspeed @@ -245,6 +247,9 @@ func LeakRoutine(leaky *Leaky) error { msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky) // if &msg == nil we stop processing if msg == nil { + if leaky.orderEvent { + orderEvent[leaky.Mapkey].Done() + } goto End } } @@ -258,6 +263,9 @@ func LeakRoutine(leaky *Leaky) error { for _, processor := range processors { msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky) if msg == nil { + if leaky.orderEvent { + orderEvent[leaky.Mapkey].Done() + } goto End } } @@ -277,7 +285,10 @@ func LeakRoutine(leaky *Leaky) error { } } firstEvent = false - /*we overflowed*/ + /*we overflowed*/ + if leaky.orderEvent { + orderEvent[leaky.Mapkey].Done() + } case ofw := <-leaky.Out: leaky.overflow(ofw) return nil diff --git a/pkg/leakybucket/buckets_test.go b/pkg/leakybucket/buckets_test.go index 5cc8b18b5..e08887be8 100644 --- a/pkg/leakybucket/buckets_test.go +++ b/pkg/leakybucket/buckets_test.go @@ -115,7 +115,7 @@ func testOneBucket(t *testing.T, dir string, tomb *tomb.Tomb) error { cscfg := &csconfig.CrowdsecServiceCfg{ DataDir: "tests", } - holders, response, err := LoadBuckets(cscfg, files, tomb, buckets) + holders, response, err := LoadBuckets(cscfg, files, tomb, buckets, false) if err != nil { t.Fatalf("failed loading bucket : %s", err) } diff --git a/pkg/leakybucket/manager_load.go b/pkg/leakybucket/manager_load.go index dc1f4ed51..45485e4f9 100644 --- a/pkg/leakybucket/manager_load.go +++ b/pkg/leakybucket/manager_load.go @@ -74,6 +74,7 @@ type BucketFactory struct { tomb *tomb.Tomb `yaml:"-"` wgPour *sync.WaitGroup `yaml:"-"` wgDumpState *sync.WaitGroup `yaml:"-"` + orderEvent bool } // we use one NameGenerator for all the future buckets @@ -178,7 +179,7 @@ func ValidateFactory(bucketFactory *BucketFactory) error { return nil } -func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets) ([]BucketFactory, chan types.Event, error) { +func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan types.Event, error) { var ( ret = []BucketFactory{} response chan types.Event @@ -256,6 +257,9 @@ func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb. log.Errorf("Failed to load bucket %s : %v", bucketFactory.Name, err) return nil, nil, fmt.Errorf("loading of %s failed : %v", bucketFactory.Name, err) } + + bucketFactory.orderEvent = orderEvent + ret = append(ret, bucketFactory) } } diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index 679c603df..388227a41 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "os" + "sync" "time" "github.com/antonmedv/expr" @@ -279,6 +280,8 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B return biface.(*Leaky), nil } +var orderEvent map[string]*sync.WaitGroup + func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) { var ( ok, condition, poured bool @@ -344,7 +347,26 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc return false, fmt.Errorf("failed to load or store bucket: %w", err) } //finally, pour the even into the bucket + + if bucket.orderEvent { + if orderEvent == nil { + orderEvent = make(map[string]*sync.WaitGroup) + } + if orderEvent[buckey] != nil { + orderEvent[buckey].Wait() + } else { + orderEvent[buckey] = &sync.WaitGroup{} + } + + orderEvent[buckey].Add(1) + } + ok, err := PourItemToBucket(bucket, holders[idx], buckets, &parsed) + + if bucket.orderEvent { + orderEvent[buckey].Wait() + } + if err != nil { return false, fmt.Errorf("failed to pour bucket: %w", err) } diff --git a/pkg/leakybucket/timemachine.go b/pkg/leakybucket/timemachine.go index de3c69d86..6e84797d4 100644 --- a/pkg/leakybucket/timemachine.go +++ b/pkg/leakybucket/timemachine.go @@ -35,7 +35,6 @@ func TimeMachinePour(l *Leaky, msg types.Event) { } l.Last_ts = d l.mutex.Unlock() - if l.Limiter.AllowN(d, 1) { l.logger.Tracef("Time-Pouring event %s (tokens:%f)", d, l.Limiter.GetTokensCount()) l.Queue.Add(msg)