From 7525f11975a0107746213862dc41c69e00122ac7 Mon Sep 17 00:00:00 2001 From: "Thibault \"bui\" Koechlin" Date: Wed, 5 Aug 2020 11:39:54 +0200 Subject: [PATCH] improve tests in pkg/leakybuckets (#171) --- pkg/leakybucket/bucket.go | 137 +-------- .../{manager.go => manager_load.go} | 269 ----------------- pkg/leakybucket/manager_load_test.go | 118 ++++++++ pkg/leakybucket/manager_run.go | 284 ++++++++++++++++++ pkg/leakybucket/manager_run_test.go | 125 ++++++++ pkg/leakybucket/overflows.go | 135 +++++++++ pkg/leakybucket/queue.go | 36 +-- .../tests/leaky-fixedqueue/bucket.yaml | 12 + .../tests/leaky-fixedqueue/scenarios.yaml | 2 + .../tests/leaky-fixedqueue/test.yaml | 51 ++++ .../tests/overflow-with-meta/bucket.yaml | 9 + .../tests/overflow-with-meta/scenarios.yaml | 2 + .../tests/overflow-with-meta/test.yaml | 39 +++ .../tests/simple-counter-bh/bucket.yaml | 11 + .../tests/simple-counter-bh/scenarios.yaml | 2 + .../tests/simple-counter-bh/test.yaml | 22 ++ .../tests/simple-counter-timeout/bucket.yaml | 10 + .../simple-counter-timeout/scenarios.yaml | 2 + .../tests/simple-counter-timeout/test.yaml | 18 ++ .../tests/simple-counter/bucket.yaml | 10 + .../tests/simple-counter/scenarios.yaml | 2 + .../tests/simple-counter/test.yaml | 23 ++ .../tests/simple-leaky-underflow/bucket.yaml | 4 +- .../tests/simple-leaky-underflow/test.yaml | 10 +- 24 files changed, 887 insertions(+), 446 deletions(-) rename pkg/leakybucket/{manager.go => manager_load.go} (60%) create mode 100644 pkg/leakybucket/manager_load_test.go create mode 100644 pkg/leakybucket/manager_run.go create mode 100644 pkg/leakybucket/manager_run_test.go create mode 100644 pkg/leakybucket/overflows.go create mode 100644 pkg/leakybucket/tests/leaky-fixedqueue/bucket.yaml create mode 100644 pkg/leakybucket/tests/leaky-fixedqueue/scenarios.yaml create mode 100644 pkg/leakybucket/tests/leaky-fixedqueue/test.yaml create mode 100644 pkg/leakybucket/tests/overflow-with-meta/bucket.yaml create mode 100644 pkg/leakybucket/tests/overflow-with-meta/scenarios.yaml create mode 100644 pkg/leakybucket/tests/overflow-with-meta/test.yaml create mode 100644 pkg/leakybucket/tests/simple-counter-bh/bucket.yaml create mode 100644 pkg/leakybucket/tests/simple-counter-bh/scenarios.yaml create mode 100644 pkg/leakybucket/tests/simple-counter-bh/test.yaml create mode 100644 pkg/leakybucket/tests/simple-counter-timeout/bucket.yaml create mode 100644 pkg/leakybucket/tests/simple-counter-timeout/scenarios.yaml create mode 100644 pkg/leakybucket/tests/simple-counter-timeout/test.yaml create mode 100644 pkg/leakybucket/tests/simple-counter/bucket.yaml create mode 100644 pkg/leakybucket/tests/simple-counter/scenarios.yaml create mode 100644 pkg/leakybucket/tests/simple-counter/test.yaml diff --git a/pkg/leakybucket/bucket.go b/pkg/leakybucket/bucket.go index a60f7097b..bcbe2f422 100644 --- a/pkg/leakybucket/bucket.go +++ b/pkg/leakybucket/bucket.go @@ -1,21 +1,17 @@ package leakybucket import ( - "encoding/json" - "fmt" - "net" - "strconv" "sync/atomic" "time" //"log" "github.com/crowdsecurity/crowdsec/pkg/time/rate" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/goombaio/namegenerator" //rate "time/rate" "github.com/davecgh/go-spew/spew" - "github.com/goombaio/namegenerator" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" //"golang.org/x/time/rate" @@ -106,6 +102,8 @@ var BucketsCurrentCount = prometheus.NewGaugeVec( []string{"name"}, ) +var LeakyRoutineCount int64 + // Newleaky creates a new leaky bucket from a BucketFactory // Events created by the bucket (overflow, bucket empty) are sent to a chan defined by BucketFactory // The leaky bucket implementation is based on rate limiter (see https://godoc.org/golang.org/x/time/rate) @@ -165,8 +163,6 @@ func FromFactory(g BucketFactory) *Leaky { return l } -var LeakyRoutineCount int64 - /* for now mimic a leak routine */ //LeakRoutine us the life of a bucket. It dies when the bucket underflows or overflows func LeakRoutine(l *Leaky) { @@ -221,6 +217,8 @@ func LeakRoutine(l *Leaky) { /*a kill chan to allow externally killing the leaky routines*/ case <-l.KillSwitch: close(l.Signal) + l.logger.Debugf("Bucket externally killed, return") + l.AllOut <- types.Event{Overflow: types.SignalOccurence{MapKey: l.Mapkey}, Type: types.OVFLW} return /*we overflowed*/ case ofw := <-l.Out: @@ -292,128 +290,3 @@ func Pour(l *Leaky, msg types.Event) { l.Out <- l.Queue } } - -func FormatOverflow(l *Leaky, queue *Queue) types.SignalOccurence { - var am string - - l.logger.Debugf("Overflow (start: %s, end: %s)", l.First_ts, l.Ovflw_ts) - - sig := types.SignalOccurence{ - Scenario: l.Name, - Bucket_id: l.Uuid, - Alert_message: am, - Start_at: l.First_ts, - Stop_at: l.Ovflw_ts, - Events_count: l.Total_count, - Capacity: l.Capacity, - Reprocess: l.Reprocess, - Leak_speed: l.Leakspeed, - MapKey: l.Mapkey, - Sources: make(map[string]types.Source), - Labels: l.BucketConfig.Labels, - } - - for _, evt := range queue.Queue { - //either it's a collection of logs, or a collection of past overflows being reprocessed. - //one overflow can have multiple sources for example - if evt.Type == types.LOG { - if _, ok := evt.Meta["source_ip"]; !ok { - continue - } - source_ip := evt.Meta["source_ip"] - if _, ok := sig.Sources[source_ip]; !ok { - src := types.Source{} - src.Ip = net.ParseIP(source_ip) - if v, ok := evt.Enriched["ASNNumber"]; ok { - src.AutonomousSystemNumber = v - } - if v, ok := evt.Enriched["IsoCode"]; ok { - src.Country = v - } - if v, ok := evt.Enriched["ASNOrg"]; ok { - src.AutonomousSystemOrganization = v - } - if v, ok := evt.Enriched["Latitude"]; ok { - src.Latitude, _ = strconv.ParseFloat(v, 32) - } - if v, ok := evt.Enriched["Longitude"]; ok { - src.Longitude, _ = strconv.ParseFloat(v, 32) - } - if v, ok := evt.Meta["SourceRange"]; ok { - _, ipNet, err := net.ParseCIDR(v) - if err != nil { - l.logger.Errorf("Declared range %s of %s can't be parsed", v, src.Ip.String()) - } else if ipNet != nil { - src.Range = *ipNet - l.logger.Tracef("Valid range from %s : %s", src.Ip.String(), src.Range.String()) - } - } - sig.Sources[source_ip] = src - if sig.Source == nil { - sig.Source = &src - sig.Source_ip = src.Ip.String() - sig.Source_AutonomousSystemNumber = src.AutonomousSystemNumber - sig.Source_AutonomousSystemOrganization = src.AutonomousSystemOrganization - sig.Source_Country = src.Country - sig.Source_range = src.Range.String() - sig.Source_Latitude = src.Latitude - sig.Source_Longitude = src.Longitude - } - } - } else if evt.Type == types.OVFLW { - for _, src := range evt.Overflow.Sources { - if _, ok := sig.Sources[src.Ip.String()]; !ok { - sig.Sources[src.Ip.String()] = src - if sig.Source == nil { - l.logger.Tracef("populating overflow with source : %+v", src) - src := src //src will be reused, copy before giving pointer - sig.Source = &src - sig.Source_ip = src.Ip.String() - sig.Source_AutonomousSystemNumber = src.AutonomousSystemNumber - sig.Source_AutonomousSystemOrganization = src.AutonomousSystemOrganization - sig.Source_Country = src.Country - sig.Source_range = src.Range.String() - sig.Source_Latitude = src.Latitude - sig.Source_Longitude = src.Longitude - } - } - - } - - } - - strret, err := json.Marshal(evt.Meta) - if err != nil { - l.logger.Errorf("failed to marshal ret : %v", err) - continue - } - if sig.Source != nil { - sig.Events_sequence = append(sig.Events_sequence, types.EventSequence{ - Source: *sig.Source, - Source_ip: sig.Source_ip, - Source_AutonomousSystemNumber: sig.Source.AutonomousSystemNumber, - Source_AutonomousSystemOrganization: sig.Source.AutonomousSystemOrganization, - Source_Country: sig.Source.Country, - Serialized: string(strret), - Time: l.First_ts}) - } else { - l.logger.Warningf("Event without source ?!") - } - } - - if len(sig.Sources) > 1 { - am = fmt.Sprintf("%d IPs", len(sig.Sources)) - } else if len(sig.Sources) == 1 { - if sig.Source != nil { - am = sig.Source.Ip.String() - } else { - am = "??" - } - } else { - am = "UNKNOWN" - } - - am += fmt.Sprintf(" performed '%s' (%d events over %s) at %s", l.Name, l.Total_count, l.Ovflw_ts.Sub(l.First_ts), l.Ovflw_ts) - sig.Alert_message = am - return sig -} diff --git a/pkg/leakybucket/manager.go b/pkg/leakybucket/manager_load.go similarity index 60% rename from pkg/leakybucket/manager.go rename to pkg/leakybucket/manager_load.go index 7a8f2a5a6..9086a8e7e 100644 --- a/pkg/leakybucket/manager.go +++ b/pkg/leakybucket/manager_load.go @@ -2,11 +2,9 @@ package leakybucket import ( "encoding/json" - "errors" "fmt" "io" "io/ioutil" - "math" "os" "path/filepath" "strings" @@ -16,7 +14,6 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/davecgh/go-spew/spew" - "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" @@ -364,269 +361,3 @@ func LoadBucketsState(file string, buckets *Buckets, holders []BucketFactory) er return nil } - -var serialized map[string]Leaky - -/*The leaky routines lifecycle are based on "real" time. -But when we are running in time-machine mode, the reference time is in logs and not "real" time. -Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/ -func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error { - total := 0 - discard := 0 - toflush := []string{} - buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool { - key := rkey.(string) - val := rvalue.(*Leaky) - total += 1 - //bucket already overflowed, we can kill it - if !val.Ovflw_ts.IsZero() { - discard += 1 - val.logger.Debugf("overflowed at %s.", val.Ovflw_ts) - toflush = append(toflush, key) - val.KillSwitch <- true - return true - } - /*FIXME : sometimes the gettokenscountat has some rounding issues when we try to - match it with bucket capacity, even if the bucket has long due underflow. Round to 2 decimals*/ - tokat := val.Limiter.GetTokensCountAt(deadline) - tokcapa := float64(val.Capacity) - tokat = math.Round(tokat*100) / 100 - tokcapa = math.Round(tokcapa*100) / 100 - //bucket actually underflowed based on log time, but no in real time - if tokat >= tokcapa { - BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc() - val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capcity:%f", val.First_ts, tokat, tokcapa) - toflush = append(toflush, key) - val.KillSwitch <- true - return true - } else { - val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa) - } - if _, ok := serialized[key]; ok { - log.Errorf("entry %s already exists", key) - return false - } else { - log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey) - } - return true - }) - log.Infof("Cleaned %d buckets", len(toflush)) - for _, flushkey := range toflush { - buckets.Bucket_map.Delete(flushkey) - } - return nil -} - -func DumpBucketsStateAt(deadline time.Time, buckets *Buckets) (string, error) { - //var file string - tmpFd, err := ioutil.TempFile(os.TempDir(), "crowdsec-buckets-dump-") - if err != nil { - return "", fmt.Errorf("failed to create temp file : %s", err) - } - defer tmpFd.Close() - tmpFileName := tmpFd.Name() - serialized = make(map[string]Leaky) - log.Printf("Dumping buckets state at %s", deadline) - total := 0 - discard := 0 - buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool { - key := rkey.(string) - val := rvalue.(*Leaky) - total += 1 - if !val.Ovflw_ts.IsZero() { - discard += 1 - val.logger.Debugf("overflowed at %s.", val.Ovflw_ts) - return true - } - /*FIXME : sometimes the gettokenscountat has some rounding issues when we try to - match it with bucket capacity, even if the bucket has long due underflow. Round to 2 decimals*/ - tokat := val.Limiter.GetTokensCountAt(deadline) - tokcapa := float64(val.Capacity) - tokat = math.Round(tokat*100) / 100 - tokcapa = math.Round(tokcapa*100) / 100 - - if tokat >= tokcapa { - BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc() - val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capcity:%f", val.First_ts, tokat, tokcapa) - discard += 1 - return true - } else { - val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa) - } - if _, ok := serialized[key]; ok { - log.Errorf("entry %s already exists", key) - return false - } else { - log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey) - } - val.SerializedState = val.Limiter.Dump() - serialized[key] = *val - return true - }) - bbuckets, err := json.MarshalIndent(serialized, "", " ") - if err != nil { - log.Fatalf("Failed to unmarshal buckets : %s", err) - } - size, err := tmpFd.Write(bbuckets) - if err != nil { - return "", fmt.Errorf("failed to write temp file : %s", err) - } - log.Infof("Serialized %d live buckets (+%d expired) in %d bytes to %s", len(serialized), discard, size, tmpFd.Name()) - serialized = nil - return tmpFileName, nil -} - -func ShutdownAllBuckets(buckets *Buckets) error { - buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool { - key := rkey.(string) - val := rvalue.(*Leaky) - val.KillSwitch <- true - log.Infof("killed %s", key) - return true - }) - return nil -} - -func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) { - var ( - ok, condition, sent bool - err error - ) - - for idx, holder := range holders { - - if holder.RunTimeFilter != nil { - log.Debugf("event against holder %d/%d", idx, len(holders)) - output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})) - if err != nil { - holder.logger.Errorf("failed parsing : %v", err) - return false, fmt.Errorf("leaky failed : %s", err) - } - // we assume we a bool should add type check here - if condition, ok = output.(bool); !ok { - holder.logger.Errorf("unexpected non-bool return : %T", output) - log.Fatalf("Filter issue") - } - - if holder.Debug { - holder.ExprDebugger.Run(holder.logger, condition, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})) - } - if !condition { - holder.logger.Debugf("Event leaving node : ko") - continue - } - } - - sent = false - var groupby string - if holder.RunTimeGroupBy != nil { - tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})) - if err != nil { - log.Errorf("failed groupby : %v", err) - return false, errors.New("leaky failed :/") - } - - if groupby, ok = tmpGroupBy.(string); !ok { - log.Fatalf("failed groupby type : %v", err) - return false, errors.New("groupby wrong type") - } - } - buckey := GetKey(holder, groupby) - - sigclosed := 0 - keymiss := 0 - failed_sent := 0 - attempts := 0 - start := time.Now() - for !sent { - attempts += 1 - /* Warn the user if we used more than a 100 ms to pour an event, it's at least an half lock*/ - if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) { - log.Warningf("stuck for %s sending event to %s (sigclosed:%d keymiss:%d failed_sent:%d attempts:%d)", time.Since(start), - buckey, sigclosed, keymiss, failed_sent, attempts) - } - biface, ok := buckets.Bucket_map.Load(buckey) - //biface, bigout - /* the bucket doesn't exist, create it !*/ - if !ok { - /* - not found in map - */ - - log.Debugf("Creating bucket %s", buckey) - keymiss += 1 - var fresh_bucket *Leaky - - switch parsed.ExpectMode { - case TIMEMACHINE: - fresh_bucket = NewTimeMachine(holder) - holder.logger.Debugf("Creating TimeMachine bucket") - case LIVE: - fresh_bucket = NewLeaky(holder) - holder.logger.Debugf("Creating Live bucket") - default: - log.Fatalf("input event has no expected mode, malformed : %+v", parsed) - } - fresh_bucket.In = make(chan types.Event) - fresh_bucket.Mapkey = buckey - fresh_bucket.Signal = make(chan bool, 1) - fresh_bucket.KillSwitch = make(chan bool, 1) - buckets.Bucket_map.Store(buckey, fresh_bucket) - go LeakRoutine(fresh_bucket) - log.Debugf("Created new bucket %s", buckey) - //wait for signal to be opened - <-fresh_bucket.Signal - continue - } - - bucket := biface.(*Leaky) - /* check if leak routine is up */ - select { - case _, ok := <-bucket.Signal: - if !ok { - //it's closed, delete it - bucket.logger.Debugf("Bucket %s found dead, cleanup the body", buckey) - buckets.Bucket_map.Delete(buckey) - sigclosed += 1 - continue - } - log.Debugf("Signal exists, try to pour :)") - - default: - /*nothing to read, but not closed, try to pour */ - log.Debugf("Signal exists but empty, try to pour :)") - - } - /*let's see if this time-bucket should have expired */ - if bucket.Mode == TIMEMACHINE && !bucket.First_ts.IsZero() { - var d time.Time - err = d.UnmarshalText([]byte(parsed.MarshaledTime)) - if err != nil { - log.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err) - } - if d.After(bucket.Last_ts.Add(bucket.Duration)) { - bucket.logger.Debugf("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, bucket.Last_ts.Add(bucket.Duration)) - buckets.Bucket_map.Delete(buckey) - continue - } - } - /*if we're here, let's try to pour */ - - select { - case bucket.In <- parsed: - log.Debugf("Successfully sent !") - //sent was successful ! - sent = true - continue - default: - failed_sent += 1 - log.Debugf("Failed to send, try again") - continue - - } - } - - log.Debugf("bucket '%s' is poured", holder.Name) - } - return sent, nil -} diff --git a/pkg/leakybucket/manager_load_test.go b/pkg/leakybucket/manager_load_test.go new file mode 100644 index 000000000..861679673 --- /dev/null +++ b/pkg/leakybucket/manager_load_test.go @@ -0,0 +1,118 @@ +package leakybucket + +import ( + "fmt" + "testing" +) + +type cfgTest struct { + cfg BucketFactory + loadable bool + valid bool +} + +func runTest(tests []cfgTest) error { + for idx, cfg := range tests { + err := LoadBucket(&cfg.cfg, ".") + if cfg.loadable && err != nil { + return fmt.Errorf("expected loadable result (%d/%d), got: %s", idx+1, len(tests), err) + } + if !cfg.loadable && err == nil { + return fmt.Errorf("expected unloadable result (%d/%d)", idx+1, len(tests)) + } + err = ValidateFactory(&cfg.cfg) + if cfg.valid && err != nil { + return fmt.Errorf("expected valid result (%d/%d), got: %s", idx+1, len(tests), err) + } + if !cfg.valid && err == nil { + return fmt.Errorf("expected invalid result (%d/%d)", idx+1, len(tests)) + } + } + return nil +} + +func TestBadBucketsConfig(t *testing.T) { + var CfgTests = []cfgTest{ + //empty + {BucketFactory{}, false, false}, + //missing description + {BucketFactory{Name: "test"}, false, false}, + //missing type + {BucketFactory{Name: "test", Description: "test1"}, false, false}, + //bad type + {BucketFactory{Name: "test", Description: "test1", Type: "ratata"}, false, false}, + } + if err := runTest(CfgTests); err != nil { + t.Fatalf("%s", err) + } +} + +func TestLeakyBucketsConfig(t *testing.T) { + var CfgTests = []cfgTest{ + //leaky with bad capacity + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 0}, false, false}, + //leaky with empty leakspeed + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 1}, false, false}, + //leaky with missing filter + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 1, LeakSpeed: "1s"}, false, true}, + //leaky with invalid leakspeed + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 1, LeakSpeed: "abs", Filter: "true"}, false, false}, + //leaky with valid filter + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 1, LeakSpeed: "1s", Filter: "true"}, true, true}, + //leaky with invalid filter + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 1, LeakSpeed: "1s", Filter: "xu"}, false, true}, + //leaky with valid filter + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 1, LeakSpeed: "1s", Filter: "true"}, true, true}, + //leaky with bad overflow filter + {BucketFactory{Name: "test", Description: "test1", Type: "leaky", Capacity: 1, LeakSpeed: "1s", Filter: "true", OverflowFilter: "xu"}, false, true}, + } + + if err := runTest(CfgTests); err != nil { + t.Fatalf("%s", err) + } + +} + +func TestBlackholeConfig(t *testing.T) { + var CfgTests = []cfgTest{ + //basic bh + {BucketFactory{Name: "test", Description: "test1", Type: "trigger", Filter: "true", Blackhole: "15s"}, true, true}, + //bad bh + {BucketFactory{Name: "test", Description: "test1", Type: "trigger", Filter: "true", Blackhole: "abc"}, false, true}, + } + + if err := runTest(CfgTests); err != nil { + t.Fatalf("%s", err) + } + +} + +func TestTriggerBucketsConfig(t *testing.T) { + var CfgTests = []cfgTest{ + //basic valid counter + {BucketFactory{Name: "test", Description: "test1", Type: "trigger", Filter: "true"}, true, true}, + } + + if err := runTest(CfgTests); err != nil { + t.Fatalf("%s", err) + } + +} + +func TestCounterBucketsConfig(t *testing.T) { + var CfgTests = []cfgTest{ + + //basic valid counter + {BucketFactory{Name: "test", Description: "test1", Type: "counter", Capacity: -1, Duration: "5s", Filter: "true"}, true, true}, + //missing duration + {BucketFactory{Name: "test", Description: "test1", Type: "counter", Capacity: -1, Filter: "true"}, false, false}, + //bad duration + {BucketFactory{Name: "test", Description: "test1", Type: "counter", Capacity: -1, Duration: "abc", Filter: "true"}, false, false}, + //capacity must be -1 + {BucketFactory{Name: "test", Description: "test1", Type: "counter", Capacity: 0, Duration: "5s", Filter: "true"}, false, false}, + } + if err := runTest(CfgTests); err != nil { + t.Fatalf("%s", err) + } + +} diff --git a/pkg/leakybucket/manager_run.go b/pkg/leakybucket/manager_run.go new file mode 100644 index 000000000..900da2769 --- /dev/null +++ b/pkg/leakybucket/manager_run.go @@ -0,0 +1,284 @@ +package leakybucket + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math" + "os" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/antonmedv/expr" + "github.com/crowdsecurity/crowdsec/pkg/exprhelpers" + "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/prometheus/client_golang/prometheus" +) + +var serialized map[string]Leaky + +/*The leaky routines lifecycle are based on "real" time. +But when we are running in time-machine mode, the reference time is in logs and not "real" time. +Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/ +func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error { + total := 0 + discard := 0 + toflush := []string{} + buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool { + key := rkey.(string) + val := rvalue.(*Leaky) + total += 1 + //bucket already overflowed, we can kill it + if !val.Ovflw_ts.IsZero() { + discard += 1 + val.logger.Debugf("overflowed at %s.", val.Ovflw_ts) + toflush = append(toflush, key) + val.KillSwitch <- true + return true + } + /*FIXME : sometimes the gettokenscountat has some rounding issues when we try to + match it with bucket capacity, even if the bucket has long due underflow. Round to 2 decimals*/ + tokat := val.Limiter.GetTokensCountAt(deadline) + tokcapa := float64(val.Capacity) + tokat = math.Round(tokat*100) / 100 + tokcapa = math.Round(tokcapa*100) / 100 + //bucket actually underflowed based on log time, but no in real time + if tokat >= tokcapa { + BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc() + val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capcity:%f", val.First_ts, tokat, tokcapa) + toflush = append(toflush, key) + val.KillSwitch <- true + return true + } else { + val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa) + } + if _, ok := serialized[key]; ok { + log.Errorf("entry %s already exists", key) + return false + } else { + log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey) + } + return true + }) + log.Infof("Cleaned %d buckets", len(toflush)) + for _, flushkey := range toflush { + buckets.Bucket_map.Delete(flushkey) + } + return nil +} + +func DumpBucketsStateAt(deadline time.Time, buckets *Buckets) (string, error) { + //var file string + tmpFd, err := ioutil.TempFile(os.TempDir(), "crowdsec-buckets-dump-") + if err != nil { + return "", fmt.Errorf("failed to create temp file : %s", err) + } + defer tmpFd.Close() + tmpFileName := tmpFd.Name() + serialized = make(map[string]Leaky) + log.Printf("Dumping buckets state at %s", deadline) + total := 0 + discard := 0 + buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool { + key := rkey.(string) + val := rvalue.(*Leaky) + total += 1 + if !val.Ovflw_ts.IsZero() { + discard += 1 + val.logger.Debugf("overflowed at %s.", val.Ovflw_ts) + return true + } + /*FIXME : sometimes the gettokenscountat has some rounding issues when we try to + match it with bucket capacity, even if the bucket has long due underflow. Round to 2 decimals*/ + tokat := val.Limiter.GetTokensCountAt(deadline) + tokcapa := float64(val.Capacity) + tokat = math.Round(tokat*100) / 100 + tokcapa = math.Round(tokcapa*100) / 100 + + if tokat >= tokcapa { + BucketsUnderflow.With(prometheus.Labels{"name": val.Name}).Inc() + val.logger.Debugf("UNDERFLOW : first_ts:%s tokens_at:%f capcity:%f", val.First_ts, tokat, tokcapa) + discard += 1 + return true + } else { + val.logger.Debugf("(%s) not dead, count:%f capacity:%f", val.First_ts, tokat, tokcapa) + } + if _, ok := serialized[key]; ok { + log.Errorf("entry %s already exists", key) + return false + } else { + log.Debugf("serialize %s of %s : %s", val.Name, val.Uuid, val.Mapkey) + } + val.SerializedState = val.Limiter.Dump() + serialized[key] = *val + return true + }) + bbuckets, err := json.MarshalIndent(serialized, "", " ") + if err != nil { + log.Fatalf("Failed to unmarshal buckets : %s", err) + } + size, err := tmpFd.Write(bbuckets) + if err != nil { + return "", fmt.Errorf("failed to write temp file : %s", err) + } + log.Infof("Serialized %d live buckets (+%d expired) in %d bytes to %s", len(serialized), discard, size, tmpFd.Name()) + serialized = nil + return tmpFileName, nil +} + +func ShutdownAllBuckets(buckets *Buckets) error { + buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool { + key := rkey.(string) + val := rvalue.(*Leaky) + val.KillSwitch <- true + log.Infof("killed %s", key) + return true + }) + return nil +} + +func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error) { + var ( + ok, condition, sent bool + err error + ) + + for idx, holder := range holders { + + if holder.RunTimeFilter != nil { + log.Debugf("event against holder %d/%d", idx, len(holders)) + output, err := expr.Run(holder.RunTimeFilter, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})) + if err != nil { + holder.logger.Errorf("failed parsing : %v", err) + return false, fmt.Errorf("leaky failed : %s", err) + } + // we assume we a bool should add type check here + if condition, ok = output.(bool); !ok { + holder.logger.Errorf("unexpected non-bool return : %T", output) + log.Fatalf("Filter issue") + } + if !condition { + holder.logger.Debugf("eval(FALSE) %s", holder.Filter) + //log.Debugf("%s -> FALSE", holder.Filter) + //holder.logger.Debugf("Filter eval failed") + continue + } else { + holder.logger.Debugf("eval(TRUE) %s", holder.Filter) + } + } + + sent = false + var groupby string + if holder.RunTimeGroupBy != nil { + tmpGroupBy, err := expr.Run(holder.RunTimeGroupBy, exprhelpers.GetExprEnv(map[string]interface{}{"evt": &parsed})) + if err != nil { + log.Errorf("failed groupby : %v", err) + return false, errors.New("leaky failed :/") + } + + if groupby, ok = tmpGroupBy.(string); !ok { + log.Fatalf("failed groupby type : %v", err) + return false, errors.New("groupby wrong type") + } + } + buckey := GetKey(holder, groupby) + + sigclosed := 0 + keymiss := 0 + failed_sent := 0 + attempts := 0 + start := time.Now() + for !sent { + attempts += 1 + /* Warn the user if we used more than a 100 ms to pour an event, it's at least an half lock*/ + if attempts%100000 == 0 && start.Add(100*time.Millisecond).Before(time.Now()) { + log.Warningf("stuck for %s sending event to %s (sigclosed:%d keymiss:%d failed_sent:%d attempts:%d)", time.Since(start), + buckey, sigclosed, keymiss, failed_sent, attempts) + } + biface, ok := buckets.Bucket_map.Load(buckey) + //biface, bigout + /* the bucket doesn't exist, create it !*/ + if !ok { + /* + not found in map + */ + + log.Debugf("Creating bucket %s", buckey) + keymiss += 1 + var fresh_bucket *Leaky + + switch parsed.ExpectMode { + case TIMEMACHINE: + fresh_bucket = NewTimeMachine(holder) + holder.logger.Debugf("Creating TimeMachine bucket") + case LIVE: + fresh_bucket = NewLeaky(holder) + holder.logger.Debugf("Creating Live bucket") + default: + log.Fatalf("input event has no expected mode, malformed : %+v", parsed) + } + fresh_bucket.In = make(chan types.Event) + fresh_bucket.Mapkey = buckey + fresh_bucket.Signal = make(chan bool, 1) + fresh_bucket.KillSwitch = make(chan bool, 1) + buckets.Bucket_map.Store(buckey, fresh_bucket) + go LeakRoutine(fresh_bucket) + log.Debugf("Created new bucket %s", buckey) + //wait for signal to be opened + <-fresh_bucket.Signal + continue + } + + bucket := biface.(*Leaky) + /* check if leak routine is up */ + select { + case _, ok := <-bucket.Signal: + if !ok { + //it's closed, delete it + bucket.logger.Debugf("Bucket %s found dead, cleanup the body", buckey) + buckets.Bucket_map.Delete(buckey) + sigclosed += 1 + continue + } + log.Debugf("Signal exists, try to pour :)") + + default: + /*nothing to read, but not closed, try to pour */ + log.Debugf("Signal exists but empty, try to pour :)") + + } + /*let's see if this time-bucket should have expired */ + if bucket.Mode == TIMEMACHINE && !bucket.First_ts.IsZero() { + var d time.Time + err = d.UnmarshalText([]byte(parsed.MarshaledTime)) + if err != nil { + log.Warningf("Failed unmarshaling event time (%s) : %v", parsed.MarshaledTime, err) + } + if d.After(bucket.Last_ts.Add(bucket.Duration)) { + bucket.logger.Debugf("bucket is expired (curr event: %s, bucket deadline: %s), kill", d, bucket.Last_ts.Add(bucket.Duration)) + buckets.Bucket_map.Delete(buckey) + continue + } + } + /*if we're here, let's try to pour */ + + select { + case bucket.In <- parsed: + log.Debugf("Successfully sent !") + //sent was successful ! + sent = true + continue + default: + failed_sent += 1 + log.Debugf("Failed to send, try again") + continue + + } + } + + log.Debugf("bucket '%s' is poured", holder.Name) + } + return sent, nil +} diff --git a/pkg/leakybucket/manager_run_test.go b/pkg/leakybucket/manager_run_test.go new file mode 100644 index 000000000..e5b01de2d --- /dev/null +++ b/pkg/leakybucket/manager_run_test.go @@ -0,0 +1,125 @@ +package leakybucket + +import ( + "fmt" + "testing" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/types" + log "github.com/sirupsen/logrus" +) + +func expectBucketCount(buckets *Buckets, expected int) error { + count := 0 + buckets.Bucket_map.Range(func(rkey, rvalue interface{}) bool { + count++ + return true + }) + if count != expected { + return fmt.Errorf("expected %d live buckets, got %d", expected, count) + } + return nil + +} + +func TestGCandDump(t *testing.T) { + var buckets *Buckets = NewBuckets() + + var Holders = []BucketFactory{ + //one overflowing soon + bh + BucketFactory{Name: "test_counter_fast", Description: "test_counter_fast", Debug: true, Type: "counter", Capacity: -1, Duration: "0.5s", Blackhole: "1m", Filter: "true"}, + //one long counter + BucketFactory{Name: "test_counter_slow", Description: "test_counter_slow", Debug: true, Type: "counter", Capacity: -1, Duration: "10m", Filter: "true"}, + //slow leaky + BucketFactory{Name: "test_leaky_slow", Description: "test_leaky_slow", Debug: true, Type: "leaky", Capacity: 5, LeakSpeed: "10m", Filter: "true"}, + } + + for idx := range Holders { + if err := LoadBucket(&Holders[idx], "."); err != nil { + t.Fatalf("while loading (%d/%d): %s", idx, len(Holders), err) + } + if err := ValidateFactory(&Holders[idx]); err != nil { + t.Fatalf("while validating (%d/%d): %s", idx, len(Holders), err) + } + } + + log.Printf("Pouring to bucket") + + var in = types.Event{Parsed: map[string]string{"something": "something"}} + //pour an item that will go to leaky + counter + ok, err := PourItemToHolders(in, Holders, buckets) + if err != nil { + t.Fatalf("while pouring item : %s", err) + } + if !ok { + t.Fatalf("didn't pour item") + } + + time.Sleep(2 * time.Second) + + if err := expectBucketCount(buckets, 3); err != nil { + t.Fatal(err) + } + log.Printf("Bucket GC") + + //call garbage collector + if err := GarbageCollectBuckets(time.Now(), buckets); err != nil { + t.Fatalf("failed to garbage collect buckets : %s", err) + } + + if err := expectBucketCount(buckets, 1); err != nil { + t.Fatal(err) + } + + log.Printf("Dumping buckets state") + //dump remaining buckets + if _, err := DumpBucketsStateAt(time.Now(), buckets); err != nil { + t.Fatalf("failed to dump buckets : %s", err) + } +} + +func TestBucketsShutdown(t *testing.T) { + var buckets *Buckets = NewBuckets() + + var Holders = []BucketFactory{ + //one long counter + BucketFactory{Name: "test_counter_slow", Description: "test_counter_slow", Debug: true, Type: "counter", Capacity: -1, Duration: "10m", Filter: "true"}, + //slow leaky + BucketFactory{Name: "test_leaky_slow", Description: "test_leaky_slow", Debug: true, Type: "leaky", Capacity: 5, LeakSpeed: "10m", Filter: "true"}, + } + + for idx := range Holders { + if err := LoadBucket(&Holders[idx], "."); err != nil { + t.Fatalf("while loading (%d/%d): %s", idx, len(Holders), err) + } + if err := ValidateFactory(&Holders[idx]); err != nil { + t.Fatalf("while validating (%d/%d): %s", idx, len(Holders), err) + } + } + + log.Printf("Pouring to bucket") + + var in = types.Event{Parsed: map[string]string{"something": "something"}} + //pour an item that will go to leaky + counter + ok, err := PourItemToHolders(in, Holders, buckets) + if err != nil { + t.Fatalf("while pouring item : %s", err) + } + if !ok { + t.Fatalf("didn't pour item") + } + + time.Sleep(1 * time.Second) + + if err := expectBucketCount(buckets, 2); err != nil { + t.Fatal(err) + } + if err := ShutdownAllBuckets(buckets); err != nil { + t.Fatalf("while shuting down buckets : %s", err) + } + time.Sleep(2 * time.Second) + if err := expectBucketCount(buckets, 2); err != nil { + t.Fatal(err) + } + +} diff --git a/pkg/leakybucket/overflows.go b/pkg/leakybucket/overflows.go new file mode 100644 index 000000000..f90114626 --- /dev/null +++ b/pkg/leakybucket/overflows.go @@ -0,0 +1,135 @@ +package leakybucket + +import ( + "encoding/json" + "fmt" + "net" + "strconv" + + "github.com/crowdsecurity/crowdsec/pkg/types" +) + +func FormatOverflow(l *Leaky, queue *Queue) types.SignalOccurence { + var am string + + l.logger.Debugf("Overflow (start: %s, end: %s)", l.First_ts, l.Ovflw_ts) + + sig := types.SignalOccurence{ + Scenario: l.Name, + Bucket_id: l.Uuid, + Alert_message: am, + Start_at: l.First_ts, + Stop_at: l.Ovflw_ts, + Events_count: l.Total_count, + Capacity: l.Capacity, + Reprocess: l.Reprocess, + Leak_speed: l.Leakspeed, + MapKey: l.Mapkey, + Sources: make(map[string]types.Source), + Labels: l.BucketConfig.Labels, + } + + for _, evt := range queue.Queue { + //either it's a collection of logs, or a collection of past overflows being reprocessed. + //one overflow can have multiple sources for example + if evt.Type == types.LOG { + if _, ok := evt.Meta["source_ip"]; !ok { + continue + } + source_ip := evt.Meta["source_ip"] + if _, ok := sig.Sources[source_ip]; !ok { + src := types.Source{} + src.Ip = net.ParseIP(source_ip) + if v, ok := evt.Enriched["ASNNumber"]; ok { + src.AutonomousSystemNumber = v + } + if v, ok := evt.Enriched["IsoCode"]; ok { + src.Country = v + } + if v, ok := evt.Enriched["ASNOrg"]; ok { + src.AutonomousSystemOrganization = v + } + if v, ok := evt.Enriched["Latitude"]; ok { + src.Latitude, _ = strconv.ParseFloat(v, 32) + } + if v, ok := evt.Enriched["Longitude"]; ok { + src.Longitude, _ = strconv.ParseFloat(v, 32) + } + if v, ok := evt.Meta["SourceRange"]; ok { + _, ipNet, err := net.ParseCIDR(v) + if err != nil { + l.logger.Errorf("Declared range %s of %s can't be parsed", v, src.Ip.String()) + } else if ipNet != nil { + src.Range = *ipNet + l.logger.Tracef("Valid range from %s : %s", src.Ip.String(), src.Range.String()) + } + } + sig.Sources[source_ip] = src + if sig.Source == nil { + sig.Source = &src + sig.Source_ip = src.Ip.String() + sig.Source_AutonomousSystemNumber = src.AutonomousSystemNumber + sig.Source_AutonomousSystemOrganization = src.AutonomousSystemOrganization + sig.Source_Country = src.Country + sig.Source_range = src.Range.String() + sig.Source_Latitude = src.Latitude + sig.Source_Longitude = src.Longitude + } + } + } else if evt.Type == types.OVFLW { + for _, src := range evt.Overflow.Sources { + if _, ok := sig.Sources[src.Ip.String()]; !ok { + sig.Sources[src.Ip.String()] = src + if sig.Source == nil { + l.logger.Tracef("populating overflow with source : %+v", src) + src := src //src will be reused, copy before giving pointer + sig.Source = &src + sig.Source_ip = src.Ip.String() + sig.Source_AutonomousSystemNumber = src.AutonomousSystemNumber + sig.Source_AutonomousSystemOrganization = src.AutonomousSystemOrganization + sig.Source_Country = src.Country + sig.Source_range = src.Range.String() + sig.Source_Latitude = src.Latitude + sig.Source_Longitude = src.Longitude + } + } + + } + + } + + strret, err := json.Marshal(evt.Meta) + if err != nil { + l.logger.Errorf("failed to marshal ret : %v", err) + continue + } + if sig.Source != nil { + sig.Events_sequence = append(sig.Events_sequence, types.EventSequence{ + Source: *sig.Source, + Source_ip: sig.Source_ip, + Source_AutonomousSystemNumber: sig.Source.AutonomousSystemNumber, + Source_AutonomousSystemOrganization: sig.Source.AutonomousSystemOrganization, + Source_Country: sig.Source.Country, + Serialized: string(strret), + Time: l.First_ts}) + } else { + l.logger.Warningf("Event without source ?!") + } + } + + if len(sig.Sources) > 1 { + am = fmt.Sprintf("%d IPs", len(sig.Sources)) + } else if len(sig.Sources) == 1 { + if sig.Source != nil { + am = sig.Source.Ip.String() + } else { + am = "??" + } + } else { + am = "UNKNOWN" + } + + am += fmt.Sprintf(" performed '%s' (%d events over %s) at %s", l.Name, l.Total_count, l.Ovflw_ts.Sub(l.First_ts), l.Ovflw_ts) + sig.Alert_message = am + return sig +} diff --git a/pkg/leakybucket/queue.go b/pkg/leakybucket/queue.go index 9bbe04462..03130b71f 100644 --- a/pkg/leakybucket/queue.go +++ b/pkg/leakybucket/queue.go @@ -1,15 +1,11 @@ package leakybucket import ( - "reflect" - "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" ) -//A very simple queue mechanism to hold track of the objects in the bucket - -// Queue is a simple struct that holds a limited size queue +// Queue holds a limited size queue type Queue struct { Queue []types.Event L int //capacity @@ -40,37 +36,7 @@ func (q *Queue) Add(m types.Event) { q.Queue = append(q.Queue, m) } -//Remove removes and return the last element of the queue -func (q *Queue) Remove() *types.Event { - if len(q.Queue) > 0 { - var dropped types.Event = q.Queue[0] - q.Queue = q.Queue[1:] - return &dropped - } - return nil -} - // GetQueue returns the entire queue func (q *Queue) GetQueue() []types.Event { return q.Queue } - -// In test if evt is in the queue -func (q *Queue) In(evt types.Event) bool { - for _, element := range q.Queue { - if reflect.DeepEqual(element, evt) { - return true - } - } - return false -} - -// Len gives de the Len of queue -func (q *Queue) Len() int { - return len(q.Queue) -} - -// Size gives de the Size of queue -func (q *Queue) Size() int { - return q.L -} diff --git a/pkg/leakybucket/tests/leaky-fixedqueue/bucket.yaml b/pkg/leakybucket/tests/leaky-fixedqueue/bucket.yaml new file mode 100644 index 000000000..fae9e5fb2 --- /dev/null +++ b/pkg/leakybucket/tests/leaky-fixedqueue/bucket.yaml @@ -0,0 +1,12 @@ +type: leaky +debug: true +name: test/simple-leaky +description: "Simple leaky" +filter: "evt.Line.Labels.type =='testlog'" +leakspeed: "10s" +capacity: 5 +cache_size: 3 +groupby: evt.Meta.source_ip +labels: + type: overflow_1 + diff --git a/pkg/leakybucket/tests/leaky-fixedqueue/scenarios.yaml b/pkg/leakybucket/tests/leaky-fixedqueue/scenarios.yaml new file mode 100644 index 000000000..f45f7be12 --- /dev/null +++ b/pkg/leakybucket/tests/leaky-fixedqueue/scenarios.yaml @@ -0,0 +1,2 @@ + - filename: {{.TestDirectory}}/bucket.yaml + diff --git a/pkg/leakybucket/tests/leaky-fixedqueue/test.yaml b/pkg/leakybucket/tests/leaky-fixedqueue/test.yaml new file mode 100644 index 000000000..93097dac3 --- /dev/null +++ b/pkg/leakybucket/tests/leaky-fixedqueue/test.yaml @@ -0,0 +1,51 @@ +#this one will trigger a simple overflow +lines: + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE2 trailing stuff + MarshaledTime: 2020-01-01T10:00:05Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE3 trailing stuff + MarshaledTime: 2020-01-01T10:00:05Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE4 trailing stuff + MarshaledTime: 2020-01-01T10:00:05Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE5 trailing stuff + MarshaledTime: 2020-01-01T10:00:05Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE6 trailing stuff + MarshaledTime: 2020-01-01T10:00:05Z + Meta: + source_ip: 1.2.3.4 +results: + - Overflow: + scenario: test/simple-leaky + Source_ip: 1.2.3.4 + Events_count: 6 + + diff --git a/pkg/leakybucket/tests/overflow-with-meta/bucket.yaml b/pkg/leakybucket/tests/overflow-with-meta/bucket.yaml new file mode 100644 index 000000000..378aff887 --- /dev/null +++ b/pkg/leakybucket/tests/overflow-with-meta/bucket.yaml @@ -0,0 +1,9 @@ +# ssh bruteforce +type: trigger +debug: true +name: test/simple-trigger +description: "Simple trigger" +filter: "evt.Line.Labels.type =='testlog'" +labels: + type: overflow_1 + diff --git a/pkg/leakybucket/tests/overflow-with-meta/scenarios.yaml b/pkg/leakybucket/tests/overflow-with-meta/scenarios.yaml new file mode 100644 index 000000000..f45f7be12 --- /dev/null +++ b/pkg/leakybucket/tests/overflow-with-meta/scenarios.yaml @@ -0,0 +1,2 @@ + - filename: {{.TestDirectory}}/bucket.yaml + diff --git a/pkg/leakybucket/tests/overflow-with-meta/test.yaml b/pkg/leakybucket/tests/overflow-with-meta/test.yaml new file mode 100644 index 000000000..1434489ce --- /dev/null +++ b/pkg/leakybucket/tests/overflow-with-meta/test.yaml @@ -0,0 +1,39 @@ +#this one won't due to leakspeed / delay +lines: + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 + uniq_key: aaa + Enriched: + ASNumber: 1234 + IsoCode: FR + ASNOrg: random AS + SourceRange: 1.2.3.0/24 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 + uniq_key: aaa + Enriched: + ASNumber: 1234 + IsoCode: FR + ASNOrg: random AS + SourceRange: ratata +results: + - Overflow: + scenario: test/simple-trigger + Source_ip: 1.2.3.4 + Events_count: 1 + - Overflow: + scenario: test/simple-trigger + Source_ip: 1.2.3.4 + Events_count: 1 + + diff --git a/pkg/leakybucket/tests/simple-counter-bh/bucket.yaml b/pkg/leakybucket/tests/simple-counter-bh/bucket.yaml new file mode 100644 index 000000000..c53d31de5 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter-bh/bucket.yaml @@ -0,0 +1,11 @@ +type: counter +name: test/simple-trigger +description: "Simple leaky" +filter: "evt.Line.Labels.type =='testlog'" +duration: 1s +overflow_filter: any(queue.Queue, {.Meta.source_ip != '1.2.3.4'} ) +capacity: -1 +groupby: evt.Meta.source_ip +labels: + type: overflow_1 + diff --git a/pkg/leakybucket/tests/simple-counter-bh/scenarios.yaml b/pkg/leakybucket/tests/simple-counter-bh/scenarios.yaml new file mode 100644 index 000000000..f45f7be12 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter-bh/scenarios.yaml @@ -0,0 +1,2 @@ + - filename: {{.TestDirectory}}/bucket.yaml + diff --git a/pkg/leakybucket/tests/simple-counter-bh/test.yaml b/pkg/leakybucket/tests/simple-counter-bh/test.yaml new file mode 100644 index 000000000..fe457aef1 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter-bh/test.yaml @@ -0,0 +1,22 @@ +#this one will trigger a simple overflow +lines: + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 +results: + - Overflow: + scenario: "" + + + diff --git a/pkg/leakybucket/tests/simple-counter-timeout/bucket.yaml b/pkg/leakybucket/tests/simple-counter-timeout/bucket.yaml new file mode 100644 index 000000000..1b8078a50 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter-timeout/bucket.yaml @@ -0,0 +1,10 @@ +type: counter +name: test/simple-trigger +description: "Simple leaky" +filter: "evt.Line.Labels.type =='testlog'" +duration: 10s +capacity: -1 +groupby: evt.Meta.source_ip +labels: + type: overflow_1 + diff --git a/pkg/leakybucket/tests/simple-counter-timeout/scenarios.yaml b/pkg/leakybucket/tests/simple-counter-timeout/scenarios.yaml new file mode 100644 index 000000000..f45f7be12 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter-timeout/scenarios.yaml @@ -0,0 +1,2 @@ + - filename: {{.TestDirectory}}/bucket.yaml + diff --git a/pkg/leakybucket/tests/simple-counter-timeout/test.yaml b/pkg/leakybucket/tests/simple-counter-timeout/test.yaml new file mode 100644 index 000000000..e58f4c828 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter-timeout/test.yaml @@ -0,0 +1,18 @@ +#this one will trigger a simple overflow +lines: + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 +results: + diff --git a/pkg/leakybucket/tests/simple-counter/bucket.yaml b/pkg/leakybucket/tests/simple-counter/bucket.yaml new file mode 100644 index 000000000..50427aa98 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter/bucket.yaml @@ -0,0 +1,10 @@ +type: counter +name: test/simple-trigger +description: "Simple leaky" +filter: "evt.Line.Labels.type =='testlog'" +duration: 1s +capacity: -1 +groupby: evt.Meta.source_ip +labels: + type: overflow_1 + diff --git a/pkg/leakybucket/tests/simple-counter/scenarios.yaml b/pkg/leakybucket/tests/simple-counter/scenarios.yaml new file mode 100644 index 000000000..f45f7be12 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter/scenarios.yaml @@ -0,0 +1,2 @@ + - filename: {{.TestDirectory}}/bucket.yaml + diff --git a/pkg/leakybucket/tests/simple-counter/test.yaml b/pkg/leakybucket/tests/simple-counter/test.yaml new file mode 100644 index 000000000..d87dcdbe3 --- /dev/null +++ b/pkg/leakybucket/tests/simple-counter/test.yaml @@ -0,0 +1,23 @@ +#this one will trigger a simple overflow +lines: + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 + - Line: + Labels: + type: testlog + Raw: xxheader VALUE1 trailing stuff + MarshaledTime: 2020-01-01T10:00:00Z + Meta: + source_ip: 1.2.3.4 +results: + - Overflow: + scenario: test/simple-trigger + Source_ip: 1.2.3.4 + Events_count: 2 + + diff --git a/pkg/leakybucket/tests/simple-leaky-underflow/bucket.yaml b/pkg/leakybucket/tests/simple-leaky-underflow/bucket.yaml index e02e1c0a9..531766632 100644 --- a/pkg/leakybucket/tests/simple-leaky-underflow/bucket.yaml +++ b/pkg/leakybucket/tests/simple-leaky-underflow/bucket.yaml @@ -4,8 +4,8 @@ debug: true name: test/simple-leaky description: "Simple leaky" filter: "evt.Line.Labels.type =='testlog'" -leakspeed: "10s" -capacity: 1 +leakspeed: "0.5s" +capacity: 2 groupby: evt.Meta.source_ip labels: type: overflow_1 diff --git a/pkg/leakybucket/tests/simple-leaky-underflow/test.yaml b/pkg/leakybucket/tests/simple-leaky-underflow/test.yaml index 2c12f79aa..32da72930 100644 --- a/pkg/leakybucket/tests/simple-leaky-underflow/test.yaml +++ b/pkg/leakybucket/tests/simple-leaky-underflow/test.yaml @@ -7,14 +7,8 @@ lines: MarshaledTime: 2020-01-01T10:00:00Z Meta: source_ip: 1.2.3.4 - - Line: - Labels: - type: testlog - Raw: xxheader VALUE2 trailing stuff - MarshaledTime: 2020-01-01T10:00:10Z - Meta: - source_ip: 1.2.3.4 results: - + - Overflow: + scenario: ""