From 9f676844d9e1409ea16b29361610db63d73b27b4 Mon Sep 17 00:00:00 2001 From: Sebastien Blot Date: Wed, 10 Aug 2022 12:12:55 +0200 Subject: [PATCH] properly handle timemachine buckets --- pkg/leakybucket/blackhole.go | 34 ++++++++++++++++++---------------- pkg/leakybucket/manager_run.go | 23 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/pkg/leakybucket/blackhole.go b/pkg/leakybucket/blackhole.go index 1803d717e..2d27bb61f 100644 --- a/pkg/leakybucket/blackhole.go +++ b/pkg/leakybucket/blackhole.go @@ -9,15 +9,14 @@ import ( "gopkg.in/tomb.v2" ) -type HiddenKey struct { - key string - expiration time.Time +type Blackhole struct { + duration time.Duration + DumbProcessor } -type Blackhole struct { - duration time.Duration - hiddenKeys []HiddenKey - DumbProcessor +type BlackholeExpiration struct { + blExpiration time.Time + cleanupExpiration time.Time //we need a separate expiration for the cleanup to properly handle timemachine buckets } func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) { @@ -28,7 +27,6 @@ func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) { } return &Blackhole{ duration: duration, - hiddenKeys: []HiddenKey{}, DumbProcessor: DumbProcessor{}, }, nil } @@ -46,8 +44,8 @@ func CleanupBlackhole(bucketsTomb *tomb.Tomb) error { return nil case <-ticker.C: BlackholeTracking.Range(func(key, value interface{}) bool { - expirationDate := value.(time.Time) - if expirationDate.Before(time.Now().UTC()) { + cleanupDate := value.(BlackholeExpiration).cleanupExpiration + if cleanupDate.Before(time.Now().UTC()) { log.Debugf("Expiring blackhole for %s", key) BlackholeTracking.Delete(key) } @@ -59,9 +57,11 @@ func CleanupBlackhole(bucketsTomb *tomb.Tomb) error { func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue) { return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) { - if expirationDate, ok := BlackholeTracking.Load(leaky.Mapkey); ok { - if expirationDate.(time.Time).After(time.Now().UTC()) { - leaky.logger.Debugf("Blackhole already triggered for %s", leaky.Mapkey) + + if expiration, ok := BlackholeTracking.Load(leaky.Mapkey); ok { + x := expiration.(BlackholeExpiration) + if x.blExpiration.After(leaky.Ovflw_ts) { + leaky.logger.Debugf("Blackhole already triggered for %s (remaining : %s", leaky.Mapkey, x.blExpiration.Sub(time.Now().UTC())) return types.RuntimeAlert{ Mapkey: leaky.Mapkey, }, nil @@ -71,10 +71,12 @@ func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, } } - BlackholeTracking.Store(leaky.Mapkey, time.Now().UTC().Add(bl.duration)) - - leaky.logger.Debugf("Blackhole triggered for %s", leaky.Mapkey) + BlackholeTracking.Store(leaky.Mapkey, BlackholeExpiration{ + blExpiration: leaky.Ovflw_ts.Add(bl.duration), + cleanupExpiration: time.Now().UTC().Add(bl.duration), + }) + leaky.logger.Debugf("Blackhole triggered for %s (expiration : %s)", leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration)) return alert, queue } } diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index 46e2ebb28..97778fa55 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -277,7 +277,7 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B //once the created goroutine is ready to process event, we can return it <-fresh_bucket.Signal } else { - holder.logger.Debugf("Unexpectedly found exisint bucket for %s", partitionKey) + holder.logger.Debugf("Unexpectedly found existing bucket for %s", partitionKey) biface = actual } holder.logger.Debugf("Created new bucket %s", partitionKey) @@ -345,9 +345,24 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc } } buckey := GetKey(holders[idx], groupby) - if _, ok := BlackholeTracking.Load(buckey); ok { - holders[idx].logger.Tracef("Event is blackholed: %s", buckey) - continue + if x, ok := BlackholeTracking.Load(buckey); ok { + holders[idx].logger.Debugf("Checking if blackhole has expired for %s", buckey) + blackholeExp := x.(BlackholeExpiration) + t := time.Now().UTC() + if parsed.ExpectMode == TIMEMACHINE { + //This is not optimal at all, date enrichment should also set parsed.Time to avoid parsing the date twice + err := t.UnmarshalText([]byte(parsed.MarshaledTime)) + if err != nil { + holders[idx].logger.Errorf("failed parsing time : %v", err) + } + holders[idx].logger.Debugf("Found TIMEMACHINE bucket, using %s as time, comparing against %s ", t, blackholeExp.blExpiration) + } + if blackholeExp.blExpiration.After(t) { + holders[idx].logger.Debugf("Event is blackholed: %s (remaining: %s)", buckey, blackholeExp.blExpiration.Sub(t)) + continue + } + holders[idx].logger.Debugf("Event is no longer blackholed: %s", buckey) + BlackholeTracking.Delete(buckey) } //we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key) bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode)