fix race condition on repetitive trigger bucket creation (#1144)

Thibault "bui" Koechlin 2022-01-04 14:02:07 +01:00 committed by GitHub
parent 6c4ec64ca9
commit 8e3004ebb3

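The fix replaces the previous check-then-store sequence (Load, then Store if missing) with sync.Map.LoadOrStore, so that when several goroutines race to create the bucket for the same partition key, exactly one creation wins and only that caller starts the leak routine. Below is a minimal standalone sketch of that pattern, not CrowdSec code: the getOrCreate helper and the example key are made up for illustration.

```go
// Sketch of the race and its fix: with a plain Load-then-Store, two goroutines
// that both miss the Load can each create and register a bucket for the same
// key, and one of them is silently lost. sync.Map.LoadOrStore makes the
// "create if absent" step atomic, and only the caller whose value was actually
// stored starts the per-bucket worker.
package main

import (
	"fmt"
	"sync"
)

type bucket struct{ key string }

var m sync.Map

// getOrCreate (hypothetical helper) returns the bucket registered for key,
// creating it at most once across concurrent callers.
func getOrCreate(key string) *bucket {
	fresh := &bucket{key: key}
	actual, loaded := m.LoadOrStore(key, fresh)
	if !loaded {
		// we won the race: only this caller would start the per-bucket routine
		fmt.Println("started routine for", key)
	}
	return actual.(*bucket)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = getOrCreate("trigger/1.2.3.4") // example partition key
		}()
	}
	wg.Wait() // "started routine for ..." is printed exactly once
}
```

In the diff below, LoadOrStoreBucketFromHolder applies the same idea: only the winner of LoadOrStore spawns LeakRoutine and waits on the bucket's Signal channel before returning.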

@ -2,13 +2,14 @@ package leakybucket
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math"
"os"
"time"
"github.com/pkg/errors"
"github.com/mohae/deepcopy"
log "github.com/sirupsen/logrus"
@ -154,14 +155,137 @@ func ShutdownAllBuckets(buckets *Buckets) error {
return nil
}
func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed types.Event) (bool, error) {
var sent bool
var buckey = bucket.Mapkey
var err error
sigclosed := 0
failed_sent := 0
attempts := 0
start := time.Now()
for !sent {
attempts += 1
/* Warn the user if we spent more than 100 ms pouring an event: the routine is most likely stuck on a lock */
if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) {
holder.logger.Warningf("stuck for %s sending event to %s (sigclosed:%d failed_sent:%d attempts:%d)", time.Since(start),
buckey, sigclosed, failed_sent, attempts)
}
/* check if leak routine is up */
select {
case _, ok := <-bucket.Signal:
if !ok {
//the bucket was found but its routine is dead, get a new one and continue
bucket.logger.Tracef("Bucket %s found dead, cleanup the body", buckey)
buckets.Bucket_map.Delete(buckey)
sigclosed += 1
bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
if err != nil {
return false, err
}
continue
}
holder.logger.Tracef("Signal exists, try to pour :)")
default:
/*nothing to read, but not closed, try to pour */
holder.logger.Tracef("Signal exists but empty, try to pour :)")
}
/*let's see if this time-bucket should have expired */
if bucket.Mode == TIMEMACHINE {
bucket.mutex.Lock()
firstTs := bucket.First_ts
lastTs := bucket.Last_ts
bucket.mutex.Unlock()
if !firstTs.IsZero() {
var d time.Time
err = d.UnmarshalText([]byte(parsed.MarshaledTime))
if err != nil {
holder.logger.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err)
}
if d.After(lastTs.Add(bucket.Duration)) {
bucket.logger.Tracef("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, lastTs.Add(bucket.Duration))
buckets.Bucket_map.Delete(buckey)
//not sure about this, should we create a new one ?
sigclosed += 1
bucket, err = LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
if err != nil {
return false, err
}
continue
}
}
}
/*the bucket seems to be up & running*/
select {
case bucket.In <- parsed:
holder.logger.Tracef("Successfully sent !")
if BucketPourTrack {
if _, ok := BucketPourCache[bucket.Name]; !ok {
BucketPourCache[bucket.Name] = make([]types.Event, 0)
}
evt := deepcopy.Copy(parsed)
BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event))
}
sent = true
continue
default:
failed_sent += 1
holder.logger.Tracef("Failed to send, try again")
continue
}
}
holder.logger.Debugf("bucket '%s' is poured", holder.Name)
return sent, nil
}
func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder BucketFactory, expectMode int) (*Leaky, error) {
biface, ok := buckets.Bucket_map.Load(partitionKey)
/* the bucket doesn't exist, create it !*/
if !ok {
var fresh_bucket *Leaky
switch expectMode {
case TIMEMACHINE:
fresh_bucket = NewTimeMachine(holder)
holder.logger.Debugf("Creating TimeMachine bucket")
case LIVE:
fresh_bucket = NewLeaky(holder)
holder.logger.Debugf("Creating Live bucket")
default:
return nil, fmt.Errorf("input event has no expected mode : %+v", expectMode)
}
fresh_bucket.In = make(chan types.Event)
fresh_bucket.Mapkey = partitionKey
fresh_bucket.Signal = make(chan bool, 1)
actual, stored := buckets.Bucket_map.LoadOrStore(partitionKey, fresh_bucket)
if !stored {
holder.tomb.Go(func() error {
return LeakRoutine(fresh_bucket)
})
biface = fresh_bucket
//once the created goroutine is ready to process events, we can return it
<-fresh_bucket.Signal
} else {
holder.logger.Debugf("Unexpectedly found exisint bucket for %s", partitionKey)
biface = actual
}
holder.logger.Debugf("Created new bucket %s", partitionKey)
}
return biface.(*Leaky), nil
}
func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
var (
ok, condition, sent bool
err error
ok, condition, poured bool
)
//synchronize with DumpBucketsStateAt
//to track bucket pour : track items that enter the pour routine
if BucketPourTrack {
if BucketPourCache == nil {
BucketPourCache = make(map[string][]types.Event)
@ -173,8 +297,10 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
BucketPourCache["OK"] = append(BucketPourCache["OK"], evt.(types.Event))
}
//find the relevant holders (scenarios)
for idx, holder := range holders {
//evaluate bucket's condition
if holder.RunTimeFilter != nil {
holder.logger.Tracef("event against holder %d/%d", idx, len(holders))
output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
@ -197,7 +323,7 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
}
}
sent = false
//groupby determines the partition key for the specific bucket
var groupby string
if holder.RunTimeGroupBy != nil {
tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed}))
@ -213,119 +339,19 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
}
buckey := GetKey(holder, groupby)
sigclosed := 0
keymiss := 0
failed_sent := 0
attempts := 0
start := time.Now()
for !sent {
attempts += 1
/* Warn the user if we spent more than 100 ms pouring an event: the routine is most likely stuck on a lock */
if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) {
holder.logger.Warningf("stuck for %s sending event to %s (sigclosed:%d keymiss:%d failed_sent:%d attempts:%d)", time.Since(start),
buckey, sigclosed, keymiss, failed_sent, attempts)
}
biface, ok := buckets.Bucket_map.Load(buckey)
//biface, bigout
/* the bucket doesn't exist, create it !*/
if !ok {
/*
not found in map
*/
holder.logger.Debugf("Creating bucket %s", buckey)
keymiss += 1
var fresh_bucket *Leaky
switch parsed.ExpectMode {
case TIMEMACHINE:
fresh_bucket = NewTimeMachine(holder)
holder.logger.Debugf("Creating TimeMachine bucket")
case LIVE:
fresh_bucket = NewLeaky(holder)
holder.logger.Debugf("Creating Live bucket")
default:
holder.logger.Fatalf("input event has no expected mode, malformed : %+v", parsed)
}
fresh_bucket.In = make(chan types.Event)
fresh_bucket.Mapkey = buckey
fresh_bucket.Signal = make(chan bool, 1)
buckets.Bucket_map.Store(buckey, fresh_bucket)
holder.tomb.Go(func() error {
return LeakRoutine(fresh_bucket)
})
holder.logger.Debugf("Created new bucket %s", buckey)
//wait for signal to be opened
<-fresh_bucket.Signal
continue
}
bucket := biface.(*Leaky)
/* check if leak routine is up */
select {
case _, ok := <-bucket.Signal:
if !ok {
//it's closed, delete it
bucket.logger.Debugf("Bucket %s found dead, cleanup the body", buckey)
buckets.Bucket_map.Delete(buckey)
sigclosed += 1
continue
}
holder.logger.Tracef("Signal exists, try to pour :)")
default:
/*nothing to read, but not closed, try to pour */
holder.logger.Tracef("Signal exists but empty, try to pour :)")
}
/*let's see if this time-bucket should have expired */
if bucket.Mode == TIMEMACHINE {
bucket.mutex.Lock()
firstTs := bucket.First_ts
lastTs := bucket.Last_ts
bucket.mutex.Unlock()
if !firstTs.IsZero() {
var d time.Time
err = d.UnmarshalText([]byte(parsed.MarshaledTime))
if err != nil {
holder.logger.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err)
}
if d.After(lastTs.Add(bucket.Duration)) {
bucket.logger.Tracef("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, lastTs.Add(bucket.Duration))
buckets.Bucket_map.Delete(buckey)
continue
}
}
}
/*if we're here, let's try to pour */
select {
case bucket.In <- parsed:
holder.logger.Tracef("Successfully sent !")
//and track item poured to each bucket
if BucketPourTrack {
if _, ok := BucketPourCache[bucket.Name]; !ok {
BucketPourCache[bucket.Name] = make([]types.Event, 0)
}
evt := deepcopy.Copy(parsed)
BucketPourCache[bucket.Name] = append(BucketPourCache[bucket.Name], evt.(types.Event))
}
//sent was successful !
sent = true
continue
default:
failed_sent += 1
holder.logger.Tracef("Failed to send, try again")
continue
}
//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holder, parsed.ExpectMode)
if err != nil {
return false, errors.Wrap(err, "failed to load or store bucket")
}
//finally, pour the event into the bucket
ok, err := PourItemToBucket(bucket, holder, buckets, parsed)
if err != nil {
return false, errors.Wrap(err, "failed to pour bucket")
}
if ok {
poured = true
}
holder.logger.Debugf("bucket '%s' is poured", holder.Name)
}
return sent, nil
return poured, nil
}