This commit is contained in:
sabban 2023-07-07 17:03:56 +02:00
parent a6a0bb3af8
commit 0780d7229a
2 changed files with 8 additions and 1 deletions

View file

@ -246,6 +246,7 @@ func LeakRoutine(leaky *Leaky) error {
msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky)
// if &msg == nil we stop processing
if msg == nil {
orderEvent[leaky.Mapkey].Done()
goto End
}
}
@ -259,6 +260,7 @@ func LeakRoutine(leaky *Leaky) error {
for _, processor := range processors {
msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky)
if msg == nil {
orderEvent[leaky.Mapkey].Done()
goto End
}
}
@ -278,7 +280,8 @@ func LeakRoutine(leaky *Leaky) error {
}
}
firstEvent = false
/*we overflowed*/
/*we overflowed*/
orderEvent[leaky.Mapkey].Done()
case ofw := <-leaky.Out:
leaky.overflow(ofw)
return nil

View file

@ -353,6 +353,10 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
if orderEvent == nil {
orderEvent = make(map[string]*sync.WaitGroup)
}
if orderEvent[buckey] != nil {
orderEvent[buckey].Wait()
}
orderEvent[buckey] = &sync.WaitGroup{}
orderEvent[buckey].Add(1)
}