From 0780d7229abfc5213b3691622a33358b92b5da58 Mon Sep 17 00:00:00 2001 From: sabban Date: Fri, 7 Jul 2023 17:03:56 +0200 Subject: [PATCH] debug --- pkg/leakybucket/bucket.go | 5 ++++- pkg/leakybucket/manager_run.go | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index 500c7bc9b..2ed35cb67 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -246,6 +246,7 @@ func LeakRoutine(leaky *Leaky) error { msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky) // if &msg == nil we stop processing if msg == nil { + orderEvent[leaky.Mapkey].Done() goto End } } @@ -259,6 +260,7 @@ func LeakRoutine(leaky *Leaky) error { for _, processor := range processors { msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky) if msg == nil { + orderEvent[leaky.Mapkey].Done() goto End } } @@ -278,7 +280,8 @@ func LeakRoutine(leaky *Leaky) error { } } firstEvent = false - /*we overflowed*/ + /*we overflowed*/ + orderEvent[leaky.Mapkey].Done() case ofw := <-leaky.Out: leaky.overflow(ofw) return nil diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index c7e3a67c1..0c568bd46 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -353,6 +353,10 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc if orderEvent == nil { orderEvent = make(map[string]*sync.WaitGroup) } + if orderEvent[buckey] != nil { + orderEvent[buckey].Wait() + } + orderEvent[buckey] = &sync.WaitGroup{} orderEvent[buckey].Add(1) }