use a copy of bucket processors in LeakRoutine (#1902)
This commit is contained in:
parent
104f5d1fe6
commit
60f1228030
|
@ -9,6 +9,7 @@ import (
|
||||||
//"log"
|
//"log"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/time/rate"
|
"github.com/crowdsecurity/crowdsec/pkg/time/rate"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
|
"github.com/mohae/deepcopy"
|
||||||
"gopkg.in/tomb.v2"
|
"gopkg.in/tomb.v2"
|
||||||
|
|
||||||
//rate "time/rate"
|
//rate "time/rate"
|
||||||
|
@ -208,11 +209,17 @@ func LeakRoutine(leaky *Leaky) error {
|
||||||
/*todo : we create a logger at runtime while we want leakroutine to be up asap, might not be a good idea*/
|
/*todo : we create a logger at runtime while we want leakroutine to be up asap, might not be a good idea*/
|
||||||
leaky.logger = leaky.BucketConfig.logger.WithFields(log.Fields{"capacity": leaky.Capacity, "partition": leaky.Mapkey, "bucket_id": leaky.Uuid})
|
leaky.logger = leaky.BucketConfig.logger.WithFields(log.Fields{"capacity": leaky.Capacity, "partition": leaky.Mapkey, "bucket_id": leaky.Uuid})
|
||||||
|
|
||||||
|
//We copy the processors, as they are coming from the BucketFactory, and thus are shared between buckets
|
||||||
|
//If we don't copy, processors using local cache (such as Uniq) are subject to race conditions
|
||||||
|
//This can lead to creating buckets that will discard their first events, preventing the underflow ticker from being initialized
|
||||||
|
//and preventing them from being destroyed
|
||||||
|
processors := deepcopy.Copy(leaky.BucketConfig.processors).([]Processor)
|
||||||
|
|
||||||
leaky.Signal <- true
|
leaky.Signal <- true
|
||||||
atomic.AddInt64(&LeakyRoutineCount, 1)
|
atomic.AddInt64(&LeakyRoutineCount, 1)
|
||||||
defer atomic.AddInt64(&LeakyRoutineCount, -1)
|
defer atomic.AddInt64(&LeakyRoutineCount, -1)
|
||||||
|
|
||||||
for _, f := range leaky.BucketConfig.processors {
|
for _, f := range processors {
|
||||||
err := f.OnBucketInit(leaky.BucketConfig)
|
err := f.OnBucketInit(leaky.BucketConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
leaky.logger.Errorf("Problem at bucket initializiation. Bail out %T : %v", f, err)
|
leaky.logger.Errorf("Problem at bucket initializiation. Bail out %T : %v", f, err)
|
||||||
|
@ -227,7 +234,7 @@ func LeakRoutine(leaky *Leaky) error {
|
||||||
/*receiving an event*/
|
/*receiving an event*/
|
||||||
case msg := <-leaky.In:
|
case msg := <-leaky.In:
|
||||||
/*the msg var use is confusing and is redeclared in a different type :/*/
|
/*the msg var use is confusing and is redeclared in a different type :/*/
|
||||||
for _, processor := range leaky.BucketConfig.processors {
|
for _, processor := range processors {
|
||||||
msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky)
|
msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky)
|
||||||
// if &msg == nil we stop processing
|
// if &msg == nil we stop processing
|
||||||
if msg == nil {
|
if msg == nil {
|
||||||
|
|
Loading…
Reference in a new issue