diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index 7f2be17a6..1c508aab9 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -9,6 +9,7 @@ import ( //"log" "github.com/crowdsecurity/crowdsec/pkg/time/rate" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/mohae/deepcopy" "gopkg.in/tomb.v2" //rate "time/rate" @@ -208,11 +209,17 @@ func LeakRoutine(leaky *Leaky) error { /*todo : we create a logger at runtime while we want leakroutine to be up asap, might not be a good idea*/ leaky.logger = leaky.BucketConfig.logger.WithFields(log.Fields{"capacity": leaky.Capacity, "partition": leaky.Mapkey, "bucket_id": leaky.Uuid}) + //We copy the processors, as they are coming from the BucketFactory, and thus are shared between buckets + //If we don't copy, processors using local cache (such as Uniq) are subject to race conditions + //This can lead to creating buckets that will discard their first events, preventing the underflow ticker from being initialized + //and preventing them from being destroyed + processors := deepcopy.Copy(leaky.BucketConfig.processors).([]Processor) + leaky.Signal <- true atomic.AddInt64(&LeakyRoutineCount, 1) defer atomic.AddInt64(&LeakyRoutineCount, -1) - for _, f := range leaky.BucketConfig.processors { + for _, f := range processors { err := f.OnBucketInit(leaky.BucketConfig) if err != nil { leaky.logger.Errorf("Problem at bucket initializiation. Bail out %T : %v", f, err) @@ -227,7 +234,7 @@ func LeakRoutine(leaky *Leaky) error { /*receiving an event*/ case msg := <-leaky.In: /*the msg var use is confusing and is redeclared in a different type :/*/ - for _, processor := range leaky.BucketConfig.processors { + for _, processor := range processors { msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky) // if &msg == nil we stop processing if msg == nil {