From e6a35e87146bdcb10b6d3eb35058291c6c717649 Mon Sep 17 00:00:00 2001 From: "Thibault \"bui\" Koechlin" Date: Tue, 19 Apr 2022 19:12:23 +0200 Subject: [PATCH] Improve plugins grouping (alternative to #1424) (#1437) * Fix races in test (#1446) Co-authored-by: Shivam Sandbhor Co-authored-by: alteredCoder Co-authored-by: AlteredCoder <64792091+AlteredCoder@users.noreply.github.com> --- pkg/csplugin/broker.go | 22 ++- pkg/csplugin/broker_test.go | 292 ++++++++++++++++++++++++++++++++- pkg/csplugin/watcher.go | 110 +++++++++++-- pkg/csplugin/watcher_test.go | 9 +- pkg/types/utils.go | 2 +- tests/bats/70_http_plugin.bats | 1 + tests/mock-http.py | 1 + 7 files changed, 403 insertions(+), 34 deletions(-) diff --git a/pkg/csplugin/broker.go b/pkg/csplugin/broker.go index a9ca22920..f64fafa6d 100644 --- a/pkg/csplugin/broker.go +++ b/pkg/csplugin/broker.go @@ -37,6 +37,9 @@ const ( CrowdsecPluginKey string = "CROWDSEC_PLUGIN_KEY" ) +//The broker is reponsible for running the plugins and dispatching events +//It receives all the events from the main process and stacks them up +//It is as well notified by the watcher when it needs to deliver events to plugins (based on time or count threshold) type PluginBroker struct { PluginChannel chan ProfileAlert alertsByPluginName map[string][]*models.Alert @@ -55,12 +58,12 @@ type PluginBroker struct { type PluginConfig struct { Type string `yaml:"type"` Name string `yaml:"name"` - GroupWait time.Duration `yaml:"group_wait"` - GroupThreshold int `yaml:"group_threshold"` - MaxRetry int `yaml:"max_retry"` - TimeOut time.Duration `yaml:"timeout"` + GroupWait time.Duration `yaml:"group_wait,omitempty"` + GroupThreshold int `yaml:"group_threshold,omitempty"` + MaxRetry int `yaml:"max_retry,omitempty"` + TimeOut time.Duration `yaml:"timeout,omitempty"` - Format string `yaml:"format"` // specific to notification plugins + Format string `yaml:"format,omitempty"` // specific to notification plugins Config map[string]interface{} `yaml:",inline"` //to keep the plugin-specific config @@ -100,6 +103,7 @@ func (pb *PluginBroker) Kill() { } func (pb *PluginBroker) Run(tomb *tomb.Tomb) { + //we get signaled via the channel when notifications need to be delivered to plugin (via the watcher) pb.watcher.Start(tomb) for { select { @@ -109,6 +113,7 @@ func (pb *PluginBroker) Run(tomb *tomb.Tomb) { case pluginName := <-pb.watcher.PluginEvents: // this can be ran in goroutine, but then locks will be needed pluginMutex.Lock() + log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName) tmpAlerts := pb.alertsByPluginName[pluginName] pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0) pluginMutex.Unlock() @@ -248,10 +253,12 @@ func (pb *PluginBroker) loadPlugins(path string) error { } func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) (Notifier, error) { + handshake, err := getHandshake() if err != nil { return nil, err } + log.Debugf("Executing plugin %s", binaryPath) cmd := exec.Command(binaryPath) if pb.pluginProcConfig.User != "" || pb.pluginProcConfig.Group != "" { if !(pb.pluginProcConfig.User != "" && pb.pluginProcConfig.Group != "") { @@ -293,7 +300,7 @@ func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) ( } func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*models.Alert) error { - log.WithField("plugin", pluginName).Debug("pushing alerts to plugin") + log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts)) if len(alerts) == 0 { return nil } @@ -356,9 +363,6 @@ func setRequiredFields(pluginCfg *PluginConfig) { pluginCfg.TimeOut = time.Second * 5 } - if pluginCfg.GroupWait == time.Second*0 { - pluginCfg.GroupWait = time.Second * 1 - } } func pluginIsValid(path string) error { diff --git a/pkg/csplugin/broker_test.go b/pkg/csplugin/broker_test.go index 57fe07f70..6befc893b 100644 --- a/pkg/csplugin/broker_test.go +++ b/pkg/csplugin/broker_test.go @@ -1,7 +1,8 @@ package csplugin import ( - "log" + "encoding/json" + "io/ioutil" "os" "os/exec" "path" @@ -9,12 +10,14 @@ import ( "testing" "time" + log "github.com/sirupsen/logrus" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/models" - "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "gopkg.in/tomb.v2" + "gopkg.in/yaml.v2" ) var testPath string @@ -252,17 +255,282 @@ func TestBrokerInit(t *testing.T) { } } -func TestBrokerRun(t *testing.T) { +func readconfig(t *testing.T, path string) ([]byte, PluginConfig) { + var config PluginConfig + orig, err := ioutil.ReadFile("tests/notifications/dummy.yaml") + if err != nil { + t.Fatalf("unable to read config file %s : %s", path, err) + } + if err := yaml.Unmarshal(orig, &config); err != nil { + t.Fatalf("unable to unmarshal config file : %s", err) + } + return orig, config +} + +func writeconfig(t *testing.T, config PluginConfig, path string) { + data, err := yaml.Marshal(&config) + if err != nil { + t.Fatalf("unable to marshal config file : %s", err) + } + if err := ioutil.WriteFile(path, data, 0644); err != nil { + t.Fatalf("unable to write config file %s : %s", path, err) + } +} + +func TestBrokerNoThreshold(t *testing.T) { + var alerts []models.Alert + DefaultEmptyTicker = 50 * time.Millisecond + buildDummyPlugin() setPluginPermTo744() defer tearDown() - procCfg := csconfig.PluginCfg{} + //init + pluginCfg := csconfig.PluginCfg{} pb := PluginBroker{} profiles := csconfig.NewDefaultConfig().API.Server.Profiles profiles = append(profiles, &csconfig.ProfileCfg{ Notifications: []string{"dummy_default"}, }) - err := pb.Init(&procCfg, profiles, &csconfig.ConfigurationPaths{ + //default config + err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{ + PluginDir: testPath, + NotificationDir: "./tests/notifications", + }) + assert.NoError(t, err) + tomb := tomb.Tomb{} + go pb.Run(&tomb) + defer pb.Kill() + //send one item, it should be processed right now + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(200 * time.Millisecond) + //we expect one now + content, err := ioutil.ReadFile("./out") + if err != nil { + log.Errorf("Error reading file: %s", err) + } + err = json.Unmarshal(content, &alerts) + assert.NoError(t, err) + assert.Equal(t, 1, len(alerts)) + //remove it + os.Remove("./out") + //and another one + log.Printf("second send") + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(200 * time.Millisecond) + //we expect one again, as we cleaned the file + content, err = ioutil.ReadFile("./out") + if err != nil { + log.Errorf("Error reading file: %s", err) + } + err = json.Unmarshal(content, &alerts) + log.Printf("content-> %s", content) + assert.NoError(t, err) + assert.Equal(t, 1, len(alerts)) +} + +func TestBrokerRunGroupAndTimeThreshold_TimeFirst(t *testing.T) { + //test grouping by "time" + DefaultEmptyTicker = 50 * time.Millisecond + buildDummyPlugin() + setPluginPermTo744() + defer tearDown() + + //init + pluginCfg := csconfig.PluginCfg{} + pb := PluginBroker{} + profiles := csconfig.NewDefaultConfig().API.Server.Profiles + profiles = append(profiles, &csconfig.ProfileCfg{ + Notifications: []string{"dummy_default"}, + }) + //set groupwait and groupthreshold, should honor whichever comes first + raw, cfg := readconfig(t, "tests/notifications/dummy.yaml") + cfg.GroupThreshold = 4 + cfg.GroupWait = 1 * time.Second + writeconfig(t, cfg, "tests/notifications/dummy.yaml") + err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{ + PluginDir: testPath, + NotificationDir: "./tests/notifications", + }) + assert.NoError(t, err) + tomb := tomb.Tomb{} + go pb.Run(&tomb) + defer pb.Kill() + //send data + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(500 * time.Millisecond) + //because of group threshold, we shouldn't have data yet + assert.NoFileExists(t, "./out") + time.Sleep(1 * time.Second) + //after 1 seconds, we should have data + content, err := ioutil.ReadFile("./out") + assert.NoError(t, err) + var alerts []models.Alert + err = json.Unmarshal(content, &alerts) + assert.NoError(t, err) + assert.Equal(t, 3, len(alerts)) + //restore config + if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil { + t.Fatalf("unable to write config file %s", err) + } +} + +func TestBrokerRunGroupAndTimeThreshold_CountFirst(t *testing.T) { + DefaultEmptyTicker = 50 * time.Millisecond + buildDummyPlugin() + setPluginPermTo744() + defer tearDown() + //init + pluginCfg := csconfig.PluginCfg{} + pb := PluginBroker{} + profiles := csconfig.NewDefaultConfig().API.Server.Profiles + profiles = append(profiles, &csconfig.ProfileCfg{ + Notifications: []string{"dummy_default"}, + }) + //set groupwait and groupthreshold, should honor whichever comes first + raw, cfg := readconfig(t, "tests/notifications/dummy.yaml") + cfg.GroupThreshold = 4 + cfg.GroupWait = 4 * time.Second + writeconfig(t, cfg, "tests/notifications/dummy.yaml") + err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{ + PluginDir: testPath, + NotificationDir: "./tests/notifications", + }) + assert.NoError(t, err) + tomb := tomb.Tomb{} + go pb.Run(&tomb) + defer pb.Kill() + //send data + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(100 * time.Millisecond) + //because of group threshold, we shouldn't have data yet + assert.NoFileExists(t, "./out") + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(100 * time.Millisecond) + //and now we should + content, err := ioutil.ReadFile("./out") + if err != nil { + log.Errorf("Error reading file: %s", err) + } + var alerts []models.Alert + err = json.Unmarshal(content, &alerts) + assert.NoError(t, err) + assert.Equal(t, 4, len(alerts)) + //restore config + if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil { + t.Fatalf("unable to write config file %s", err) + } +} + +func TestBrokerRunGroupThreshold(t *testing.T) { + //test grouping by "size" + DefaultEmptyTicker = 50 * time.Millisecond + buildDummyPlugin() + setPluginPermTo744() + defer tearDown() + //init + pluginCfg := csconfig.PluginCfg{} + pb := PluginBroker{} + profiles := csconfig.NewDefaultConfig().API.Server.Profiles + profiles = append(profiles, &csconfig.ProfileCfg{ + Notifications: []string{"dummy_default"}, + }) + //set groupwait + raw, cfg := readconfig(t, "tests/notifications/dummy.yaml") + cfg.GroupThreshold = 4 + writeconfig(t, cfg, "tests/notifications/dummy.yaml") + err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{ + PluginDir: testPath, + NotificationDir: "./tests/notifications", + }) + assert.NoError(t, err) + tomb := tomb.Tomb{} + go pb.Run(&tomb) + defer pb.Kill() + //send data + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(100 * time.Millisecond) + //because of group threshold, we shouldn't have data yet + assert.NoFileExists(t, "./out") + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(100 * time.Millisecond) + //and now we should + content, err := ioutil.ReadFile("./out") + if err != nil { + log.Errorf("Error reading file: %s", err) + } + var alerts []models.Alert + err = json.Unmarshal(content, &alerts) + assert.NoError(t, err) + assert.Equal(t, 4, len(alerts)) + //restore config + if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil { + t.Fatalf("unable to write config file %s", err) + } +} + +func TestBrokerRunTimeThreshold(t *testing.T) { + DefaultEmptyTicker = 50 * time.Millisecond + buildDummyPlugin() + setPluginPermTo744() + defer tearDown() + //init + pluginCfg := csconfig.PluginCfg{} + pb := PluginBroker{} + profiles := csconfig.NewDefaultConfig().API.Server.Profiles + profiles = append(profiles, &csconfig.ProfileCfg{ + Notifications: []string{"dummy_default"}, + }) + //set groupwait + raw, cfg := readconfig(t, "tests/notifications/dummy.yaml") + cfg.GroupWait = time.Duration(1 * time.Second) + writeconfig(t, cfg, "tests/notifications/dummy.yaml") + err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{ + PluginDir: testPath, + NotificationDir: "./tests/notifications", + }) + assert.NoError(t, err) + tomb := tomb.Tomb{} + go pb.Run(&tomb) + defer pb.Kill() + //send data + pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} + time.Sleep(200 * time.Millisecond) + //we shouldn't have data yet + assert.NoFileExists(t, "./out") + time.Sleep(1 * time.Second) + //and now we should + content, err := ioutil.ReadFile("./out") + if err != nil { + log.Errorf("Error reading file: %s", err) + } + var alerts []models.Alert + err = json.Unmarshal(content, &alerts) + assert.NoError(t, err) + assert.Equal(t, 1, len(alerts)) + //restore config + if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil { + t.Fatalf("unable to write config file %s", err) + } +} + +func TestBrokerRunSimple(t *testing.T) { + DefaultEmptyTicker = 50 * time.Millisecond + buildDummyPlugin() + setPluginPermTo744() + defer tearDown() + pluginCfg := csconfig.PluginCfg{} + pb := PluginBroker{} + profiles := csconfig.NewDefaultConfig().API.Server.Profiles + profiles = append(profiles, &csconfig.ProfileCfg{ + Notifications: []string{"dummy_default"}, + }) + err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{ PluginDir: testPath, NotificationDir: "./tests/notifications", }) @@ -276,10 +544,16 @@ func TestBrokerRun(t *testing.T) { pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}} - time.Sleep(time.Second * 4) + time.Sleep(time.Millisecond * 200) - assert.FileExists(t, "./out") - assert.Equal(t, types.GetLineCountForFile("./out"), 2) + content, err := ioutil.ReadFile("./out") + if err != nil { + log.Errorf("Error reading file: %s", err) + } + var alerts []models.Alert + err = json.Unmarshal(content, &alerts) + assert.NoError(t, err) + assert.Equal(t, 2, len(alerts)) } func buildDummyPlugin() { @@ -292,6 +566,7 @@ func buildDummyPlugin() { log.Fatal(err) } testPath = dir + os.Remove("./out") } func setPluginPermTo(perm string) { @@ -325,4 +600,5 @@ func tearDown() { if err != nil { log.Fatal(err) } + os.Remove("./out") } diff --git a/pkg/csplugin/watcher.go b/pkg/csplugin/watcher.go index 7ace2911f..955133a4a 100644 --- a/pkg/csplugin/watcher.go +++ b/pkg/csplugin/watcher.go @@ -1,27 +1,66 @@ package csplugin import ( + "sync" "time" "github.com/crowdsecurity/crowdsec/pkg/models" + log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" ) +/* + PluginWatcher is here to allow grouping and threshold features for notification plugins : + by frequency : it will signal the plugin to deliver notifications at this frequence (watchPluginTicker) + by threshold : it will signal the plugin to deliver notifications when the number of alerts for this plugin reaches this threshold (watchPluginAlertCounts) +*/ + +// TODO: When we start using go 1.18, consider moving this struct in some utils pkg. Make the implementation more generic using generics :) +type alertCounterByPluginName struct { + sync.Mutex + data map[string]int +} + +func newAlertCounterByPluginName() alertCounterByPluginName { + return alertCounterByPluginName{ + data: make(map[string]int), + } +} + +func (acp *alertCounterByPluginName) Init() { + acp.data = make(map[string]int) +} + +func (acp *alertCounterByPluginName) Get(key string) (int, bool) { + acp.Lock() + val, ok := acp.data[key] + acp.Unlock() + return val, ok +} + +func (acp *alertCounterByPluginName) Set(key string, val int) { + acp.Lock() + acp.data[key] = val + acp.Unlock() +} + type PluginWatcher struct { PluginConfigByName map[string]PluginConfig - AlertCountByPluginName map[string]int + AlertCountByPluginName alertCounterByPluginName PluginEvents chan string Inserts chan string tomb *tomb.Tomb } +var DefaultEmptyTicker = time.Second * 1 + func (pw *PluginWatcher) Init(configs map[string]PluginConfig, alertsByPluginName map[string][]*models.Alert) { pw.PluginConfigByName = configs pw.PluginEvents = make(chan string) - pw.AlertCountByPluginName = make(map[string]int) + pw.AlertCountByPluginName = newAlertCounterByPluginName() pw.Inserts = make(chan string) for name := range alertsByPluginName { - pw.AlertCountByPluginName[name] = 0 + pw.AlertCountByPluginName.Set(name, 0) } } @@ -42,15 +81,62 @@ func (pw *PluginWatcher) Start(tomb *tomb.Tomb) { } func (pw *PluginWatcher) watchPluginTicker(pluginName string) { - if pw.PluginConfigByName[pluginName].GroupWait <= time.Second*0 { - return + var watchTime time.Duration + var watchCount int = -1 + // Threshold can be set : by time, by count, or both + // if only time is set, honor it + // if only count is set, put timer to 1 second and just check size + // if both are set, set timer to 1 second, but check size && time + interval := pw.PluginConfigByName[pluginName].GroupWait + threshold := pw.PluginConfigByName[pluginName].GroupThreshold + + //only size is set + if threshold > 0 && interval == 0 { + watchCount = threshold + watchTime = DefaultEmptyTicker + } else if interval != 0 && threshold == 0 { + //only time is set + watchTime = interval + } else if interval != 0 && threshold != 0 { + //both are set + watchTime = DefaultEmptyTicker + watchCount = threshold + } else { + //none are set, we sent every event we receive + watchTime = DefaultEmptyTicker + watchCount = 1 } - ticker := time.NewTicker(pw.PluginConfigByName[pluginName].GroupWait) + + ticker := time.NewTicker(watchTime) + var lastSend time.Time = time.Now() for { select { case <-ticker.C: - pw.PluginEvents <- pluginName + send := false + //if count threshold was set, honor no matter what + if pc, _ := pw.AlertCountByPluginName.Get(pluginName); watchCount > 0 && pc >= watchCount { + log.Tracef("[%s] %d alerts received, sending\n", pluginName, pc) + send = true + pw.AlertCountByPluginName.Set(pluginName, 0) + } + //if time threshold only was set + if watchTime > 0 && watchTime == interval { + log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval) + send = true + } + //if we hit timer because it was set low to honor count, check if we should trigger + if watchTime == DefaultEmptyTicker && watchTime != interval && interval != 0 { + if lastSend.Add(interval).Before(time.Now()) { + log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval) + send = true + lastSend = time.Now() + } + } + if send { + log.Tracef("sending alerts to %s", pluginName) + pw.PluginEvents <- pluginName + } case <-pw.tomb.Dying(): ticker.Stop() return @@ -62,12 +148,10 @@ func (pw *PluginWatcher) watchPluginAlertCounts() { for { select { case pluginName := <-pw.Inserts: - if threshold := pw.PluginConfigByName[pluginName].GroupThreshold; threshold > 0 { - pw.AlertCountByPluginName[pluginName]++ - if pw.AlertCountByPluginName[pluginName] >= threshold { - pw.PluginEvents <- pluginName - pw.AlertCountByPluginName[pluginName] = 0 - } + //we only "count" pending alerts, and watchPluginTicker is actually going to send it + if _, ok := pw.PluginConfigByName[pluginName]; ok { + curr, _ := pw.AlertCountByPluginName.Get(pluginName) + pw.AlertCountByPluginName.Set(pluginName, curr+1) } case <-pw.tomb.Dying(): return diff --git a/pkg/csplugin/watcher_test.go b/pkg/csplugin/watcher_test.go index 41ddded9c..acc23f29a 100644 --- a/pkg/csplugin/watcher_test.go +++ b/pkg/csplugin/watcher_test.go @@ -21,9 +21,11 @@ func resetTestTomb(testTomb *tomb.Tomb) { } func resetWatcherAlertCounter(pw *PluginWatcher) { - for k := range pw.AlertCountByPluginName { - pw.AlertCountByPluginName[k] = 0 + pw.AlertCountByPluginName.Lock() + for k := range pw.AlertCountByPluginName.data { + pw.AlertCountByPluginName.data[k] = 0 } + pw.AlertCountByPluginName.Unlock() } func insertNAlertsToPlugin(pw *PluginWatcher, n int, pluginName string) { @@ -34,7 +36,8 @@ func insertNAlertsToPlugin(pw *PluginWatcher, n int, pluginName string) { func listenChannelWithTimeout(ctx context.Context, channel chan string) error { select { - case <-channel: + case x := <-channel: + log.Printf("received -> %v", x) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/types/utils.go b/pkg/types/utils.go index 5bdee7b04..0315465cb 100644 --- a/pkg/types/utils.go +++ b/pkg/types/utils.go @@ -248,7 +248,7 @@ func UtcNow() time.Time { func GetLineCountForFile(filepath string) int { f, err := os.Open(filepath) if err != nil { - log.Fatalf("unable to open log file %s", filepath) + log.Fatalf("unable to open log file %s : %s", filepath, err) } defer f.Close() lc := 0 diff --git a/tests/bats/70_http_plugin.bats b/tests/bats/70_http_plugin.bats index ce691dbea..c56aaf4ef 100644 --- a/tests/bats/70_http_plugin.bats +++ b/tests/bats/70_http_plugin.bats @@ -54,6 +54,7 @@ setup() { run -0 cscli decisions add --ip 1.2.3.5 --duration 30s assert_output --partial 'Decision successfully added' + sleep 5 } @test "$FILE expected 1 log line from http server" { diff --git a/tests/mock-http.py b/tests/mock-http.py index 029c9f971..3f26271b4 100644 --- a/tests/mock-http.py +++ b/tests/mock-http.py @@ -21,6 +21,7 @@ class RequestHandler(BaseHTTPRequestHandler): self.send_header('Content-type','application/json') self.end_headers() self.wfile.write(json.dumps({}).encode()) + self.wfile.flush() return def log_message(self, format, *args):