From 7d0f89df29b9d4a9b45742adfeb5dd4d73d7122a Mon Sep 17 00:00:00 2001 From: Manuel Sabban Date: Tue, 30 Aug 2022 15:45:52 +0200 Subject: [PATCH] Implement reinject command to send notifications of alerts (#1638) * implement reinject command to send notifications of alerts using a profile Co-authored-by: sabban <15465465+sabban@users.noreply.github.com> --- cmd/crowdsec-cli/notifications.go | 184 +++++++++++++++++++++++++++--- pkg/csplugin/broker.go | 32 +++++- pkg/csplugin/watcher.go | 3 + pkg/csplugin/watcher_test.go | 10 +- 4 files changed, 200 insertions(+), 29 deletions(-) diff --git a/cmd/crowdsec-cli/notifications.go b/cmd/crowdsec-cli/notifications.go index bd0a0e0d3..37b4d4ac4 100644 --- a/cmd/crowdsec-cli/notifications.go +++ b/cmd/crowdsec-cli/notifications.go @@ -1,25 +1,35 @@ package main import ( + "context" "encoding/csv" "encoding/json" "fmt" "io/fs" + "net/url" "os" "path/filepath" + "strconv" "strings" + "time" + "github.com/crowdsecurity/crowdsec/pkg/apiclient" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/csplugin" + "github.com/crowdsecurity/crowdsec/pkg/csprofiles" + "github.com/crowdsecurity/crowdsec/pkg/cwversion" + "github.com/go-openapi/strfmt" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "gopkg.in/tomb.v2" ) type NotificationsCfg struct { Config csplugin.PluginConfig `json:"plugin_config"` Profiles []*csconfig.ProfileCfg `json:"associated_profiles"` + ids []uint } func NewNotificationsCmd() *cobra.Command { @@ -50,8 +60,12 @@ func NewNotificationsCmd() *cobra.Command { Example: `cscli notifications list`, Args: cobra.ExactArgs(0), DisableAutoGenTag: true, - Run: func(cmd *cobra.Command, arg []string) { - ncfgs := getNotificationsConfiguration() + RunE: func(cmd *cobra.Command, arg []string) error { + ncfgs, err := getNotificationsConfiguration() + if err != nil { + return errors.Wrap(err, "Can't build profiles configuration") + } + if csConfig.Cscli.Output == "human" { table := tablewriter.NewWriter(os.Stdout) table.SetCenterSeparator("") @@ -72,14 +86,14 @@ func NewNotificationsCmd() *cobra.Command { } else if csConfig.Cscli.Output == "json" { x, err := json.MarshalIndent(ncfgs, "", " ") if err != nil { - log.Fatalf("failed to marshal notification configuration") + return errors.New("failed to marshal notification configuration") } fmt.Printf("%s", string(x)) } else if csConfig.Cscli.Output == "raw" { csvwriter := csv.NewWriter(os.Stdout) err := csvwriter.Write([]string{"Name", "Type", "Profile name"}) if err != nil { - log.Fatalf("failed to write raw header: %s", err) + return errors.Wrap(err, "failed to write raw header") } for _, b := range ncfgs { profilesList := []string{} @@ -88,11 +102,12 @@ func NewNotificationsCmd() *cobra.Command { } err := csvwriter.Write([]string{b.Config.Name, b.Config.Type, strings.Join(profilesList, ", ")}) if err != nil { - log.Fatalf("failed to write raw content: %s", err) + return errors.Wrap(err, "failed to write raw content") } } csvwriter.Flush() } + return nil }, } cmdNotifications.AddCommand(cmdNotificationsList) @@ -104,7 +119,7 @@ func NewNotificationsCmd() *cobra.Command { Example: `cscli notifications inspect `, Args: cobra.ExactArgs(1), DisableAutoGenTag: true, - Run: func(cmd *cobra.Command, arg []string) { + RunE: func(cmd *cobra.Command, arg []string) error { var ( cfg NotificationsCfg ok bool @@ -113,11 +128,14 @@ func NewNotificationsCmd() *cobra.Command { pluginName := arg[0] if pluginName == "" { - log.Fatalf("Please provide a plugin name to inspect") + errors.New("Please provide a plugin name to inspect") + } + ncfgs, err := getNotificationsConfiguration() + if err != nil { + return errors.Wrap(err, "Can't build profiles configuration") } - ncfgs := getNotificationsConfiguration() if cfg, ok = ncfgs[pluginName]; !ok { - log.Fatalf("The provided plugin name doesn't exist or isn't active") + return errors.New("The provided plugin name doesn't exist or isn't active") } if csConfig.Cscli.Output == "human" || csConfig.Cscli.Output == "raw" { @@ -131,17 +149,140 @@ func NewNotificationsCmd() *cobra.Command { } else if csConfig.Cscli.Output == "json" { x, err := json.MarshalIndent(cfg, "", " ") if err != nil { - log.Fatalf("failed to marshal notification configuration") + return errors.New("failed to marshal notification configuration") } fmt.Printf("%s", string(x)) } + return nil }, } cmdNotifications.AddCommand(cmdNotificationsInspect) + var remediation bool + var alertOverride string + var cmdNotificationsReinject = &cobra.Command{ + Use: "reinject", + Short: "reinject alert into notifications system", + Long: `Reinject alert into notifications system`, + Example: ` +cscli notifications reinject +cscli notifications reinject --remediation +cscli notifications reinject -a '{"remediation": true,"scenario":"notification/test"}' +`, + Args: cobra.ExactArgs(1), + DisableAutoGenTag: true, + RunE: func(cmd *cobra.Command, args []string) error { + var ( + pluginBroker csplugin.PluginBroker + pluginTomb tomb.Tomb + ) + if len(args) != 1 { + printHelp(cmd) + return errors.New("Wrong number of argument: there should be one argument") + } + + //first: get the alert + id, err := strconv.Atoi(args[0]) + if err != nil { + return errors.New(fmt.Sprintf("bad alert id %s", args[0])) + } + if err := csConfig.LoadAPIClient(); err != nil { + return errors.Wrapf(err, "loading api client") + } + if csConfig.API.Client == nil { + return errors.New("There is no configuration on 'api_client:'") + } + if csConfig.API.Client.Credentials == nil { + return errors.New(fmt.Sprintf("Please provide credentials for the API in '%s'", csConfig.API.Client.CredentialsFilePath)) + } + apiURL, err := url.Parse(csConfig.API.Client.Credentials.URL) + if err != nil { + return errors.Wrapf(err, "error parsing the URL of the API") + } + client, err := apiclient.NewClient(&apiclient.Config{ + MachineID: csConfig.API.Client.Credentials.Login, + Password: strfmt.Password(csConfig.API.Client.Credentials.Password), + UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()), + URL: apiURL, + VersionPrefix: "v1", + }) + if err != nil { + return errors.Wrapf(err, "error creating the client for the API") + } + alert, _, err := client.Alerts.GetByID(context.Background(), id) + if err != nil { + return errors.Wrapf(err, fmt.Sprintf("can't find alert with id %s", args[0])) + } + + if alertOverride != "" { + if err = json.Unmarshal([]byte(alertOverride), alert); err != nil { + return errors.Wrapf(err, "Can't unmarshal the data given in the alert flag") + } + } + if !remediation { + alert.Remediation = true + } + + // second we start plugins + err = pluginBroker.Init(csConfig.PluginConfig, csConfig.API.Server.Profiles, csConfig.ConfigPaths) + if err != nil { + return errors.Wrapf(err, "Can't initialize plugins") + } + + pluginTomb.Go(func() error { + pluginBroker.Run(&pluginTomb) + return nil + }) + + //third: get the profile(s), and process the whole stuff + + profiles, err := csprofiles.NewProfile(csConfig.API.Server.Profiles) + if err != nil { + return errors.Wrap(err, "Cannot extract profiles from configuration") + } + + for id, profile := range profiles { + _, matched, err := profile.EvaluateProfile(alert) + if err != nil { + return errors.Wrapf(err, "can't evaluate profile %s", profile.Cfg.Name) + } + if !matched { + log.Infof("The profile %s didn't match", profile.Cfg.Name) + continue + } + log.Infof("The profile %s matched, sending to its configured notification plugins", profile.Cfg.Name) + loop: + for { + select { + case pluginBroker.PluginChannel <- csplugin.ProfileAlert{ + ProfileID: uint(id), + Alert: alert, + }: + break loop + default: + time.Sleep(50 * time.Millisecond) + log.Info("sleeping\n") + + } + } + if profile.Cfg.OnSuccess == "break" { + log.Infof("The profile %s contains a 'on_success: break' so bailing out", profile.Cfg.Name) + break + } + } + + // time.Sleep(2 * time.Second) // There's no mechanism to ensure notification has been sent + pluginTomb.Kill(errors.New("terminating")) + pluginTomb.Wait() + return nil + }, + } + cmdNotificationsReinject.Flags().BoolVarP(&remediation, "remediation", "r", false, "Set Alert.Remediation to false in the reinjected alert (see your profile filter configuration)") + cmdNotificationsReinject.Flags().StringVarP(&alertOverride, "alert", "a", "", "JSON string used to override alert fields in the reinjected alert (see crowdsec/pkg/models/alert.go in the source tree for the full definition of the object)") + cmdNotifications.AddCommand(cmdNotificationsReinject) return cmdNotifications } -func getNotificationsConfiguration() map[string]NotificationsCfg { +func getNotificationsConfiguration() (map[string]NotificationsCfg, error) { pcfgs := map[string]csplugin.PluginConfig{} wf := func(path string, info fs.FileInfo, err error) error { if info == nil { @@ -161,38 +302,45 @@ func getNotificationsConfiguration() map[string]NotificationsCfg { } if err := filepath.Walk(csConfig.ConfigPaths.NotificationDir, wf); err != nil { - log.Fatalf("Loading notifification plugin configuration: %s", err) + return nil, errors.Wrap(err, "Loading notifification plugin configuration") } // A bit of a tricky stuf now: reconcile profiles and notification plugins ncfgs := map[string]NotificationsCfg{} - for _, profile := range csConfig.API.Server.Profiles { + profiles, err := csprofiles.NewProfile(csConfig.API.Server.Profiles) + if err != nil { + return nil, errors.Wrap(err, "Cannot extract profiles from configuration") + } + for profileID, profile := range profiles { loop: - for _, notif := range profile.Notifications { + for _, notif := range profile.Cfg.Notifications { for name, pc := range pcfgs { if notif == name { if _, ok := ncfgs[pc.Name]; !ok { ncfgs[pc.Name] = NotificationsCfg{ Config: pc, - Profiles: []*csconfig.ProfileCfg{profile}, + Profiles: []*csconfig.ProfileCfg{profile.Cfg}, + ids: []uint{uint(profileID)}, } continue loop } tmp := ncfgs[pc.Name] for _, pr := range tmp.Profiles { var profiles []*csconfig.ProfileCfg - if pr.Name == profile.Name { + if pr.Name == profile.Cfg.Name { continue } - profiles = append(tmp.Profiles, profile) + profiles = append(tmp.Profiles, profile.Cfg) + ids := append(tmp.ids, uint(profileID)) ncfgs[pc.Name] = NotificationsCfg{ Config: tmp.Config, Profiles: profiles, + ids: ids, } } } } } } - return ncfgs + return ncfgs, nil } diff --git a/pkg/csplugin/broker.go b/pkg/csplugin/broker.go index 0fa1e8e21..a4fd34ec9 100644 --- a/pkg/csplugin/broker.go +++ b/pkg/csplugin/broker.go @@ -96,9 +96,10 @@ func (pb *PluginBroker) Kill() { } } -func (pb *PluginBroker) Run(tomb *tomb.Tomb) { +func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { //we get signaled via the channel when notifications need to be delivered to plugin (via the watcher) - pb.watcher.Start(tomb) + pb.watcher.Start(&tomb.Tomb{}) +loop: for { select { case profileAlert := <-pb.PluginChannel: @@ -117,13 +118,32 @@ func (pb *PluginBroker) Run(tomb *tomb.Tomb) { } }() - case <-tomb.Dying(): - log.Info("killing all plugins") - pb.Kill() - return + case <-pluginTomb.Dying(): + log.Infof("plugingTomb dying") + pb.watcher.tomb.Kill(errors.New("Terminating")) + for { + select { + case <-pb.watcher.tomb.Dead(): + log.Info("killing all plugins") + pb.Kill() + break loop + case pluginName := <-pb.watcher.PluginEvents: + // this can be ran in goroutine, but then locks will be needed + pluginMutex.Lock() + log.Tracef("going to deliver %d alerts to plugin %s", len(pb.alertsByPluginName[pluginName]), pluginName) + tmpAlerts := pb.alertsByPluginName[pluginName] + pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0) + pluginMutex.Unlock() + + if err := pb.pushNotificationsToPlugin(pluginName, tmpAlerts); err != nil { + log.WithField("plugin:", pluginName).Error(err) + } + } + } } } } + func (pb *PluginBroker) addProfileAlert(profileAlert ProfileAlert) { for _, pluginName := range pb.profileConfigs[profileAlert.ProfileID].Notifications { if _, ok := pb.pluginConfigByName[pluginName]; !ok { diff --git a/pkg/csplugin/watcher.go b/pkg/csplugin/watcher.go index 486fc9326..a95ade5f7 100644 --- a/pkg/csplugin/watcher.go +++ b/pkg/csplugin/watcher.go @@ -139,6 +139,9 @@ func (pw *PluginWatcher) watchPluginTicker(pluginName string) { } case <-pw.tomb.Dying(): ticker.Stop() + // emptying + // no lock here because we have the broker still listening even in dying state before killing us + pw.PluginEvents <- pluginName return } } diff --git a/pkg/csplugin/watcher_test.go b/pkg/csplugin/watcher_test.go index ba3c93d69..94d8d0617 100644 --- a/pkg/csplugin/watcher_test.go +++ b/pkg/csplugin/watcher_test.go @@ -14,8 +14,9 @@ import ( var ctx = context.Background() -func resetTestTomb(testTomb *tomb.Tomb) { +func resetTestTomb(testTomb *tomb.Tomb, pw *PluginWatcher) { testTomb.Kill(nil) + <-pw.PluginEvents if err := testTomb.Wait(); err != nil { log.Fatal(err) } @@ -64,8 +65,7 @@ func TestPluginWatcherInterval(t *testing.T) { defer cancel() err := listenChannelWithTimeout(ct, pw.PluginEvents) assert.ErrorContains(t, err, "context deadline exceeded") - - resetTestTomb(&testTomb) + resetTestTomb(&testTomb, &pw) testTomb = tomb.Tomb{} pw.Start(&testTomb) @@ -73,7 +73,7 @@ func TestPluginWatcherInterval(t *testing.T) { defer cancel() err = listenChannelWithTimeout(ct, pw.PluginEvents) assert.NilError(t, err) - resetTestTomb(&testTomb) + resetTestTomb(&testTomb, &pw) // This is to avoid the int complaining } @@ -113,5 +113,5 @@ func TestPluginAlertCountWatcher(t *testing.T) { defer cancel() err = listenChannelWithTimeout(ct, pw.PluginEvents) assert.NilError(t, err) - resetTestTomb(&testTomb) + resetTestTomb(&testTomb, &pw) }