fix the ci by adding the ability to enforce event ordering (#2347)

* fix the ci by adding the ability to enforce event ordering
This commit is contained in:
Manuel Sabban 2023-07-20 11:41:30 +02:00 committed by GitHub
parent 3c16139c44
commit 9ac5aeda79
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 52 additions and 14 deletions

View file

@@ -70,6 +70,7 @@ type Flags struct {
WinSvc string WinSvc string
DisableCAPI bool DisableCAPI bool
Transform string Transform string
OrderEvent bool
} }
type labelsMap map[string]string type labelsMap map[string]string
@@ -87,7 +88,7 @@ func LoadBuckets(cConfig *csconfig.Config) error {
buckets = leakybucket.NewBuckets() buckets = leakybucket.NewBuckets()
log.Infof("Loading %d scenario files", len(files)) log.Infof("Loading %d scenario files", len(files))
holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, files, &bucketsTomb, buckets) holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, files, &bucketsTomb, buckets, flags.OrderEvent)
if err != nil { if err != nil {
return fmt.Errorf("scenario loading failed: %v", err) return fmt.Errorf("scenario loading failed: %v", err)
@@ -110,7 +111,7 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform) dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
if err != nil { if err != nil {
return fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err) return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
} }
} else { } else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec) dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
@@ -164,6 +165,7 @@ func (f *Flags) Parse() {
flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent") flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent")
flag.BoolVar(&f.DisableAPI, "no-api", false, "disable local API") flag.BoolVar(&f.DisableAPI, "no-api", false, "disable local API")
flag.BoolVar(&f.DisableCAPI, "no-capi", false, "disable communication with Central API") flag.BoolVar(&f.DisableCAPI, "no-capi", false, "disable communication with Central API")
flag.BoolVar(&f.OrderEvent, "order-event", false, "enforce event ordering with significant performance cost")
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
flag.StringVar(&f.WinSvc, "winsvc", "", "Windows service Action: Install, Remove etc..") flag.StringVar(&f.WinSvc, "winsvc", "", "Windows service Action: Install, Remove etc..")
} }

View file

@@ -15,13 +15,13 @@ import (
) )
type HubTestItemConfig struct { type HubTestItemConfig struct {
Parsers []string `yaml:"parsers"` Parsers []string `yaml:"parsers"`
Scenarios []string `yaml:"scenarios"` Scenarios []string `yaml:"scenarios"`
PostOVerflows []string `yaml:"postoverflows"` PostOVerflows []string `yaml:"postoverflows"`
LogFile string `yaml:"log_file"` LogFile string `yaml:"log_file"`
LogType string `yaml:"log_type"` LogType string `yaml:"log_type"`
Labels map[string]string `yaml:"labels"` Labels map[string]string `yaml:"labels"`
IgnoreParsers bool `yaml:"ignore_parsers"` // if we test a scenario, we don't want to assert on Parser IgnoreParsers bool `yaml:"ignore_parsers"` // if we test a scenario, we don't want to assert on Parser
OverrideStatics []parser.ExtraField `yaml:"override_statics"` //Allow to override statics. Executed before s00 OverrideStatics []parser.ExtraField `yaml:"override_statics"` //Allow to override statics. Executed before s00
} }
@@ -530,7 +530,7 @@ func (t *HubTestItem) Run() error {
} }
} }
cmdArgs = []string{"-c", t.RuntimeConfigFilePath, "-type", logType, "-dsn", dsn, "-dump-data", t.ResultsPath} cmdArgs = []string{"-c", t.RuntimeConfigFilePath, "-type", logType, "-dsn", dsn, "-dump-data", t.ResultsPath, "-order-event"}
for labelKey, labelValue := range t.Config.Labels { for labelKey, labelValue := range t.Config.Labels {
arg := fmt.Sprintf("%s:%s", labelKey, labelValue) arg := fmt.Sprintf("%s:%s", labelKey, labelValue)
cmdArgs = append(cmdArgs, "-label", arg) cmdArgs = append(cmdArgs, "-label", arg)

View file

@@ -70,6 +70,7 @@ type Leaky struct {
wgPour *sync.WaitGroup wgPour *sync.WaitGroup
wgDumpState *sync.WaitGroup wgDumpState *sync.WaitGroup
mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races mutex *sync.Mutex //used only for TIMEMACHINE mode to allow garbage collection without races
orderEvent bool
} }
var BucketsPour = prometheus.NewCounterVec( var BucketsPour = prometheus.NewCounterVec(
@@ -178,6 +179,7 @@ func FromFactory(bucketFactory BucketFactory) *Leaky {
wgPour: bucketFactory.wgPour, wgPour: bucketFactory.wgPour,
wgDumpState: bucketFactory.wgDumpState, wgDumpState: bucketFactory.wgDumpState,
mutex: &sync.Mutex{}, mutex: &sync.Mutex{},
orderEvent: bucketFactory.orderEvent,
} }
if l.BucketConfig.Capacity > 0 && l.BucketConfig.leakspeed != time.Duration(0) { if l.BucketConfig.Capacity > 0 && l.BucketConfig.leakspeed != time.Duration(0) {
l.Duration = time.Duration(l.BucketConfig.Capacity+1) * l.BucketConfig.leakspeed l.Duration = time.Duration(l.BucketConfig.Capacity+1) * l.BucketConfig.leakspeed
@@ -245,6 +247,9 @@ func LeakRoutine(leaky *Leaky) error {
msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky) msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky)
// if &msg == nil we stop processing // if &msg == nil we stop processing
if msg == nil { if msg == nil {
if leaky.orderEvent {
orderEvent[leaky.Mapkey].Done()
}
goto End goto End
} }
} }
@@ -258,6 +263,9 @@ func LeakRoutine(leaky *Leaky) error {
for _, processor := range processors { for _, processor := range processors {
msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky) msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky)
if msg == nil { if msg == nil {
if leaky.orderEvent {
orderEvent[leaky.Mapkey].Done()
}
goto End goto End
} }
} }
@@ -277,7 +285,10 @@ func LeakRoutine(leaky *Leaky) error {
} }
} }
firstEvent = false firstEvent = false
/*we overflowed*/ /*we overflowed*/
if leaky.orderEvent {
orderEvent[leaky.Mapkey].Done()
}
case ofw := <-leaky.Out: case ofw := <-leaky.Out:
leaky.overflow(ofw) leaky.overflow(ofw)
return nil return nil

View file

@@ -115,7 +115,7 @@ func testOneBucket(t *testing.T, dir string, tomb *tomb.Tomb) error {
cscfg := &csconfig.CrowdsecServiceCfg{ cscfg := &csconfig.CrowdsecServiceCfg{
DataDir: "tests", DataDir: "tests",
} }
holders, response, err := LoadBuckets(cscfg, files, tomb, buckets) holders, response, err := LoadBuckets(cscfg, files, tomb, buckets, false)
if err != nil { if err != nil {
t.Fatalf("failed loading bucket : %s", err) t.Fatalf("failed loading bucket : %s", err)
} }

View file

@@ -74,6 +74,7 @@ type BucketFactory struct {
tomb *tomb.Tomb `yaml:"-"` tomb *tomb.Tomb `yaml:"-"`
wgPour *sync.WaitGroup `yaml:"-"` wgPour *sync.WaitGroup `yaml:"-"`
wgDumpState *sync.WaitGroup `yaml:"-"` wgDumpState *sync.WaitGroup `yaml:"-"`
orderEvent bool
} }
// we use one NameGenerator for all the future buckets // we use one NameGenerator for all the future buckets
@@ -178,7 +179,7 @@ func ValidateFactory(bucketFactory *BucketFactory) error {
return nil return nil
} }
func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets) ([]BucketFactory, chan types.Event, error) { func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan types.Event, error) {
var ( var (
ret = []BucketFactory{} ret = []BucketFactory{}
response chan types.Event response chan types.Event
@@ -256,6 +257,9 @@ func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.
log.Errorf("Failed to load bucket %s : %v", bucketFactory.Name, err) log.Errorf("Failed to load bucket %s : %v", bucketFactory.Name, err)
return nil, nil, fmt.Errorf("loading of %s failed : %v", bucketFactory.Name, err) return nil, nil, fmt.Errorf("loading of %s failed : %v", bucketFactory.Name, err)
} }
bucketFactory.orderEvent = orderEvent
ret = append(ret, bucketFactory) ret = append(ret, bucketFactory)
} }
} }

View file

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"math" "math"
"os" "os"
"sync"
"time" "time"
"github.com/antonmedv/expr" "github.com/antonmedv/expr"
@@ -279,6 +280,8 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B
return biface.(*Leaky), nil return biface.(*Leaky), nil
} }
var orderEvent map[string]*sync.WaitGroup
func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) { func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
var ( var (
ok, condition, poured bool ok, condition, poured bool
@@ -344,7 +347,26 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
return false, fmt.Errorf("failed to load or store bucket: %w", err) return false, fmt.Errorf("failed to load or store bucket: %w", err)
} }
//finally, pour the even into the bucket //finally, pour the even into the bucket
if bucket.orderEvent {
if orderEvent == nil {
orderEvent = make(map[string]*sync.WaitGroup)
}
if orderEvent[buckey] != nil {
orderEvent[buckey].Wait()
} else {
orderEvent[buckey] = &sync.WaitGroup{}
}
orderEvent[buckey].Add(1)
}
ok, err := PourItemToBucket(bucket, holders[idx], buckets, &parsed) ok, err := PourItemToBucket(bucket, holders[idx], buckets, &parsed)
if bucket.orderEvent {
orderEvent[buckey].Wait()
}
if err != nil { if err != nil {
return false, fmt.Errorf("failed to pour bucket: %w", err) return false, fmt.Errorf("failed to pour bucket: %w", err)
} }

View file

@@ -35,7 +35,6 @@ func TimeMachinePour(l *Leaky, msg types.Event) {
} }
l.Last_ts = d l.Last_ts = d
l.mutex.Unlock() l.mutex.Unlock()
if l.Limiter.AllowN(d, 1) { if l.Limiter.AllowN(d, 1) {
l.logger.Tracef("Time-Pouring event %s (tokens:%f)", d, l.Limiter.GetTokensCount()) l.logger.Tracef("Time-Pouring event %s (tokens:%f)", d, l.Limiter.GetTokensCount())
l.Queue.Add(msg) l.Queue.Add(msg)