From 581ddf78fcd620f3aa03d9edede1150b8854b171 Mon Sep 17 00:00:00 2001 From: "Thibault \"bui\" Koechlin" Date: Mon, 13 Jun 2022 14:41:05 +0200 Subject: [PATCH] Performance improvements (#1583) * fix concurrent map write on distinct cache * cache compiled expressions for groupby and cancel_on filters * limit objects copy when it's going to lock a shared goroutine --- pkg/leakybucket/bucket.go | 6 ++-- pkg/leakybucket/manager_load.go | 2 +- pkg/leakybucket/manager_run.go | 49 ++++++++++++++++---------------- pkg/leakybucket/reset_filter.go | 50 +++++++++++++++++++++++++++------ pkg/leakybucket/uniq.go | 22 ++++++++++++++- 5 files changed, 92 insertions(+), 37 deletions(-) diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index a51e9322f..48e5a4868 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -35,7 +35,7 @@ type Leaky struct { //Queue is used to held the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer. Queue *Queue //Leaky buckets are receiving message through a chan - In chan types.Event `json:"-"` + In chan *types.Event `json:"-"` //Leaky buckets are pushing their overflows through a chan Out chan *Queue `json:"-"` // shared for all buckets (the idea is to kill this afterwards) @@ -227,7 +227,7 @@ func LeakRoutine(leaky *Leaky) error { case msg := <-leaky.In: /*the msg var use is confusing and is redeclared in a different type :/*/ for _, processor := range leaky.BucketConfig.processors { - msg := processor.OnBucketPour(leaky.BucketConfig)(msg, leaky) + msg := processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky) // if &msg == nil we stop processing if msg == nil { goto End @@ -238,7 +238,7 @@ func LeakRoutine(leaky *Leaky) error { } BucketsPour.With(prometheus.Labels{"name": leaky.Name, "source": msg.Line.Src, "type": msg.Line.Module}).Inc() - leaky.Pour(leaky, msg) // glue for now + leaky.Pour(leaky, *msg) // glue for now //Clear cache on behalf of pour tmp := time.NewTicker(leaky.Duration) durationTicker = tmp.C diff --git a/pkg/leakybucket/manager_load.go b/pkg/leakybucket/manager_load.go index e1da04012..759247c6f 100644 --- a/pkg/leakybucket/manager_load.go +++ b/pkg/leakybucket/manager_load.go @@ -385,7 +385,7 @@ func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFac tbucket.Queue = v.Queue /*Trying to set the limiter to the saved values*/ tbucket.Limiter.Load(v.SerializedState) - tbucket.In = make(chan types.Event) + tbucket.In = make(chan *types.Event) tbucket.Mapkey = k tbucket.Signal = make(chan bool, 1) tbucket.First_ts = v.First_ts diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go index b7363e4b4..78739afd6 100644 --- a/pkg/leakybucket/manager_run.go +++ b/pkg/leakybucket/manager_run.go @@ -154,7 +154,7 @@ func ShutdownAllBuckets(buckets *Buckets) error { return nil } -func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed types.Event) (bool, error) { +func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *types.Event) (bool, error) { var sent bool var buckey = bucket.Mapkey var err error @@ -186,10 +186,10 @@ func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, par } continue } - holder.logger.Tracef("Signal exists, try to pour :)") + //holder.logger.Tracef("Signal exists, try to pour :)") default: /*nothing to read, but not closed, try to pour */ - holder.logger.Tracef("Signal exists but empty, try to pour :)") + //holder.logger.Tracef("Signal exists but empty, try to pour :)") } /*let's see if this time-bucket should have expired */ @@ -221,19 +221,19 @@ func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, par /*the bucket seems to be up & running*/ select { case bucket.In <- parsed: - holder.logger.Tracef("Successfully sent !") + //holder.logger.Tracef("Successfully sent !") if BucketPourTrack { if _, ok := BucketPourCache[bucket.Name]; !ok { BucketPourCache[bucket.Name] = make([]types.Event, 0) } - evt := deepcopy.Copy(parsed) + evt := deepcopy.Copy(*parsed) BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event)) } sent = true continue default: failed_sent += 1 - holder.logger.Tracef("Failed to send, try again") + //holder.logger.Tracef("Failed to send, try again") continue } @@ -260,7 +260,7 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B default: return nil, fmt.Errorf("input event has no expected mode : %+v", expectMode) } - fresh_bucket.In = make(chan types.Event) + fresh_bucket.In = make(chan *types.Event) fresh_bucket.Mapkey = partitionKey fresh_bucket.Signal = make(chan bool, 1) actual, stored := buckets.Bucket_map.LoadOrStore(partitionKey, fresh_bucket) @@ -299,54 +299,55 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc cachedExprEnv := exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}) //find the relevant holders (scenarios) - for idx, holder := range holders { + for idx := 0; idx < len(holders); idx++ { + //for idx, holder := range holders { //evaluate bucket's condition - if holder.RunTimeFilter != nil { - holder.logger.Tracef("event against holder %d/%d", idx, len(holders)) - output, err := expr.Run(holder.RunTimeFilter, cachedExprEnv) + if holders[idx].RunTimeFilter != nil { + holders[idx].logger.Tracef("event against holder %d/%d", idx, len(holders)) + output, err := expr.Run(holders[idx].RunTimeFilter, cachedExprEnv) if err != nil { - holder.logger.Errorf("failed parsing : %v", err) + holders[idx].logger.Errorf("failed parsing : %v", err) return false, fmt.Errorf("leaky failed : %s", err) } // we assume we a bool should add type check here if condition, ok = output.(bool); !ok { - holder.logger.Errorf("unexpected non-bool return : %T", output) - holder.logger.Fatalf("Filter issue") + holders[idx].logger.Errorf("unexpected non-bool return : %T", output) + holders[idx].logger.Fatalf("Filter issue") } - if holder.Debug { - holder.ExprDebugger.Run(holder.logger, condition, cachedExprEnv) + if holders[idx].Debug { + holders[idx].ExprDebugger.Run(holders[idx].logger, condition, cachedExprEnv) } if !condition { - holder.logger.Debugf("Event leaving node : ko (filter mismatch)") + holders[idx].logger.Debugf("Event leaving node : ko (filter mismatch)") continue } } //groupby determines the partition key for the specific bucket var groupby string - if holder.RunTimeGroupBy != nil { - tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, cachedExprEnv) + if holders[idx].RunTimeGroupBy != nil { + tmpGroupBy, err := expr.Run(holders[idx].RunTimeGroupBy, cachedExprEnv) if err != nil { - holder.logger.Errorf("failed groupby : %v", err) + holders[idx].logger.Errorf("failed groupby : %v", err) return false, errors.New("leaky failed :/") } if groupby, ok = tmpGroupBy.(string); !ok { - holder.logger.Fatalf("failed groupby type : %v", err) + holders[idx].logger.Fatalf("failed groupby type : %v", err) return false, errors.New("groupby wrong type") } } - buckey := GetKey(holder, groupby) + buckey := GetKey(holders[idx], groupby) //we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key) - bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode) + bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode) if err != nil { return false, errors.Wrap(err, "failed to load or store bucket") } //finally, pour the even into the bucket - ok, err := PourItemToBucket(bucket, holder, buckets, parsed) + ok, err := PourItemToBucket(bucket, holders[idx], buckets, &parsed) if err != nil { return false, errors.Wrap(err, "failed to pour bucket") } diff --git a/pkg/leakybucket/reset_filter.go b/pkg/leakybucket/reset_filter.go index 0d42f49d7..c7430c3e8 100644 --- a/pkg/leakybucket/reset_filter.go +++ b/pkg/leakybucket/reset_filter.go @@ -1,6 +1,8 @@ package leakybucket import ( + "sync" + "github.com/antonmedv/expr" "github.com/antonmedv/expr/vm" @@ -21,6 +23,12 @@ type CancelOnFilter struct { CancelOnFilterDebug *exprhelpers.ExprDebugger } +var cancelExprCacheLock sync.Mutex +var cancelExprCache map[string]struct { + CancelOnFilter *vm.Program + CancelOnFilterDebug *exprhelpers.ExprDebugger +} + func (u *CancelOnFilter) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event { return func(msg types.Event, leaky *Leaky) *types.Event { var condition, ok bool @@ -58,18 +66,44 @@ func (u *CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) func(*Le func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error { var err error - - u.CancelOnFilter, err = expr.Compile(bucketFactory.CancelOnFilter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}}))) - if err != nil { - bucketFactory.logger.Errorf("reset_filter compile error : %s", err) - return err + var compiledExpr struct { + CancelOnFilter *vm.Program + CancelOnFilterDebug *exprhelpers.ExprDebugger } - if bucketFactory.Debug { - u.CancelOnFilterDebug, err = exprhelpers.NewDebugger(bucketFactory.CancelOnFilter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}}))) + + if cancelExprCache == nil { + cancelExprCache = make(map[string]struct { + CancelOnFilter *vm.Program + CancelOnFilterDebug *exprhelpers.ExprDebugger + }) + } + + cancelExprCacheLock.Lock() + if compiled, ok := cancelExprCache[bucketFactory.CancelOnFilter]; ok { + cancelExprCacheLock.Unlock() + u.CancelOnFilter = compiled.CancelOnFilter + u.CancelOnFilterDebug = compiled.CancelOnFilterDebug + return nil + } else { + cancelExprCacheLock.Unlock() + //release the lock during compile + compiledExpr.CancelOnFilter, err = expr.Compile(bucketFactory.CancelOnFilter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}}))) if err != nil { - bucketFactory.logger.Errorf("reset_filter debug error : %s", err) + bucketFactory.logger.Errorf("reset_filter compile error : %s", err) return err } + u.CancelOnFilter = compiledExpr.CancelOnFilter + if bucketFactory.Debug { + compiledExpr.CancelOnFilterDebug, err = exprhelpers.NewDebugger(bucketFactory.CancelOnFilter, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}}))) + if err != nil { + bucketFactory.logger.Errorf("reset_filter debug error : %s", err) + return err + } + u.CancelOnFilterDebug = compiledExpr.CancelOnFilterDebug + } + cancelExprCacheLock.Lock() + cancelExprCache[bucketFactory.CancelOnFilter] = compiledExpr + cancelExprCacheLock.Unlock() } return err } diff --git a/pkg/leakybucket/uniq.go b/pkg/leakybucket/uniq.go index ca7408f39..1e0cddeba 100644 --- a/pkg/leakybucket/uniq.go +++ b/pkg/leakybucket/uniq.go @@ -16,6 +16,9 @@ import ( // on overflow // on leak +var uniqExprCache map[string]vm.Program +var uniqExprCacheLock sync.Mutex + type Uniq struct { DistinctCompiled *vm.Program KeyCache map[string]bool @@ -52,8 +55,25 @@ func (u *Uniq) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error { var err error + var compiledExpr *vm.Program - u.DistinctCompiled, err = expr.Compile(bucketFactory.Distinct, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}}))) + if uniqExprCache == nil { + uniqExprCache = make(map[string]vm.Program) + } + + uniqExprCacheLock.Lock() + if compiled, ok := uniqExprCache[bucketFactory.Distinct]; ok { + uniqExprCacheLock.Unlock() + u.DistinctCompiled = &compiled + } else { + uniqExprCacheLock.Unlock() + //release the lock during compile + compiledExpr, err = expr.Compile(bucketFactory.Distinct, expr.Env(exprhelpers.GetExprEnv(map[string]interface{}{"evt": &types.Event{}}))) + u.DistinctCompiled = compiledExpr + uniqExprCacheLock.Lock() + uniqExprCache[bucketFactory.Distinct] = *compiledExpr + uniqExprCacheLock.Unlock() + } u.KeyCache = make(map[string]bool) return err }