This commit is contained in:
sabban 2023-07-10 12:01:39 +02:00
parent 0780d7229a
commit 28a939c805

View file

@ -246,7 +246,9 @@ func LeakRoutine(leaky *Leaky) error {
msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky)
// if &msg == nil we stop processing
if msg == nil {
orderEvent[leaky.Mapkey].Done()
if leaky.orderEvent {
orderEvent[leaky.Mapkey].Done()
}
goto End
}
}
@ -260,7 +262,9 @@ func LeakRoutine(leaky *Leaky) error {
for _, processor := range processors {
msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky)
if msg == nil {
orderEvent[leaky.Mapkey].Done()
if leaky.orderEvent {
orderEvent[leaky.Mapkey].Done()
}
goto End
}
}
@ -281,7 +285,9 @@ func LeakRoutine(leaky *Leaky) error {
}
firstEvent = false
/*we overflowed*/
orderEvent[leaky.Mapkey].Done()
if leaky.orderEvent {
orderEvent[leaky.Mapkey].Done()
}
case ofw := <-leaky.Out:
leaky.overflow(ofw)
return nil