From 88d06260d76b9f7dd8226bc8c7fdaf95463ee04f Mon Sep 17 00:00:00 2001 From: AlteredCoder <64792091+AlteredCoder@users.noreply.github.com> Date: Wed, 15 Dec 2021 11:39:37 +0100 Subject: [PATCH] add cscli decisions import (#1038) * add cscli decisions import Co-authored-by: Sebastien Blot Co-authored-by: bui --- cmd/crowdsec-cli/decisions.go | 160 ++++++++++++++++++++++++- go.mod | 1 + go.sum | 2 + pkg/apiserver/controllers/v1/alerts.go | 12 +- pkg/database/alerts.go | 92 +++++++++++--- pkg/database/database.go | 9 +- 6 files changed, 254 insertions(+), 22 deletions(-) diff --git a/cmd/crowdsec-cli/decisions.go b/cmd/crowdsec-cli/decisions.go index 59c5aaa45..5f86c59e8 100644 --- a/cmd/crowdsec-cli/decisions.go +++ b/cmd/crowdsec-cli/decisions.go @@ -6,6 +6,7 @@ import ( "fmt" "net/url" "os" + "path/filepath" "strconv" "strings" "time" @@ -15,6 +16,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/models" "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/go-openapi/strfmt" + "github.com/jszwec/csvutil" "github.com/olekukonko/tablewriter" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -22,6 +24,13 @@ import ( var Client *apiclient.ApiClient +var ( + defaultDuration = "4h" + defaultScope = "ip" + defaultType = "ban" + defaultReason = "manual" +) + func DecisionsToTable(alerts *models.GetAlertsResponse) error { /*here we cheat a bit : to make it more readable for the user, we dedup some entries*/ var spamLimit map[string]bool = make(map[string]bool) @@ -100,7 +109,7 @@ func NewDecisionsCmd() *cobra.Command { var cmdDecisions = &cobra.Command{ Use: "decisions [action]", Short: "Manage decisions", - Long: `Add/List/Delete decisions from LAPI`, + Long: `Add/List/Delete/Import decisions from LAPI`, Example: `cscli decisions [action] [filter]`, /*TBD example*/ Args: cobra.MinimumNArgs(1), @@ -305,7 +314,6 @@ cscli decisions add --scope username --value foobar if addReason == "" { addReason = fmt.Sprintf("manual '%s' from '%s'", addType, csConfig.API.Client.Credentials.Login) } - decision := models.Decision{ Duration: &addDuration, Scope: &addScope, @@ -446,5 +454,153 @@ cscli decisions delete --type captcha cmdDecisions.AddCommand(cmdDecisionsDelete) + var ( + importDuration string + importScope string + importReason string + importType string + importFile string + ) + + var cmdDecisionImport = &cobra.Command{ + Use: "import [options]", + Short: "Import decisions from json or csv file", + Long: "expected format :\n" + + "csv : any of duration,origin,reason,scope,type,value, with a header line\n" + + `json : {"duration" : "24h", "origin" : "my-list", "reason" : "my_scenario", "scope" : "ip", "type" : "ban", "value" : "x.y.z.z"}`, + DisableAutoGenTag: true, + Example: `decisions.csv : +duration,scope,value +24h,ip,1.2.3.4 + +cscsli decisions import -i decisions.csv + +decisions.json : +[{"duration" : "4h", "scope" : "ip", "type" : "ban", "value" : "1.2.3.4"}] +`, + Run: func(cmd *cobra.Command, args []string) { + if importFile == "" { + log.Fatalf("Please provide a input file contaning decisions with -i flag") + } + csvData, err := os.ReadFile(importFile) + if err != nil { + log.Fatalf("unable to open '%s': %s", importFile, err) + } + type decisionRaw struct { + Duration string `csv:"duration,omitempty" json:"duration,omitempty"` + Origin string `csv:"origin,omitempty" json:"origin,omitempty"` + Scenario string `csv:"reason,omitempty" json:"reason,omitempty"` + Scope string `csv:"scope,omitempty" json:"scope,omitempty"` + Type string `csv:"type,omitempty" json:"type,omitempty"` + Value string `csv:"value" json:"value"` + } + var decisionsListRaw []decisionRaw + switch fileFormat := filepath.Ext(importFile); fileFormat { + case ".json": + if err := json.Unmarshal(csvData, &decisionsListRaw); err != nil { + log.Fatalf("unable to unmarshall json: '%s'", err) + } + case ".csv": + if err := csvutil.Unmarshal(csvData, &decisionsListRaw); err != nil { + log.Fatalf("unable to unmarshall csv: '%s'", err) + } + default: + log.Fatalf("file format not supported for '%s'. supported format are 'json' and 'csv'", importFile) + } + + decisionsList := make([]*models.Decision, 0) + for i, decisionLine := range decisionsListRaw { + line := i + 2 + if decisionLine.Value == "" { + log.Fatalf("please provide a 'value' in your csv line %d", line) + } + /*deal with defaults and cli-override*/ + if decisionLine.Duration == "" { + decisionLine.Duration = defaultDuration + log.Debugf("No 'duration' line %d, using default value: '%s'", line, defaultDuration) + } + if importDuration != "" { + decisionLine.Duration = importDuration + log.Debugf("'duration' line %d, using supplied value: '%s'", line, importDuration) + } + decisionLine.Origin = "cscli-import" + + if decisionLine.Scenario == "" { + decisionLine.Scenario = defaultReason + log.Debugf("No 'reason' line %d, using value: '%s'", line, decisionLine.Scenario) + } + if importReason != "" { + decisionLine.Scenario = importReason + log.Debugf("No 'reason' line %d, using supplied value: '%s'", line, importReason) + } + if decisionLine.Type == "" { + decisionLine.Type = defaultType + log.Debugf("No 'type' line %d, using default value: '%s'", line, decisionLine.Type) + } + if importType != "" { + decisionLine.Type = importType + log.Debugf("'type' line %d, using supplied value: '%s'", line, importType) + } + if decisionLine.Scope == "" { + decisionLine.Scope = defaultScope + log.Debugf("No 'scope' line %d, using default value: '%s'", line, decisionLine.Scope) + } + if importScope != "" { + decisionLine.Scope = importScope + log.Debugf("'scope' line %d, using supplied value: '%s'", line, importScope) + } + decision := models.Decision{ + Value: types.StrPtr(decisionLine.Value), + Duration: types.StrPtr(decisionLine.Duration), + Origin: types.StrPtr(decisionLine.Origin), + Scenario: types.StrPtr(decisionLine.Scenario), + Type: types.StrPtr(decisionLine.Type), + Scope: types.StrPtr(decisionLine.Scope), + Simulated: new(bool), + } + decisionsList = append(decisionsList, &decision) + } + alerts := models.AddAlertsRequest{} + importAlert := models.Alert{ + CreatedAt: time.Now().Format(time.RFC3339), + Scenario: types.StrPtr(fmt.Sprintf("add: %d IPs", len(decisionsList))), + Message: types.StrPtr(""), + Events: []*models.Event{}, + Source: &models.Source{ + Scope: types.StrPtr("cscli/manual-import"), + Value: types.StrPtr(""), + }, + StartAt: types.StrPtr(time.Now().Format(time.RFC3339)), + StopAt: types.StrPtr(time.Now().Format(time.RFC3339)), + Capacity: types.Int32Ptr(0), + Simulated: types.BoolPtr(false), + EventsCount: types.Int32Ptr(int32(len(decisionsList))), + Leakspeed: types.StrPtr(""), + ScenarioHash: types.StrPtr(""), + ScenarioVersion: types.StrPtr(""), + Decisions: decisionsList, + } + alerts = append(alerts, &importAlert) + + if len(decisionsList) > 1000 { + log.Infof("You are about to add %d decisions, this may take a while", len(decisionsList)) + } + + _, _, err = Client.Alerts.Add(context.Background(), alerts) + if err != nil { + log.Fatalf(err.Error()) + } + log.Infof("%d decisions successfully imported", len(decisionsList)) + }, + } + + cmdDecisionImport.Flags().SortFlags = false + cmdDecisionImport.Flags().StringVarP(&importFile, "input", "i", "", "Input file") + cmdDecisionImport.Flags().StringVarP(&importDuration, "duration", "d", "", "Decision duration (ie. 1h,4h,30m)") + cmdDecisionImport.Flags().StringVar(&importScope, "scope", types.Ip, "Decision scope (ie. ip,range,username)") + cmdDecisionImport.Flags().StringVarP(&importReason, "reason", "R", "", "Decision reason (ie. scenario-name)") + cmdDecisionImport.Flags().StringVarP(&importType, "type", "t", "", "Decision type (ie. ban,captcha,throttle)") + cmdDecisions.AddCommand(cmdDecisionImport) + return cmdDecisions } diff --git a/go.mod b/go.mod index 03c4c5fc8..10e610ec1 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/huandu/xstrings v1.3.2 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/influxdata/go-syslog/v3 v3.0.0 + github.com/jszwec/csvutil v1.5.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lib/pq v1.10.2 github.com/mattn/go-runewidth v0.0.10 // indirect diff --git a/go.sum b/go.sum index 1c0c276eb..ad8a81aaa 100644 --- a/go.sum +++ b/go.sum @@ -436,6 +436,8 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= +github.com/jszwec/csvutil v1.5.1 h1:c3GFBhj6DFMUl4dMK3+B6rz2+LWWS/e9VJiVJ9t9kfQ= +github.com/jszwec/csvutil v1.5.1/go.mod h1:Rpu7Uu9giO9subDyMCIQfHVDuLrcaC36UA4YcJjGBkg= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= diff --git a/pkg/apiserver/controllers/v1/alerts.go b/pkg/apiserver/controllers/v1/alerts.go index 963b61712..f3999d28a 100644 --- a/pkg/apiserver/controllers/v1/alerts.go +++ b/pkg/apiserver/controllers/v1/alerts.go @@ -125,7 +125,7 @@ func (c *Controller) CreateAlert(gctx *gin.Context) { c.HandleDBErrors(gctx, err) return } - + stopFlush := false for _, alert := range input { alert.MachineID = machineID if len(alert.Decisions) != 0 { @@ -143,6 +143,10 @@ func (c *Controller) CreateAlert(gctx *gin.Context) { break } } + decision := alert.Decisions[0] + if decision.Origin != nil && *decision.Origin == "cscli-import" { + stopFlush = true + } continue } @@ -164,7 +168,13 @@ func (c *Controller) CreateAlert(gctx *gin.Context) { } } + if stopFlush { + c.DBClient.CanFlush = false + } + alerts, err := c.DBClient.CreateAlert(machineID, input) + c.DBClient.CanFlush = true + if err != nil { c.HandleDBErrors(gctx, err) return diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go index f77531df1..c8f61242d 100644 --- a/pkg/database/alerts.go +++ b/pkg/database/alerts.go @@ -20,9 +20,10 @@ import ( ) const ( - paginationSize = 100 // used to queryAlert to avoid 'too many SQL variable' - defaultLimit = 100 // default limit of element to returns when query alerts - bulkSize = 50 // bulk size when create alerts + paginationSize = 100 // used to queryAlert to avoid 'too many SQL variable' + defaultLimit = 100 // default limit of element to returns when query alerts + bulkSize = 50 // bulk size when create alerts + decisionBulkSize = 50 ) func formatAlertAsString(machineId string, alert *models.Alert) []string { @@ -117,7 +118,6 @@ func (c *Client) CreateAlert(machineID string, alertList []*models.Alert) ([]str /*We can't bulk both the alert and the decision at the same time. With new consensus, we want to bulk a single alert with a lot of decisions.*/ func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, int, error) { - decisionBulkSize := 50 var err error var deleted, inserted int @@ -278,6 +278,23 @@ func (c *Client) UpdateCommunityBlocklist(alertItem *models.Alert) (int, int, in return alertRef.ID, inserted, deleted, nil } +func chunkDecisions(decisions []*ent.Decision, chunkSize int) [][]*ent.Decision { + var ret [][]*ent.Decision + var chunk []*ent.Decision + + for _, d := range decisions { + chunk = append(chunk, d) + if len(chunk) == chunkSize { + ret = append(ret, chunk) + chunk = nil + } + } + if len(chunk) > 0 { + ret = append(ret, chunk) + } + return ret +} + func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([]string, error) { ret := []string{} @@ -285,6 +302,7 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ c.Log.Debugf("writting %d items", len(alertList)) bulk := make([]*ent.AlertCreate, 0, bulkSize) + alertDecisions := make([][]*ent.Decision, 0, bulkSize) for i, alertItem := range alertList { var decisions []*ent.Decision var metas []*ent.Meta @@ -394,8 +412,10 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ c.Log.Errorf("While parsing StartAt of item %s : %s", *alertItem.StopAt, err) ts = time.Now() } + + decisions = make([]*ent.Decision, 0) if len(alertItem.Decisions) > 0 { - decisionBulk := make([]*ent.DecisionCreate, len(alertItem.Decisions)) + decisionBulk := make([]*ent.DecisionCreate, 0, decisionBulkSize) for i, decisionItem := range alertItem.Decisions { var start_ip, start_sfx, end_ip, end_sfx int64 var sz int @@ -412,7 +432,8 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ return []string{}, errors.Wrapf(ParseDurationFail, "invalid addr/range %s : %s", *decisionItem.Value, err) } } - decisionBulk[i] = c.Ent.Decision.Create(). + + decisionCreate := c.Ent.Decision.Create(). SetUntil(ts.Add(duration)). SetScenario(*decisionItem.Scenario). SetType(*decisionItem.Type). @@ -425,12 +446,27 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ SetScope(*decisionItem.Scope). SetOrigin(*decisionItem.Origin). SetSimulated(*alertItem.Simulated) + + decisionBulk = append(decisionBulk, decisionCreate) + if len(decisionBulk) == decisionBulkSize { + decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX) + if err != nil { + return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err) + + } + decisions = append(decisions, decisionsCreateRet...) + if len(alertItem.Decisions)-i <= decisionBulkSize { + decisionBulk = make([]*ent.DecisionCreate, 0, (len(alertItem.Decisions) - i)) + } else { + decisionBulk = make([]*ent.DecisionCreate, 0, decisionBulkSize) + } + } } - decisions, err = c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX) + decisionsCreateRet, err := c.Ent.Decision.CreateBulk(decisionBulk...).Save(c.CTX) if err != nil { return []string{}, errors.Wrapf(BulkError, "creating alert decisions: %s", err) - } + decisions = append(decisions, decisionsCreateRet...) } alertB := c.Ent.Alert. @@ -454,7 +490,6 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ SetSimulated(*alertItem.Simulated). SetScenarioVersion(*alertItem.ScenarioVersion). SetScenarioHash(*alertItem.ScenarioHash). - AddDecisions(decisions...). AddEvents(events...). AddMetas(metas...) @@ -462,20 +497,31 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ alertB.SetOwner(owner) } bulk = append(bulk, alertB) + alertDecisions = append(alertDecisions, decisions) if len(bulk) == bulkSize { alerts, err := c.Ent.Alert.CreateBulk(bulk...).Save(c.CTX) if err != nil { return []string{}, errors.Wrapf(BulkError, "bulk creating alert : %s", err) } - for _, alert := range alerts { - ret = append(ret, strconv.Itoa(alert.ID)) + for _, a := range alerts { + ret = append(ret, strconv.Itoa(a.ID)) + for _, d := range alertDecisions { + decisionsChunk := chunkDecisions(d, bulkSize) + for _, d2 := range decisionsChunk { + _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX) + if err != nil { + return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error()) + } + } + } } - if len(alertList)-i <= bulkSize { bulk = make([]*ent.AlertCreate, 0, (len(alertList) - i)) + alertDecisions = make([][]*ent.Decision, 0, (len(alertList) - i)) } else { bulk = make([]*ent.AlertCreate, 0, bulkSize) + alertDecisions = make([][]*ent.Decision, 0, bulkSize) } } } @@ -485,8 +531,17 @@ func (c *Client) CreateAlertBulk(machineId string, alertList []*models.Alert) ([ return []string{}, errors.Wrapf(BulkError, "leftovers creating alert : %s", err) } - for _, alert := range alerts { - ret = append(ret, strconv.Itoa(alert.ID)) + for _, a := range alerts { + ret = append(ret, strconv.Itoa(a.ID)) + for _, d := range alertDecisions { + decisionsChunk := chunkDecisions(d, bulkSize) + for _, d2 := range decisionsChunk { + _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(c.CTX) + if err != nil { + return []string{}, fmt.Errorf("error while updating decisions: %s", err.Error()) + } + } + } } return ret, nil @@ -812,7 +867,9 @@ func (c *Client) FlushOrphans() { c.Log.Infof("%d deleted orphan events", events_count) } - events_count, err = c.Ent.Decision.Delete().Where(decision.Not(decision.HasOwner())).Exec(c.CTX) + events_count, err = c.Ent.Decision.Delete().Where( + decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now())).Exec(c.CTX) + if err != nil { c.Log.Warningf("error while deleting orphan decisions : %s", err) return @@ -828,6 +885,11 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error { var totalAlerts int var err error + if !c.CanFlush { + c.Log.Debug("a list is being imported, flushing later") + return nil + } + c.Log.Debug("Flushing orphan alerts") c.FlushOrphans() c.Log.Debug("Done flushing orphan alerts") diff --git a/pkg/database/database.go b/pkg/database/database.go index a28a8fde4..3da2a9c65 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -18,9 +18,10 @@ import ( ) type Client struct { - Ent *ent.Client - CTX context.Context - Log *log.Logger + Ent *ent.Client + CTX context.Context + Log *log.Logger + CanFlush bool } func NewClient(config *csconfig.DatabaseCfg) (*Client, error) { @@ -82,7 +83,7 @@ func NewClient(config *csconfig.DatabaseCfg) (*Client, error) { if err = client.Schema.Create(context.Background()); err != nil { return nil, fmt.Errorf("failed creating schema resources: %v", err) } - return &Client{Ent: client, CTX: context.Background(), Log: clog}, nil + return &Client{Ent: client, CTX: context.Background(), Log: clog, CanFlush: true}, nil } func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {