diff --git a/cmd/crowdsec-cli/api.go b/cmd/crowdsec-cli/api.go index a11564275..81876e8a7 100644 --- a/cmd/crowdsec-cli/api.go +++ b/cmd/crowdsec-cli/api.go @@ -157,8 +157,9 @@ cscli api credentials # Display your API credentials outputConfig := outputs.OutputFactory{ BackendFolder: config.BackendPluginFolder, + Flush: false, } - outputCTX, err = outputs.NewOutput(&outputConfig, false) + outputCTX, err = outputs.NewOutput(&outputConfig) if err != nil { return err } diff --git a/cmd/crowdsec-cli/backup-restore.go b/cmd/crowdsec-cli/backup-restore.go index 747de7124..c9d1536a0 100644 --- a/cmd/crowdsec-cli/backup-restore.go +++ b/cmd/crowdsec-cli/backup-restore.go @@ -412,8 +412,9 @@ cscli backup restore ./my-backup`, outputConfig := outputs.OutputFactory{ BackendFolder: config.BackendPluginFolder, + Flush: false, } - outputCTX, err = outputs.NewOutput(&outputConfig, false) + outputCTX, err = outputs.NewOutput(&outputConfig) if err != nil { log.Fatalf("Failed to load output plugins : %v", err) } @@ -453,8 +454,9 @@ cscli backup restore ./my-backup`, outputConfig := outputs.OutputFactory{ BackendFolder: config.BackendPluginFolder, + Flush: false, } - outputCTX, err = outputs.NewOutput(&outputConfig, false) + outputCTX, err = outputs.NewOutput(&outputConfig) if err != nil { log.Fatalf("Failed to load output plugins : %v", err) } diff --git a/cmd/crowdsec-cli/ban.go b/cmd/crowdsec-cli/ban.go index 023e7808a..61aba814e 100644 --- a/cmd/crowdsec-cli/ban.go +++ b/cmd/crowdsec-cli/ban.go @@ -167,7 +167,7 @@ func BanAdd(target string, duration string, reason string, action string) error if err != nil { return err } - log.Infof("Wrote ban to database.") + log.Infof("%s %s for %s (%s)", action, target, duration, reason) return nil } @@ -188,9 +188,10 @@ You can add/delete/list or flush current bans in your local ban DB.`, outputConfig := outputs.OutputFactory{ BackendFolder: config.BackendPluginFolder, + Flush: false, } - outputCTX, err = outputs.NewOutput(&outputConfig, false) + outputCTX, err = outputs.NewOutput(&outputConfig) if err != nil { return fmt.Errorf(err.Error()) } @@ -220,9 +221,10 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`, Short: "Adds the specific ip to the ban db", Long: `Duration must be [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration), expressed in s/m/h.`, Example: `cscli ban add ip 1.2.3.4 12h "the scan"`, - Args: cobra.ExactArgs(3), + Args: cobra.MinimumNArgs(3), Run: func(cmd *cobra.Command, args []string) { - if err := BanAdd(args[0], args[1], args[2], remediationType); err != nil { + reason := strings.Join(args[2:], " ") + if err := BanAdd(args[0], args[1], reason, remediationType); err != nil { log.Fatalf("failed to add ban to sqlite : %v", err) } }, @@ -233,9 +235,10 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`, Short: "Adds the specific ip to the ban db", Long: `Duration must be [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) compatible, expressed in s/m/h.`, Example: `cscli ban add range 1.2.3.0/24 12h "the whole range"`, - Args: cobra.ExactArgs(3), + Args: cobra.MinimumNArgs(3), Run: func(cmd *cobra.Command, args []string) { - if err := BanAdd(args[0], args[1], args[2], remediationType); err != nil { + reason := strings.Join(args[2:], " ") + if err := BanAdd(args[0], args[1], reason, remediationType); err != nil { log.Fatalf("failed to add ban to sqlite : %v", err) } }, diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 89575984d..643e1027d 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -14,6 +14,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/outputs" "github.com/crowdsecurity/crowdsec/pkg/parser" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/pkg/errors" "github.com/sevlyar/go-daemon" log "github.com/sirupsen/logrus" @@ -151,11 +152,22 @@ func LoadOutputs(cConfig *csconfig.CrowdSec) error { return fmt.Errorf("Failed to load output profiles : %v", err) } - OutputRunner, err = outputs.NewOutput(cConfig.OutputConfig, cConfig.Daemonize) + //If the user is providing a single file (ie forensic mode), don't flush expired records + if cConfig.SingleFile != "" { + log.Infof("forensic mode, disable flush") + cConfig.OutputConfig.Flush = false + } else { + cConfig.OutputConfig.Flush = true + } + OutputRunner, err = outputs.NewOutput(cConfig.OutputConfig) if err != nil { return fmt.Errorf("output plugins initialization error : %s", err.Error()) } + if err := OutputRunner.StartAutoCommit(); err != nil { + return errors.Wrap(err, "failed to start autocommit") + } + /* Init the API connector */ if cConfig.APIMode { log.Infof("Loading API client") @@ -277,7 +289,6 @@ func main() { if err := LoadBuckets(cConfig); err != nil { log.Fatalf("Failed to load scenarios: %s", err) - } if err := LoadOutputs(cConfig); err != nil { diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index ca3a5b095..bb1c61a73 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -129,8 +129,6 @@ func termHandler(sig os.Signal) error { } func serveOneTimeRun(outputRunner outputs.Output) error { - log.Infof("waiting for acquisition to finish") - if err := acquisTomb.Wait(); err != nil { log.Warningf("acquisition returned error : %s", err) } diff --git a/config/dev.yaml b/config/dev.yaml index e80ae4c56..ba6fbc50e 100644 --- a/config/dev.yaml +++ b/config/dev.yaml @@ -6,8 +6,12 @@ cscli_dir: "./config/crowdsec-cli" log_dir: "./logs" log_mode: "stdout" log_level: info +prometheus: true profiling: false -sqlite_path: "./test.db" apimode: false plugin: backend: "./config/plugins/backend" + max_records: 10000 + #30 days = 720 hours + max_records_age: 720h + \ No newline at end of file diff --git a/config/plugins/backend/sqlite.yaml b/config/plugins/backend/sqlite.yaml index b177b22a5..0d04e7664 100644 --- a/config/plugins/backend/sqlite.yaml +++ b/config/plugins/backend/sqlite.yaml @@ -2,4 +2,3 @@ name: sqlite path: /usr/local/lib/crowdsec/plugins/backend/sqlite.so config: db_path: /var/lib/crowdsec/data/crowdsec.db - flush: true diff --git a/config/prod.yaml b/config/prod.yaml index d2337c6e2..73c48f085 100644 --- a/config/prod.yaml +++ b/config/prod.yaml @@ -7,7 +7,6 @@ cscli_dir: ${CFG}/cscli log_mode: file log_level: info profiling: false -sqlite_path: ${DATA}/crowdsec.db apimode: true daemon: true prometheus: true @@ -15,3 +14,4 @@ prometheus: true http_listen: 127.0.0.1:6060 plugin: backend: "/etc/crowdsec/plugins/backend" + max_records_age: 720h diff --git a/config/user.yaml b/config/user.yaml index 8d3f83c6c..61631fa74 100644 --- a/config/user.yaml +++ b/config/user.yaml @@ -7,7 +7,6 @@ cscli_dir: ${CFG}/cscli log_mode: stdout log_level: info profiling: false -sqlite_path: ${DATA}/crowdsec.db apimode: false daemon: false prometheus: false diff --git a/pkg/csconfig/config.go b/pkg/csconfig/config.go index 66a6dbb36..823d79696 100644 --- a/pkg/csconfig/config.go +++ b/pkg/csconfig/config.go @@ -25,14 +25,13 @@ type CrowdSec struct { SingleFileLabel string //for forensic mode PIDFolder string `yaml:"pid_dir,omitempty"` LogFolder string `yaml:"log_dir,omitempty"` - LogMode string `yaml:"log_mode,omitempty"` //like file, syslog or stdout ? - LogLevel log.Level `yaml:"log_level,omitempty"` //trace,debug,info,warning,error - Daemonize bool `yaml:"daemon,omitempty"` //true -> go background - Profiling bool `yaml:"profiling,omitempty"` //true -> enable runtime profiling - SQLiteFile string `yaml:"sqlite_path,omitempty"` //path to sqlite output - APIMode bool `yaml:"apimode,omitempty"` //true -> enable api push - CsCliFolder string `yaml:"cscli_dir"` //cscli folder - NbParsers int `yaml:"parser_routines"` //the number of go routines to start for parsing + LogMode string `yaml:"log_mode,omitempty"` //like file, syslog or stdout ? + LogLevel log.Level `yaml:"log_level,omitempty"` //trace,debug,info,warning,error + Daemonize bool `yaml:"daemon,omitempty"` //true -> go background + Profiling bool `yaml:"profiling,omitempty"` //true -> enable runtime profiling + APIMode bool `yaml:"apimode,omitempty"` //true -> enable api push + CsCliFolder string `yaml:"cscli_dir"` //cscli folder + NbParsers int `yaml:"parser_routines"` //the number of go routines to start for parsing Linter bool Prometheus bool HTTPListen string `yaml:"http_listen,omitempty"` @@ -53,7 +52,6 @@ func NewCrowdSecConfig() *CrowdSec { PIDFolder: "/var/run/", LogFolder: "/var/log/", LogMode: "stdout", - SQLiteFile: "/var/lib/crowdsec/data/crowdsec.db", APIMode: false, NbParsers: 1, Prometheus: false, @@ -89,7 +87,6 @@ func (c *CrowdSec) GetOPT() error { printInfo := flag.Bool("info", false, "print info-level on stdout") printVersion := flag.Bool("version", false, "display version") APIMode := flag.Bool("api", false, "perform pushes to api") - SQLiteMode := flag.Bool("sqlite", true, "write overflows to sqlite") profileMode := flag.Bool("profile", false, "Enable performance profiling") catFile := flag.String("file", "", "Process a single file in time-machine") catFileType := flag.String("type", "", "Labels.type for file in time-machine") @@ -156,9 +153,6 @@ func (c *CrowdSec) GetOPT() error { if *printTrace { c.LogLevel = log.TraceLevel } - if !*SQLiteMode { - c.SQLiteFile = "" - } if *APIMode { c.APIMode = true } diff --git a/pkg/cwhub/hubMgmt.go b/pkg/cwhub/hubMgmt.go index 8fd822ebc..6321e78a0 100644 --- a/pkg/cwhub/hubMgmt.go +++ b/pkg/cwhub/hubMgmt.go @@ -123,7 +123,7 @@ func parser_visit(path string, f os.FileInfo, err error) error { subs := strings.Split(path, "/") - log.Debugf("path:%s, hubdir:%s, installdir:%s", path, Hubdir, Installdir) + log.Tracef("path:%s, hubdir:%s, installdir:%s", path, Hubdir, Installdir) /*we're in hub (~/.cscli/hub/)*/ if strings.HasPrefix(path, Hubdir) { inhub = true @@ -137,7 +137,7 @@ func parser_visit(path string, f os.FileInfo, err error) error { fauthor = subs[len(subs)-2] stage = subs[len(subs)-3] ftype = subs[len(subs)-4] - log.Debugf("HUBB check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype) + log.Tracef("HUBB check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype) } else if strings.HasPrefix(path, Installdir) { /*we're in install /etc/crowdsec//... */ if len(subs) < 3 { @@ -151,7 +151,7 @@ func parser_visit(path string, f os.FileInfo, err error) error { stage = subs[len(subs)-2] ftype = subs[len(subs)-3] fauthor = "" - log.Debugf("INSTALL check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype) + log.Tracef("INSTALL check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype) } //log.Printf("%s -> name:%s stage:%s", path, fname, stage) @@ -165,7 +165,7 @@ func parser_visit(path string, f os.FileInfo, err error) error { return fmt.Errorf("unknown prefix in %s : fname:%s, fauthor:%s, stage:%s, ftype:%s", path, fname, fauthor, stage, ftype) } - log.Debugf("CORRECTED [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype) + log.Tracef("CORRECTED [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype) /* we can encounter 'collections' in the form of a symlink : @@ -176,7 +176,7 @@ func parser_visit(path string, f os.FileInfo, err error) error { if f.Mode()&os.ModeSymlink == 0 { local = true skippedLocal++ - log.Debugf("%s isn't a symlink", path) + log.Tracef("%s isn't a symlink", path) } else { hubpath, err = os.Readlink(path) if err != nil { @@ -192,7 +192,7 @@ func parser_visit(path string, f os.FileInfo, err error) error { } return nil } - log.Debugf("%s points to %s", path, hubpath) + log.Tracef("%s points to %s", path, hubpath) } //if it's not a symlink and not in hub, it's a local file, don't bother @@ -214,13 +214,13 @@ func parser_visit(path string, f os.FileInfo, err error) error { return nil } //try to find which configuration item it is - log.Debugf("check [%s] of %s", fname, ftype) + log.Tracef("check [%s] of %s", fname, ftype) match := false for k, v := range HubIdx[ftype] { - log.Debugf("check [%s] vs [%s] : %s", fname, v.RemotePath, ftype+"/"+stage+"/"+fname+".yaml") + log.Tracef("check [%s] vs [%s] : %s", fname, v.RemotePath, ftype+"/"+stage+"/"+fname+".yaml") if fname != v.FileName { - log.Debugf("%s != %s (filename)", fname, v.FileName) + log.Tracef("%s != %s (filename)", fname, v.FileName) continue } //wrong stage @@ -238,7 +238,7 @@ func parser_visit(path string, f os.FileInfo, err error) error { continue } if path == Hubdir+"/"+v.RemotePath { - log.Debugf("marking %s as downloaded", v.Name) + log.Tracef("marking %s as downloaded", v.Name) v.Downloaded = true } } else { @@ -274,10 +274,8 @@ func parser_visit(path string, f os.FileInfo, err error) error { target.FileName = x[len(x)-1] } if version == v.Version { - log.Debugf("%s is up-to-date", v.Name) + log.Tracef("%s is up-to-date", v.Name) v.UpToDate = true - } else { - log.Debugf("%s is outdated", v.Name) } match = true @@ -310,18 +308,18 @@ func parser_visit(path string, f os.FileInfo, err error) error { func CollecDepsCheck(v *Item) error { /*if it's a collection, ensure all the items are installed, or tag it as tainted*/ if v.Type == COLLECTIONS { - log.Debugf("checking submembers of %s installed:%t", v.Name, v.Installed) + log.Tracef("checking submembers of %s installed:%t", v.Name, v.Installed) var tmp = [][]string{v.Parsers, v.PostOverflows, v.Scenarios, v.Collections} for idx, ptr := range tmp { ptrtype := ItemTypes[idx] for _, p := range ptr { if val, ok := HubIdx[ptrtype][p]; ok { - log.Debugf("check %s installed:%t", val.Name, val.Installed) + log.Tracef("check %s installed:%t", val.Name, val.Installed) if !v.Installed { continue } if val.Type == COLLECTIONS { - log.Debugf("collec, recurse.") + log.Tracef("collec, recurse.") if err := CollecDepsCheck(&val); err != nil { return fmt.Errorf("sub collection %s is broken : %s", val.Name, err) } @@ -341,7 +339,7 @@ func CollecDepsCheck(v *Item) error { } val.BelongsToCollections = append(val.BelongsToCollections, v.Name) HubIdx[ptrtype][p] = val - log.Debugf("checking for %s - tainted:%t uptodate:%t", p, v.Tainted, v.UpToDate) + log.Tracef("checking for %s - tainted:%t uptodate:%t", p, v.Tainted, v.UpToDate) } else { log.Fatalf("Referred %s %s in collection %s doesn't exist.", ptrtype, p, v.Name) } @@ -653,7 +651,7 @@ func DownloadLatest(target Item, tdir string, overwrite bool, dataFolder string) log.Debugf("Download %s sub-item : %s %s", target.Name, ptrtype, p) //recurse as it's a collection if ptrtype == COLLECTIONS { - log.Debugf("collection, recurse") + log.Tracef("collection, recurse") HubIdx[ptrtype][p], err = DownloadLatest(val, tdir, overwrite, dataFolder) if err != nil { log.Errorf("Encountered error while downloading sub-item %s %s : %s.", ptrtype, p, err) diff --git a/pkg/cwplugin/backend.go b/pkg/cwplugin/backend.go index 4cc59196e..5bccff4d0 100644 --- a/pkg/cwplugin/backend.go +++ b/pkg/cwplugin/backend.go @@ -22,24 +22,35 @@ type Backend interface { Flush() error Shutdown() error DeleteAll() error + StartAutoCommit() error } type BackendPlugin struct { Name string `yaml:"name"` Path string `yaml:"path"` ConfigFilePath string - Config map[string]string `yaml:"config"` - ID string - funcs Backend + //Config is passed to the backend plugin. + //It contains specific plugin config + plugin config from main yaml file + Config map[string]string `yaml:"config"` + ID string + funcs Backend } type BackendManager struct { backendPlugins map[string]BackendPlugin } -func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) { +func NewBackendPlugin(outputConfig map[string]string) (*BackendManager, error) { var files []string var backendManager = &BackendManager{} + var path string + + if v, ok := outputConfig["backend"]; ok { + path = v + } else { + return nil, fmt.Errorf("missing 'backend' (path to backend plugins)") + } + //var path = output.BackendFolder err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error { if filepath.Ext(path) == ".yaml" { files = append(files, path) @@ -88,17 +99,28 @@ func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) { // Add the interface and Init() newPlugin.funcs = bInterface - if isDaemon { - newPlugin.Config["flush"] = "true" - } else { - newPlugin.Config["flush"] = "false" + // Merge backend config from main config file + if v, ok := outputConfig["debug"]; ok { + newPlugin.Config["debug"] = v + } + + if v, ok := outputConfig["max_records"]; ok { + newPlugin.Config["max_records"] = v + } + + if v, ok := outputConfig["max_records_age"]; ok { + newPlugin.Config["max_records_age"] = v + } + + if v, ok := outputConfig["flush"]; ok { + newPlugin.Config["flush"] = v } err = newPlugin.funcs.Init(newPlugin.Config) if err != nil { return nil, fmt.Errorf("plugin '%s' init error : %s", newPlugin.Name, err) } - log.Infof("backend plugin '%s' loaded", newPlugin.Name) + log.Debugf("backend plugin '%s' loaded", newPlugin.Name) backendManager.backendPlugins[newPlugin.Name] = newPlugin } @@ -175,6 +197,17 @@ func (b *BackendManager) IsBackendPlugin(plugin string) bool { return false } +func (b *BackendManager) StartAutoCommit() error { + var err error + for _, plugin := range b.backendPlugins { + err = plugin.funcs.StartAutoCommit() + if err != nil { + return err + } + } + return nil +} + func (b *BackendManager) ReadAT(timeAT time.Time) ([]map[string]string, error) { var ret []map[string]string var err error diff --git a/pkg/exprhelpers/exprlib.go b/pkg/exprhelpers/exprlib.go index 45d2e058e..e1ae0eef2 100644 --- a/pkg/exprhelpers/exprlib.go +++ b/pkg/exprhelpers/exprlib.go @@ -44,7 +44,6 @@ func GetExprEnv(ctx map[string]interface{}) map[string]interface{} { } func Init() error { - log.Infof("Expr helper initiated") dataFile = make(map[string][]string) dataFileRegex = make(map[string][]*regexp.Regexp) return nil diff --git a/pkg/outputs/ouputs.go b/pkg/outputs/ouputs.go index 92d69958a..9b8f97753 100644 --- a/pkg/outputs/ouputs.go +++ b/pkg/outputs/ouputs.go @@ -10,6 +10,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/cwplugin" "github.com/crowdsecurity/crowdsec/pkg/exprhelpers" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/pkg/errors" "github.com/crowdsecurity/crowdsec/pkg/cwapi" @@ -18,10 +19,19 @@ import ( "gopkg.in/yaml.v2" ) +//OutputFactory is part of the main yaml configuration file, and holds generic backend config type OutputFactory struct { - BackendFolder string `yaml:"backend"` + BackendFolder string `yaml:"backend,omitempty"` + //For the db GC : how many records can we keep at most + MaxRecords string `yaml:"max_records,omitempty"` + //For the db GC what is the oldest records we tolerate + MaxRecordsAge string `yaml:"max_records_age,omitempty"` + //Should we automatically flush expired bans + Flush bool + Debug bool `yaml:"debug"` } +//Output holds the runtime objects of backend type Output struct { API *cwapi.ApiCtx bManager *cwplugin.BackendManager @@ -86,6 +96,10 @@ func OvflwToOrder(sig types.SignalOccurence, prof types.Profile) (*types.BanOrde return &ordr, nil, warn } +func (o *Output) StartAutoCommit() error { + return o.bManager.StartAutoCommit() +} + func (o *Output) Shutdown() error { var reterr error if o.API != nil { @@ -100,8 +114,6 @@ func (o *Output) Shutdown() error { reterr = err } } - //bManager - //TBD : the backend(s) should be stopped in the same way return reterr } @@ -286,19 +298,6 @@ func (o *Output) LoadAPIConfig(configFile string) error { return nil } -func (o *Output) load(config *OutputFactory, isDaemon bool) error { - var err error - if config == nil { - return fmt.Errorf("missing output plugin configuration") - } - log.Debugf("loading backend plugins ...") - o.bManager, err = cwplugin.NewBackendPlugin(config.BackendFolder, isDaemon) - if err != nil { - return err - } - return nil -} - func (o *Output) Delete(target string) (int, error) { nbDel, err := o.bManager.Delete(target) return nbDel, err @@ -327,11 +326,30 @@ func (o *Output) ReadAT(timeAT time.Time) ([]map[string]string, error) { return ret, nil } -func NewOutput(config *OutputFactory, isDaemon bool) (*Output, error) { +func NewOutput(config *OutputFactory) (*Output, error) { var output Output - err := output.load(config, isDaemon) + var err error + + if config == nil { + return nil, fmt.Errorf("missing output plugin configuration") + } + log.Debugf("loading backend plugins ...") + //turn the *OutputFactory into a map[string]string for less constraint + backendConfig := map[string]string{ + "backend": config.BackendFolder, + "flush": strconv.FormatBool(config.Flush), + "debug": strconv.FormatBool(config.Debug)} + + if config.MaxRecords != "" { + backendConfig["max_records"] = config.MaxRecords + } + if config.MaxRecordsAge != "" { + backendConfig["max_records_age"] = config.MaxRecordsAge + } + + output.bManager, err = cwplugin.NewBackendPlugin(backendConfig) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to load backend plugin") } return &output, nil } diff --git a/pkg/parser/stage.go b/pkg/parser/stage.go index 37bdb9baf..95bb66a31 100644 --- a/pkg/parser/stage.go +++ b/pkg/parser/stage.go @@ -133,7 +133,8 @@ func LoadStages(stageFiles []Stagefile, pctx *UnixParserCtx) ([]Node, error) { pctx.Stages = append(pctx.Stages, k) } sort.Strings(pctx.Stages) - log.Infof("Stages loaded: %+v", pctx.Stages) + log.Infof("Loaded %d nodes, %d stages", len(nodes), len(pctx.Stages)) + return nodes, nil } diff --git a/pkg/sqlite/commit.go b/pkg/sqlite/commit.go index cec4e5c96..01355b718 100644 --- a/pkg/sqlite/commit.go +++ b/pkg/sqlite/commit.go @@ -6,9 +6,21 @@ import ( "time" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) +func (c *Context) DeleteExpired() error { + //Delete the expired records + if c.flush { + retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{}) + if retx.RowsAffected > 0 { + log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected) + } + } + return nil +} + func (c *Context) Flush() error { c.lock.Lock() defer c.lock.Unlock() @@ -21,18 +33,120 @@ func (c *Context) Flush() error { } c.tx = c.Db.Begin() c.lastCommit = time.Now() - //Delete the expired records - if c.flush { - retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{}) - if retx.RowsAffected > 0 { - log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected) + return nil +} + +func (c *Context) CleanUpRecordsByAge() error { + //let's fetch all expired records that are more than XX days olds + sos := []types.BanApplication{} + + if c.maxDurationRetention == 0 { + return nil + } + + //look for soft-deleted events that are OLDER than maxDurationRetention + ret := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL"). + Where(fmt.Sprintf("deleted_at > date('now','-%d minutes')", int(c.maxDurationRetention.Minutes()))). + Order("updated_at desc").Find(&sos) + + if ret.Error != nil { + return errors.Wrap(ret.Error, "failed to get count of old records") + } + + //no events elligible + if len(sos) == 0 || ret.RowsAffected == 0 { + log.Debugf("no event older than %s", c.maxDurationRetention.String()) + return nil + } + //let's do it in a single transaction + delTx := c.Db.Unscoped().Begin() + delRecords := 0 + + for _, record := range sos { + copy := record + delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}) + delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}) + delTx.Unscoped().Table("ban_applications").Delete(©) + //we need to delete associations : event_sequences, signal_occurences + delRecords++ + } + ret = delTx.Unscoped().Commit() + if ret.Error != nil { + return errors.Wrap(ret.Error, "failed to delete records") + } + log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention) + return nil +} + +func (c *Context) CleanUpRecordsByCount() error { + var count int + + if c.maxEventRetention <= 0 { + return nil + } + + ret := c.Db.Unscoped().Table("ban_applications").Order("updated_at desc").Count(&count) + + if ret.Error != nil { + return errors.Wrap(ret.Error, "failed to get bans count") + } + if count < c.maxEventRetention { + log.Debugf("%d < %d, don't cleanup", count, c.maxEventRetention) + return nil + } + + sos := []types.BanApplication{} + /*get soft deleted records oldest to youngest*/ + records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos) + if records.Error != nil { + return errors.Wrap(records.Error, "failed to list expired bans for flush") + } + + //let's do it in a single transaction + delTx := c.Db.Unscoped().Begin() + delRecords := 0 + for _, ld := range sos { + copy := ld + delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}) + delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}) + delTx.Unscoped().Table("ban_applications").Delete(©) + //we need to delete associations : event_sequences, signal_occurences + delRecords++ + //let's delete as well the associated event_sequence + if count-delRecords <= c.maxEventRetention { + break } } + if len(sos) > 0 { + //log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos)) + log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos)) + ret = delTx.Unscoped().Commit() + if ret.Error != nil { + return errors.Wrap(ret.Error, "failed to delete records") + } + } else { + log.Debugf("didn't find any record to clean") + } return nil } -func (c *Context) AutoCommit() { +func (c *Context) StartAutoCommit() error { + //TBD : we shouldn't start auto-commit if we are in cli mode ? + c.PusherTomb.Go(func() error { + c.autoCommit() + return nil + }) + return nil +} + +func (c *Context) autoCommit() { + log.Debugf("starting autocommit") ticker := time.NewTicker(200 * time.Millisecond) + cleanUpTicker := time.NewTicker(1 * time.Minute) + expireTicker := time.NewTicker(1 * time.Second) + if !c.flush { + log.Debugf("flush is disabled") + } for { select { case <-c.PusherTomb.Dying(): @@ -51,12 +165,25 @@ func (c *Context) AutoCommit() { log.Errorf("error while closing db : %s", err) } return + case <-expireTicker.C: + if err := c.DeleteExpired(); err != nil { + log.Errorf("Error while deleting expired records: %s", err) + } case <-ticker.C: if atomic.LoadInt32(&c.count) != 0 && (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) { if err := c.Flush(); err != nil { log.Errorf("failed to flush : %s", err) } + + } + case <-cleanUpTicker.C: + if err := c.CleanUpRecordsByCount(); err != nil { + log.Errorf("error in max records cleanup : %s", err) + } + if err := c.CleanUpRecordsByAge(); err != nil { + log.Errorf("error in old records cleanup : %s", err) + } } } diff --git a/pkg/sqlite/sqlite.go b/pkg/sqlite/sqlite.go index b3c5d8a63..1527ee01f 100644 --- a/pkg/sqlite/sqlite.go +++ b/pkg/sqlite/sqlite.go @@ -7,6 +7,7 @@ import ( "time" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/jinzhu/gorm" @@ -23,12 +24,27 @@ type Context struct { count int32 lock sync.Mutex //booboo PusherTomb tomb.Tomb + //to manage auto cleanup : max number of records *or* oldest + maxEventRetention int + maxDurationRetention time.Duration } func NewSQLite(cfg map[string]string) (*Context, error) { var err error c := &Context{} + if v, ok := cfg["max_records"]; ok { + c.maxEventRetention, err = strconv.Atoi(v) + if err != nil { + log.Errorf("Ignoring invalid max_records '%s' : %s", v, err) + } + } + if v, ok := cfg["max_records_age"]; ok { + c.maxDurationRetention, err = time.ParseDuration(v) + if err != nil { + log.Errorf("Ignoring invalid duration '%s' : %s", v, err) + } + } if _, ok := cfg["db_path"]; !ok { return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration") } @@ -36,6 +52,7 @@ func NewSQLite(cfg map[string]string) (*Context, error) { if cfg["db_path"] == "" { return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration") } + log.Debugf("Starting SQLite backend, path:%s", cfg["db_path"]) c.Db, err = gorm.Open("sqlite3", cfg["db_path"]+"?_busy_timeout=1000") if err != nil { @@ -47,7 +64,10 @@ func NewSQLite(cfg map[string]string) (*Context, error) { c.Db.LogMode(true) } - c.flush, _ = strconv.ParseBool(cfg["flush"]) + c.flush, err = strconv.ParseBool(cfg["flush"]) + if err != nil { + return nil, errors.Wrap(err, "Unable to parse 'flush' flag") + } // Migrate the schema c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{}) c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{}) @@ -64,9 +84,5 @@ func NewSQLite(cfg map[string]string) (*Context, error) { if c.tx == nil { return nil, fmt.Errorf("failed to begin sqlite transac : %s", err) } - c.PusherTomb.Go(func() error { - c.AutoCommit() - return nil - }) return c, nil } diff --git a/plugins/backend/sqlite.go b/plugins/backend/sqlite.go index 4e1943018..00be2f269 100644 --- a/plugins/backend/sqlite.go +++ b/plugins/backend/sqlite.go @@ -23,6 +23,10 @@ func (p *pluginDB) Shutdown() error { return nil } +func (p *pluginDB) StartAutoCommit() error { + return p.CTX.StartAutoCommit() +} + func (p *pluginDB) Init(config map[string]string) error { var err error log.Debugf("sqlite config : %+v \n", config)