diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index 5cd94efe5..6fb24829b 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -2,13 +2,14 @@ package leakybucket import ( "encoding/json" - "errors" "fmt" "io/ioutil" "math" "os" "time" + "github.com/pkg/errors" + "github.com/mohae/deepcopy" log "github.com/sirupsen/logrus" @@ -154,14 +155,137 @@ func ShutdownAllBuckets(buckets *Buckets) error { return nil } +func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed types.Event) (bool, error) { + var sent bool + var buckey = bucket.Mapkey + var err error + + sigclosed := 0 + failed_sent := 0 + attempts := 0 + start := time.Now() + + for !sent { + attempts += 1 + /* Warn the user if we used more than a 100 ms to pour an event, it's at least an half lock*/ + if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) { + holder.logger.Warningf("stuck for %s sending event to %s (sigclosed:%d failed_sent:%d attempts:%d)", time.Since(start), + buckey, sigclosed, failed_sent, attempts) + } + + /* check if leak routine is up */ + select { + case _, ok := <-bucket.Signal: + if !ok { + //the bucket was found and dead, get a new one and continue + bucket.logger.Tracef("Bucket %s found dead, cleanup the body", buckey) + buckets.Bucket_map.Delete(buckey) + sigclosed += 1 + bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode) + if err != nil { + return false, err + } + continue + } + holder.logger.Tracef("Signal exists, try to pour :)") + default: + /*nothing to read, but not closed, try to pour */ + holder.logger.Tracef("Signal exists but empty, try to pour :)") + } + + /*let's see if this time-bucket should have expired */ + if bucket.Mode == TIMEMACHINE { + bucket.mutex.Lock() + firstTs := bucket.First_ts + lastTs := bucket.Last_ts + bucket.mutex.Unlock() + + if !firstTs.IsZero() { + var d time.Time + err = d.UnmarshalText([]byte(parsed.MarshaledTime)) + if err != nil { + holder.logger.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err) + } + if d.After(lastTs.Add(bucket.Duration)) { + bucket.logger.Tracef("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, lastTs.Add(bucket.Duration)) + buckets.Bucket_map.Delete(buckey) + //not sure about this, should we create a new one ? + sigclosed += 1 + bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode) + if err != nil { + return false, err + } + continue + } + } + } + /*the bucket seems to be up & running*/ + select { + case bucket.In <- parsed: + holder.logger.Tracef("Successfully sent !") + if BucketPourTrack { + if _, ok := BucketPourCache[bucket.Name]; !ok { + BucketPourCache[bucket.Name] = make([]types.Event, 0) + } + evt := deepcopy.Copy(parsed) + BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event)) + } + sent = true + continue + default: + failed_sent += 1 + holder.logger.Tracef("Failed to send, try again") + continue + + } + } + holder.logger.Debugf("bucket '%s' is poured", holder.Name) + return sent, nil +} + +func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder BucketFactory, expectMode int) (*Leaky, error) { + + biface, ok := buckets.Bucket_map.Load(partitionKey) + + /* the bucket doesn't exist, create it !*/ + if !ok { + var fresh_bucket *Leaky + + switch expectMode { + case TIMEMACHINE: + fresh_bucket = NewTimeMachine(holder) + holder.logger.Debugf("Creating TimeMachine bucket") + case LIVE: + fresh_bucket = NewLeaky(holder) + holder.logger.Debugf("Creating Live bucket") + default: + return nil, fmt.Errorf("input event has no expected mode : %+v", expectMode) + } + fresh_bucket.In = make(chan types.Event) + fresh_bucket.Mapkey = partitionKey + fresh_bucket.Signal = make(chan bool, 1) + actual, stored := buckets.Bucket_map.LoadOrStore(partitionKey, fresh_bucket) + if !stored { + holder.tomb.Go(func() error { + return LeakRoutine(fresh_bucket) + }) + biface = fresh_bucket + //once the created goroutine is ready to process event, we can return it + <-fresh_bucket.Signal + } else { + holder.logger.Debugf("Unexpectedly found exisint bucket for %s", partitionKey) + biface = actual + } + holder.logger.Debugf("Created new bucket %s", partitionKey) + } + return biface.(*Leaky), nil +} + func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) { var ( - ok, condition, sent bool - err error + ok, condition, poured bool ) - //synchronize with DumpBucketsStateAt - //to track bucket pour : track items that enter the pour routine if BucketPourTrack { if BucketPourCache == nil { BucketPourCache = make(map[string][]types.Event) @@ -173,8 +297,10 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc BucketPourCache["OK"] = append(BucketPourCache["OK"], evt.(types.Event)) } + //find the relevant holders (scenarios) for idx, holder := range holders { + //evaluate bucket's condition if holder.RunTimeFilter != nil { holder.logger.Tracef("event against holder %d/%d", idx, len(holders)) output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})) @@ -197,7 +323,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc } } - sent = false + //groupby determines the partition key for the specific bucket var groupby string if holder.RunTimeGroupBy != nil { tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})) @@ -213,119 +339,19 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc } buckey := GetKey(holder, groupby) - sigclosed := 0 - keymiss := 0 - failed_sent := 0 - attempts := 0 - start := time.Now() - for !sent { - attempts += 1 - /* Warn the user if we used more than a 100 ms to pour an event, it's at least an half lock*/ - if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) { - holder.logger.Warningf("stuck for %s sending event to %s (sigclosed:%d keymiss:%d failed_sent:%d attempts:%d)", time.Since(start), - buckey, sigclosed, keymiss, failed_sent, attempts) - } - biface, ok := buckets.Bucket_map.Load(buckey) - //biface, bigout - /* the bucket doesn't exist, create it !*/ - if !ok { - /* - not found in map - */ - - holder.logger.Debugf("Creating bucket %s", buckey) - keymiss += 1 - var fresh_bucket *Leaky - - switch parsed.ExpectMode { - case TIMEMACHINE: - fresh_bucket = NewTimeMachine(holder) - holder.logger.Debugf("Creating TimeMachine bucket") - case LIVE: - fresh_bucket = NewLeaky(holder) - holder.logger.Debugf("Creating Live bucket") - default: - holder.logger.Fatalf("input event has no expected mode, malformed : %+v", parsed) - } - fresh_bucket.In = make(chan types.Event) - fresh_bucket.Mapkey = buckey - fresh_bucket.Signal = make(chan bool, 1) - buckets.Bucket_map.Store(buckey, fresh_bucket) - holder.tomb.Go(func() error { - return LeakRoutine(fresh_bucket) - }) - - holder.logger.Debugf("Created new bucket %s", buckey) - - //wait for signal to be opened - <-fresh_bucket.Signal - continue - } - - bucket := biface.(*Leaky) - /* check if leak routine is up */ - select { - case _, ok := <-bucket.Signal: - if !ok { - //it's closed, delete it - bucket.logger.Debugf("Bucket %s found dead, cleanup the body", buckey) - buckets.Bucket_map.Delete(buckey) - sigclosed += 1 - continue - } - holder.logger.Tracef("Signal exists, try to pour :)") - - default: - /*nothing to read, but not closed, try to pour */ - holder.logger.Tracef("Signal exists but empty, try to pour :)") - - } - /*let's see if this time-bucket should have expired */ - if bucket.Mode == TIMEMACHINE { - bucket.mutex.Lock() - firstTs := bucket.First_ts - lastTs := bucket.Last_ts - bucket.mutex.Unlock() - - if !firstTs.IsZero() { - var d time.Time - err = d.UnmarshalText([]byte(parsed.MarshaledTime)) - if err != nil { - holder.logger.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err) - } - if d.After(lastTs.Add(bucket.Duration)) { - bucket.logger.Tracef("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, lastTs.Add(bucket.Duration)) - buckets.Bucket_map.Delete(buckey) - continue - } - } - } - /*if we're here, let's try to pour */ - - select { - case bucket.In <- parsed: - holder.logger.Tracef("Successfully sent !") - //and track item poured to each bucket - if BucketPourTrack { - if _, ok := BucketPourCache[bucket.Name]; !ok { - BucketPourCache[bucket.Name] = make([]types.Event, 0) - } - evt := deepcopy.Copy(parsed) - BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event)) - } - - //sent was successful ! - sent = true - continue - default: - failed_sent += 1 - holder.logger.Tracef("Failed to send, try again") - continue - - } + //we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key) + bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode) + if err != nil { + return false, errors.Wrap(err, "failed to load or store bucket") + } + //finally, pour the even into the bucket + ok, err := PourItemToBucket(bucket, holder, buckets, parsed) + if err != nil { + return false, errors.Wrap(err, "failed to pour bucket") + } + if ok { + poured = true } - - holder.logger.Debugf("bucket '%s' is poured", holder.Name) } - return sent, nil + return poured, nil }