Improve plugins grouping (alternative to #1424) (#1437)

* Fix races in test (#1446)

Co-authored-by: Shivam Sandbhor <shivam.sandbhor@gmail.com>
Co-authored-by: alteredCoder <kevin@crowdsec.net>
Co-authored-by: AlteredCoder <64792091+AlteredCoder@users.noreply.github.com>
This commit is contained in:
Thibault "bui" Koechlin 2022-04-19 19:12:23 +02:00 committed by GitHub
parent 0f7f21a7db
commit e6a35e8714
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 403 additions and 34 deletions

View file

@ -37,6 +37,9 @@ const (
CrowdsecPluginKey string = "CROWDSEC_PLUGIN_KEY"
)
//The broker is reponsible for running the plugins and dispatching events
//It receives all the events from the main process and stacks them up
//It is as well notified by the watcher when it needs to deliver events to plugins (based on time or count threshold)
type PluginBroker struct {
PluginChannel chan ProfileAlert
alertsByPluginName map[string][]*models.Alert
@ -55,12 +58,12 @@ type PluginBroker struct {
type PluginConfig struct {
Type string `yaml:"type"`
Name string `yaml:"name"`
GroupWait time.Duration `yaml:"group_wait"`
GroupThreshold int `yaml:"group_threshold"`
MaxRetry int `yaml:"max_retry"`
TimeOut time.Duration `yaml:"timeout"`
GroupWait time.Duration `yaml:"group_wait,omitempty"`
GroupThreshold int `yaml:"group_threshold,omitempty"`
MaxRetry int `yaml:"max_retry,omitempty"`
TimeOut time.Duration `yaml:"timeout,omitempty"`
Format string `yaml:"format"` // specific to notification plugins
Format string `yaml:"format,omitempty"` // specific to notification plugins
Config map[string]interface{} `yaml:",inline"` //to keep the plugin-specific config
@ -100,6 +103,7 @@ func (pb *PluginBroker) Kill() {
}
func (pb *PluginBroker) Run(tomb *tomb.Tomb) {
//we get signaled via the channel when notifications need to be delivered to plugin (via the watcher)
pb.watcher.Start(tomb)
for {
select {
@ -109,6 +113,7 @@ func (pb *PluginBroker) Run(tomb *tomb.Tomb) {
case pluginName := <-pb.watcher.PluginEvents:
// this can be ran in goroutine, but then locks will be needed
pluginMutex.Lock()
log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName)
tmpAlerts := pb.alertsByPluginName[pluginName]
pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
pluginMutex.Unlock()
@ -248,10 +253,12 @@ func (pb *PluginBroker) loadPlugins(path string) error {
}
func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) (Notifier, error) {
handshake, err := getHandshake()
if err != nil {
return nil, err
}
log.Debugf("Executing plugin %s", binaryPath)
cmd := exec.Command(binaryPath)
if pb.pluginProcConfig.User != "" || pb.pluginProcConfig.Group != "" {
if !(pb.pluginProcConfig.User != "" && pb.pluginProcConfig.Group != "") {
@ -293,7 +300,7 @@ func (pb *PluginBroker) loadNotificationPlugin(name string, binaryPath string) (
}
func (pb *PluginBroker) pushNotificationsToPlugin(pluginName string, alerts []*models.Alert) error {
log.WithField("plugin", pluginName).Debug("pushing alerts to plugin")
log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts))
if len(alerts) == 0 {
return nil
}
@ -356,9 +363,6 @@ func setRequiredFields(pluginCfg *PluginConfig) {
pluginCfg.TimeOut = time.Second * 5
}
if pluginCfg.GroupWait == time.Second*0 {
pluginCfg.GroupWait = time.Second * 1
}
}
func pluginIsValid(path string) error {

View file

@ -1,7 +1,8 @@
package csplugin
import (
"log"
"encoding/json"
"io/ioutil"
"os"
"os/exec"
"path"
@ -9,12 +10,14 @@ import (
"testing"
"time"
log "github.com/sirupsen/logrus"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/models"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
)
var testPath string
@ -252,17 +255,282 @@ func TestBrokerInit(t *testing.T) {
}
}
func TestBrokerRun(t *testing.T) {
func readconfig(t *testing.T, path string) ([]byte, PluginConfig) {
var config PluginConfig
orig, err := ioutil.ReadFile("tests/notifications/dummy.yaml")
if err != nil {
t.Fatalf("unable to read config file %s : %s", path, err)
}
if err := yaml.Unmarshal(orig, &config); err != nil {
t.Fatalf("unable to unmarshal config file : %s", err)
}
return orig, config
}
func writeconfig(t *testing.T, config PluginConfig, path string) {
data, err := yaml.Marshal(&config)
if err != nil {
t.Fatalf("unable to marshal config file : %s", err)
}
if err := ioutil.WriteFile(path, data, 0644); err != nil {
t.Fatalf("unable to write config file %s : %s", path, err)
}
}
func TestBrokerNoThreshold(t *testing.T) {
var alerts []models.Alert
DefaultEmptyTicker = 50 * time.Millisecond
buildDummyPlugin()
setPluginPermTo744()
defer tearDown()
procCfg := csconfig.PluginCfg{}
//init
pluginCfg := csconfig.PluginCfg{}
pb := PluginBroker{}
profiles := csconfig.NewDefaultConfig().API.Server.Profiles
profiles = append(profiles, &csconfig.ProfileCfg{
Notifications: []string{"dummy_default"},
})
err := pb.Init(&procCfg, profiles, &csconfig.ConfigurationPaths{
//default config
err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{
PluginDir: testPath,
NotificationDir: "./tests/notifications",
})
assert.NoError(t, err)
tomb := tomb.Tomb{}
go pb.Run(&tomb)
defer pb.Kill()
//send one item, it should be processed right now
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(200 * time.Millisecond)
//we expect one now
content, err := ioutil.ReadFile("./out")
if err != nil {
log.Errorf("Error reading file: %s", err)
}
err = json.Unmarshal(content, &alerts)
assert.NoError(t, err)
assert.Equal(t, 1, len(alerts))
//remove it
os.Remove("./out")
//and another one
log.Printf("second send")
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(200 * time.Millisecond)
//we expect one again, as we cleaned the file
content, err = ioutil.ReadFile("./out")
if err != nil {
log.Errorf("Error reading file: %s", err)
}
err = json.Unmarshal(content, &alerts)
log.Printf("content-> %s", content)
assert.NoError(t, err)
assert.Equal(t, 1, len(alerts))
}
func TestBrokerRunGroupAndTimeThreshold_TimeFirst(t *testing.T) {
//test grouping by "time"
DefaultEmptyTicker = 50 * time.Millisecond
buildDummyPlugin()
setPluginPermTo744()
defer tearDown()
//init
pluginCfg := csconfig.PluginCfg{}
pb := PluginBroker{}
profiles := csconfig.NewDefaultConfig().API.Server.Profiles
profiles = append(profiles, &csconfig.ProfileCfg{
Notifications: []string{"dummy_default"},
})
//set groupwait and groupthreshold, should honor whichever comes first
raw, cfg := readconfig(t, "tests/notifications/dummy.yaml")
cfg.GroupThreshold = 4
cfg.GroupWait = 1 * time.Second
writeconfig(t, cfg, "tests/notifications/dummy.yaml")
err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{
PluginDir: testPath,
NotificationDir: "./tests/notifications",
})
assert.NoError(t, err)
tomb := tomb.Tomb{}
go pb.Run(&tomb)
defer pb.Kill()
//send data
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(500 * time.Millisecond)
//because of group threshold, we shouldn't have data yet
assert.NoFileExists(t, "./out")
time.Sleep(1 * time.Second)
//after 1 seconds, we should have data
content, err := ioutil.ReadFile("./out")
assert.NoError(t, err)
var alerts []models.Alert
err = json.Unmarshal(content, &alerts)
assert.NoError(t, err)
assert.Equal(t, 3, len(alerts))
//restore config
if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil {
t.Fatalf("unable to write config file %s", err)
}
}
func TestBrokerRunGroupAndTimeThreshold_CountFirst(t *testing.T) {
DefaultEmptyTicker = 50 * time.Millisecond
buildDummyPlugin()
setPluginPermTo744()
defer tearDown()
//init
pluginCfg := csconfig.PluginCfg{}
pb := PluginBroker{}
profiles := csconfig.NewDefaultConfig().API.Server.Profiles
profiles = append(profiles, &csconfig.ProfileCfg{
Notifications: []string{"dummy_default"},
})
//set groupwait and groupthreshold, should honor whichever comes first
raw, cfg := readconfig(t, "tests/notifications/dummy.yaml")
cfg.GroupThreshold = 4
cfg.GroupWait = 4 * time.Second
writeconfig(t, cfg, "tests/notifications/dummy.yaml")
err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{
PluginDir: testPath,
NotificationDir: "./tests/notifications",
})
assert.NoError(t, err)
tomb := tomb.Tomb{}
go pb.Run(&tomb)
defer pb.Kill()
//send data
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(100 * time.Millisecond)
//because of group threshold, we shouldn't have data yet
assert.NoFileExists(t, "./out")
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(100 * time.Millisecond)
//and now we should
content, err := ioutil.ReadFile("./out")
if err != nil {
log.Errorf("Error reading file: %s", err)
}
var alerts []models.Alert
err = json.Unmarshal(content, &alerts)
assert.NoError(t, err)
assert.Equal(t, 4, len(alerts))
//restore config
if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil {
t.Fatalf("unable to write config file %s", err)
}
}
func TestBrokerRunGroupThreshold(t *testing.T) {
//test grouping by "size"
DefaultEmptyTicker = 50 * time.Millisecond
buildDummyPlugin()
setPluginPermTo744()
defer tearDown()
//init
pluginCfg := csconfig.PluginCfg{}
pb := PluginBroker{}
profiles := csconfig.NewDefaultConfig().API.Server.Profiles
profiles = append(profiles, &csconfig.ProfileCfg{
Notifications: []string{"dummy_default"},
})
//set groupwait
raw, cfg := readconfig(t, "tests/notifications/dummy.yaml")
cfg.GroupThreshold = 4
writeconfig(t, cfg, "tests/notifications/dummy.yaml")
err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{
PluginDir: testPath,
NotificationDir: "./tests/notifications",
})
assert.NoError(t, err)
tomb := tomb.Tomb{}
go pb.Run(&tomb)
defer pb.Kill()
//send data
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(100 * time.Millisecond)
//because of group threshold, we shouldn't have data yet
assert.NoFileExists(t, "./out")
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(100 * time.Millisecond)
//and now we should
content, err := ioutil.ReadFile("./out")
if err != nil {
log.Errorf("Error reading file: %s", err)
}
var alerts []models.Alert
err = json.Unmarshal(content, &alerts)
assert.NoError(t, err)
assert.Equal(t, 4, len(alerts))
//restore config
if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil {
t.Fatalf("unable to write config file %s", err)
}
}
func TestBrokerRunTimeThreshold(t *testing.T) {
DefaultEmptyTicker = 50 * time.Millisecond
buildDummyPlugin()
setPluginPermTo744()
defer tearDown()
//init
pluginCfg := csconfig.PluginCfg{}
pb := PluginBroker{}
profiles := csconfig.NewDefaultConfig().API.Server.Profiles
profiles = append(profiles, &csconfig.ProfileCfg{
Notifications: []string{"dummy_default"},
})
//set groupwait
raw, cfg := readconfig(t, "tests/notifications/dummy.yaml")
cfg.GroupWait = time.Duration(1 * time.Second)
writeconfig(t, cfg, "tests/notifications/dummy.yaml")
err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{
PluginDir: testPath,
NotificationDir: "./tests/notifications",
})
assert.NoError(t, err)
tomb := tomb.Tomb{}
go pb.Run(&tomb)
defer pb.Kill()
//send data
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(200 * time.Millisecond)
//we shouldn't have data yet
assert.NoFileExists(t, "./out")
time.Sleep(1 * time.Second)
//and now we should
content, err := ioutil.ReadFile("./out")
if err != nil {
log.Errorf("Error reading file: %s", err)
}
var alerts []models.Alert
err = json.Unmarshal(content, &alerts)
assert.NoError(t, err)
assert.Equal(t, 1, len(alerts))
//restore config
if err := ioutil.WriteFile("tests/notifications/dummy.yaml", raw, 0644); err != nil {
t.Fatalf("unable to write config file %s", err)
}
}
func TestBrokerRunSimple(t *testing.T) {
DefaultEmptyTicker = 50 * time.Millisecond
buildDummyPlugin()
setPluginPermTo744()
defer tearDown()
pluginCfg := csconfig.PluginCfg{}
pb := PluginBroker{}
profiles := csconfig.NewDefaultConfig().API.Server.Profiles
profiles = append(profiles, &csconfig.ProfileCfg{
Notifications: []string{"dummy_default"},
})
err := pb.Init(&pluginCfg, profiles, &csconfig.ConfigurationPaths{
PluginDir: testPath,
NotificationDir: "./tests/notifications",
})
@ -276,10 +544,16 @@ func TestBrokerRun(t *testing.T) {
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
pb.PluginChannel <- ProfileAlert{ProfileID: uint(0), Alert: &models.Alert{}}
time.Sleep(time.Second * 4)
time.Sleep(time.Millisecond * 200)
assert.FileExists(t, "./out")
assert.Equal(t, types.GetLineCountForFile("./out"), 2)
content, err := ioutil.ReadFile("./out")
if err != nil {
log.Errorf("Error reading file: %s", err)
}
var alerts []models.Alert
err = json.Unmarshal(content, &alerts)
assert.NoError(t, err)
assert.Equal(t, 2, len(alerts))
}
func buildDummyPlugin() {
@ -292,6 +566,7 @@ func buildDummyPlugin() {
log.Fatal(err)
}
testPath = dir
os.Remove("./out")
}
func setPluginPermTo(perm string) {
@ -325,4 +600,5 @@ func tearDown() {
if err != nil {
log.Fatal(err)
}
os.Remove("./out")
}

View file

@ -1,27 +1,66 @@
package csplugin
import (
"sync"
"time"
"github.com/crowdsecurity/crowdsec/pkg/models"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)
/*
PluginWatcher is here to allow grouping and threshold features for notification plugins :
by frequency : it will signal the plugin to deliver notifications at this frequence (watchPluginTicker)
by threshold : it will signal the plugin to deliver notifications when the number of alerts for this plugin reaches this threshold (watchPluginAlertCounts)
*/
// TODO: When we start using go 1.18, consider moving this struct in some utils pkg. Make the implementation more generic using generics :)
type alertCounterByPluginName struct {
sync.Mutex
data map[string]int
}
func newAlertCounterByPluginName() alertCounterByPluginName {
return alertCounterByPluginName{
data: make(map[string]int),
}
}
func (acp *alertCounterByPluginName) Init() {
acp.data = make(map[string]int)
}
func (acp *alertCounterByPluginName) Get(key string) (int, bool) {
acp.Lock()
val, ok := acp.data[key]
acp.Unlock()
return val, ok
}
func (acp *alertCounterByPluginName) Set(key string, val int) {
acp.Lock()
acp.data[key] = val
acp.Unlock()
}
type PluginWatcher struct {
PluginConfigByName map[string]PluginConfig
AlertCountByPluginName map[string]int
AlertCountByPluginName alertCounterByPluginName
PluginEvents chan string
Inserts chan string
tomb *tomb.Tomb
}
var DefaultEmptyTicker = time.Second * 1
func (pw *PluginWatcher) Init(configs map[string]PluginConfig, alertsByPluginName map[string][]*models.Alert) {
pw.PluginConfigByName = configs
pw.PluginEvents = make(chan string)
pw.AlertCountByPluginName = make(map[string]int)
pw.AlertCountByPluginName = newAlertCounterByPluginName()
pw.Inserts = make(chan string)
for name := range alertsByPluginName {
pw.AlertCountByPluginName[name] = 0
pw.AlertCountByPluginName.Set(name, 0)
}
}
@ -42,15 +81,62 @@ func (pw *PluginWatcher) Start(tomb *tomb.Tomb) {
}
func (pw *PluginWatcher) watchPluginTicker(pluginName string) {
if pw.PluginConfigByName[pluginName].GroupWait <= time.Second*0 {
return
var watchTime time.Duration
var watchCount int = -1
// Threshold can be set : by time, by count, or both
// if only time is set, honor it
// if only count is set, put timer to 1 second and just check size
// if both are set, set timer to 1 second, but check size && time
interval := pw.PluginConfigByName[pluginName].GroupWait
threshold := pw.PluginConfigByName[pluginName].GroupThreshold
//only size is set
if threshold > 0 && interval == 0 {
watchCount = threshold
watchTime = DefaultEmptyTicker
} else if interval != 0 && threshold == 0 {
//only time is set
watchTime = interval
} else if interval != 0 && threshold != 0 {
//both are set
watchTime = DefaultEmptyTicker
watchCount = threshold
} else {
//none are set, we sent every event we receive
watchTime = DefaultEmptyTicker
watchCount = 1
}
ticker := time.NewTicker(pw.PluginConfigByName[pluginName].GroupWait)
ticker := time.NewTicker(watchTime)
var lastSend time.Time = time.Now()
for {
select {
case <-ticker.C:
pw.PluginEvents <- pluginName
send := false
//if count threshold was set, honor no matter what
if pc, _ := pw.AlertCountByPluginName.Get(pluginName); watchCount > 0 && pc >= watchCount {
log.Tracef("[%s] %d alerts received, sending\n", pluginName, pc)
send = true
pw.AlertCountByPluginName.Set(pluginName, 0)
}
//if time threshold only was set
if watchTime > 0 && watchTime == interval {
log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
send = true
}
//if we hit timer because it was set low to honor count, check if we should trigger
if watchTime == DefaultEmptyTicker && watchTime != interval && interval != 0 {
if lastSend.Add(interval).Before(time.Now()) {
log.Tracef("sending alerts to %s, duration %s elapsed", pluginName, interval)
send = true
lastSend = time.Now()
}
}
if send {
log.Tracef("sending alerts to %s", pluginName)
pw.PluginEvents <- pluginName
}
case <-pw.tomb.Dying():
ticker.Stop()
return
@ -62,12 +148,10 @@ func (pw *PluginWatcher) watchPluginAlertCounts() {
for {
select {
case pluginName := <-pw.Inserts:
if threshold := pw.PluginConfigByName[pluginName].GroupThreshold; threshold > 0 {
pw.AlertCountByPluginName[pluginName]++
if pw.AlertCountByPluginName[pluginName] >= threshold {
pw.PluginEvents <- pluginName
pw.AlertCountByPluginName[pluginName] = 0
}
//we only "count" pending alerts, and watchPluginTicker is actually going to send it
if _, ok := pw.PluginConfigByName[pluginName]; ok {
curr, _ := pw.AlertCountByPluginName.Get(pluginName)
pw.AlertCountByPluginName.Set(pluginName, curr+1)
}
case <-pw.tomb.Dying():
return

View file

@ -21,9 +21,11 @@ func resetTestTomb(testTomb *tomb.Tomb) {
}
func resetWatcherAlertCounter(pw *PluginWatcher) {
for k := range pw.AlertCountByPluginName {
pw.AlertCountByPluginName[k] = 0
pw.AlertCountByPluginName.Lock()
for k := range pw.AlertCountByPluginName.data {
pw.AlertCountByPluginName.data[k] = 0
}
pw.AlertCountByPluginName.Unlock()
}
func insertNAlertsToPlugin(pw *PluginWatcher, n int, pluginName string) {
@ -34,7 +36,8 @@ func insertNAlertsToPlugin(pw *PluginWatcher, n int, pluginName string) {
func listenChannelWithTimeout(ctx context.Context, channel chan string) error {
select {
case <-channel:
case x := <-channel:
log.Printf("received -> %v", x)
case <-ctx.Done():
return ctx.Err()
}

View file

@ -248,7 +248,7 @@ func UtcNow() time.Time {
func GetLineCountForFile(filepath string) int {
f, err := os.Open(filepath)
if err != nil {
log.Fatalf("unable to open log file %s", filepath)
log.Fatalf("unable to open log file %s : %s", filepath, err)
}
defer f.Close()
lc := 0

View file

@ -54,6 +54,7 @@ setup() {
run -0 cscli decisions add --ip 1.2.3.5 --duration 30s
assert_output --partial 'Decision successfully added'
sleep 5
}
@test "$FILE expected 1 log line from http server" {

View file

@ -21,6 +21,7 @@ class RequestHandler(BaseHTTPRequestHandler):
self.send_header('Content-type','application/json')
self.end_headers()
self.wfile.write(json.dumps({}).encode())
self.wfile.flush()
return
def log_message(self, format, *args):