enforce event order
This commit is contained in:
parent
c7784f91a6
commit
2bd6e9e0d4
|
@ -70,6 +70,7 @@ type Flags struct {
|
||||||
WinSvc string
|
WinSvc string
|
||||||
DisableCAPI bool
|
DisableCAPI bool
|
||||||
Transform string
|
Transform string
|
||||||
|
OrderEvent bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type labelsMap map[string]string
|
type labelsMap map[string]string
|
||||||
|
@ -87,7 +88,7 @@ func LoadBuckets(cConfig *csconfig.Config) error {
|
||||||
buckets = leakybucket.NewBuckets()
|
buckets = leakybucket.NewBuckets()
|
||||||
|
|
||||||
log.Infof("Loading %d scenario files", len(files))
|
log.Infof("Loading %d scenario files", len(files))
|
||||||
holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, files, &bucketsTomb, buckets)
|
holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, files, &bucketsTomb, buckets, flags.OrderEvent)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("scenario loading failed: %v", err)
|
return fmt.Errorf("scenario loading failed: %v", err)
|
||||||
|
@ -110,7 +111,7 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
|
||||||
|
|
||||||
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
|
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
|
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
|
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
|
||||||
|
@ -119,10 +120,6 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(dataSources) == 0 {
|
|
||||||
return fmt.Errorf("no datasource enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,6 +161,7 @@ func (f *Flags) Parse() {
|
||||||
flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent")
|
flag.BoolVar(&f.DisableAgent, "no-cs", false, "disable crowdsec agent")
|
||||||
flag.BoolVar(&f.DisableAPI, "no-api", false, "disable local API")
|
flag.BoolVar(&f.DisableAPI, "no-api", false, "disable local API")
|
||||||
flag.BoolVar(&f.DisableCAPI, "no-capi", false, "disable communication with Central API")
|
flag.BoolVar(&f.DisableCAPI, "no-capi", false, "disable communication with Central API")
|
||||||
|
flag.BoolVar(&f.OrderEvent, "order-event", false, "enforce event ordering with significant perfomance cost")
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
flag.StringVar(&f.WinSvc, "winsvc", "", "Windows service Action: Install, Remove etc..")
|
flag.StringVar(&f.WinSvc, "winsvc", "", "Windows service Action: Install, Remove etc..")
|
||||||
}
|
}
|
||||||
|
@ -322,7 +320,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// some features can require configuration or command-line options,
|
// some features can require configuration or command-line options,
|
||||||
// so we need to parse them asap. we'll load from feature.yaml later.
|
// so wwe need to parse them asap. we'll load from feature.yaml later.
|
||||||
if err := csconfig.LoadFeatureFlagsEnv(log.StandardLogger()); err != nil {
|
if err := csconfig.LoadFeatureFlagsEnv(log.StandardLogger()); err != nil {
|
||||||
log.Fatalf("failed to set feature flags from environment: %s", err)
|
log.Fatalf("failed to set feature flags from environment: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,7 @@ type BucketFactory struct {
|
||||||
tomb *tomb.Tomb `yaml:"-"`
|
tomb *tomb.Tomb `yaml:"-"`
|
||||||
wgPour *sync.WaitGroup `yaml:"-"`
|
wgPour *sync.WaitGroup `yaml:"-"`
|
||||||
wgDumpState *sync.WaitGroup `yaml:"-"`
|
wgDumpState *sync.WaitGroup `yaml:"-"`
|
||||||
|
orderEvent bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// we use one NameGenerator for all the future buckets
|
// we use one NameGenerator for all the future buckets
|
||||||
|
@ -178,7 +179,7 @@ func ValidateFactory(bucketFactory *BucketFactory) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets) ([]BucketFactory, chan types.Event, error) {
|
func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan types.Event, error) {
|
||||||
var (
|
var (
|
||||||
ret = []BucketFactory{}
|
ret = []BucketFactory{}
|
||||||
response chan types.Event
|
response chan types.Event
|
||||||
|
@ -256,6 +257,9 @@ func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.
|
||||||
log.Errorf("Failed to load bucket %s : %v", bucketFactory.Name, err)
|
log.Errorf("Failed to load bucket %s : %v", bucketFactory.Name, err)
|
||||||
return nil, nil, fmt.Errorf("loading of %s failed : %v", bucketFactory.Name, err)
|
return nil, nil, fmt.Errorf("loading of %s failed : %v", bucketFactory.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bucketFactory.orderEvent = orderEvent
|
||||||
|
|
||||||
ret = append(ret, bucketFactory)
|
ret = append(ret, bucketFactory)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue