Sqlite : Support automatic db flushing (#91)

* add support for sqlite retention: max_records, max_records_age (see the config sketch after this list)

* reduce verbosity of cwhub (demote noisy Debugf calls to Tracef)
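
For reference, a minimal sketch of the new retention settings in the main crowdsec yaml configuration. The keys and example values are taken from the config files touched by this commit; both settings are optional:

plugin:
  backend: "/etc/crowdsec/plugins/backend"
  # keep at most 10000 ban records in the sqlite backend (db GC by count)
  max_records: 10000
  # and/or drop soft-deleted records older than 30 days = 720 hours (db GC by age)
  max_records_age: 720h

When a setting is omitted, the corresponding cleanup is skipped (see CleanUpRecordsByCount / CleanUpRecordsByAge below).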
Thibault "bui" Koechlin 2020-07-01 17:04:29 +02:00 committed by GitHub
parent 81ef26f406
commit b9ae94b874
18 changed files with 296 additions and 89 deletions


@ -157,8 +157,9 @@ cscli api credentials # Display your API credentials
outputConfig := outputs.OutputFactory{
BackendFolder: config.BackendPluginFolder,
Flush: false,
}
outputCTX, err = outputs.NewOutput(&outputConfig, false)
outputCTX, err = outputs.NewOutput(&outputConfig)
if err != nil {
return err
}


@ -412,8 +412,9 @@ cscli backup restore ./my-backup`,
outputConfig := outputs.OutputFactory{
BackendFolder: config.BackendPluginFolder,
Flush: false,
}
outputCTX, err = outputs.NewOutput(&outputConfig, false)
outputCTX, err = outputs.NewOutput(&outputConfig)
if err != nil {
log.Fatalf("Failed to load output plugins : %v", err)
}
@ -453,8 +454,9 @@ cscli backup restore ./my-backup`,
outputConfig := outputs.OutputFactory{
BackendFolder: config.BackendPluginFolder,
Flush: false,
}
outputCTX, err = outputs.NewOutput(&outputConfig, false)
outputCTX, err = outputs.NewOutput(&outputConfig)
if err != nil {
log.Fatalf("Failed to load output plugins : %v", err)
}


@ -167,7 +167,7 @@ func BanAdd(target string, duration string, reason string, action string) error
if err != nil {
return err
}
log.Infof("Wrote ban to database.")
log.Infof("%s %s for %s (%s)", action, target, duration, reason)
return nil
}
@ -188,9 +188,10 @@ You can add/delete/list or flush current bans in your local ban DB.`,
outputConfig := outputs.OutputFactory{
BackendFolder: config.BackendPluginFolder,
Flush: false,
}
outputCTX, err = outputs.NewOutput(&outputConfig, false)
outputCTX, err = outputs.NewOutput(&outputConfig)
if err != nil {
return fmt.Errorf(err.Error())
}
@ -220,9 +221,10 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`,
Short: "Adds the specific ip to the ban db",
Long: `Duration must be [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration), expressed in s/m/h.`,
Example: `cscli ban add ip 1.2.3.4 12h "the scan"`,
Args: cobra.ExactArgs(3),
Args: cobra.MinimumNArgs(3),
Run: func(cmd *cobra.Command, args []string) {
if err := BanAdd(args[0], args[1], args[2], remediationType); err != nil {
reason := strings.Join(args[2:], " ")
if err := BanAdd(args[0], args[1], reason, remediationType); err != nil {
log.Fatalf("failed to add ban to sqlite : %v", err)
}
},
@ -233,9 +235,10 @@ cscli ban add range 1.2.3.0/24 24h "the whole range"`,
Short: "Adds the specific ip to the ban db",
Long: `Duration must be [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) compatible, expressed in s/m/h.`,
Example: `cscli ban add range 1.2.3.0/24 12h "the whole range"`,
Args: cobra.ExactArgs(3),
Args: cobra.MinimumNArgs(3),
Run: func(cmd *cobra.Command, args []string) {
if err := BanAdd(args[0], args[1], args[2], remediationType); err != nil {
reason := strings.Join(args[2:], " ")
if err := BanAdd(args[0], args[1], reason, remediationType); err != nil {
log.Fatalf("failed to add ban to sqlite : %v", err)
}
},


@ -14,6 +14,7 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/outputs"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/sevlyar/go-daemon"
log "github.com/sirupsen/logrus"
@ -151,11 +152,22 @@ func LoadOutputs(cConfig *csconfig.CrowdSec) error {
return fmt.Errorf("Failed to load output profiles : %v", err)
}
OutputRunner, err = outputs.NewOutput(cConfig.OutputConfig, cConfig.Daemonize)
//If the user is providing a single file (ie forensic mode), don't flush expired records
if cConfig.SingleFile != "" {
log.Infof("forensic mode, disable flush")
cConfig.OutputConfig.Flush = false
} else {
cConfig.OutputConfig.Flush = true
}
OutputRunner, err = outputs.NewOutput(cConfig.OutputConfig)
if err != nil {
return fmt.Errorf("output plugins initialization error : %s", err.Error())
}
if err := OutputRunner.StartAutoCommit(); err != nil {
return errors.Wrap(err, "failed to start autocommit")
}
/* Init the API connector */
if cConfig.APIMode {
log.Infof("Loading API client")
@ -277,7 +289,6 @@ func main() {
if err := LoadBuckets(cConfig); err != nil {
log.Fatalf("Failed to load scenarios: %s", err)
}
if err := LoadOutputs(cConfig); err != nil {
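
To summarize the wiring change above: outputs.NewOutput no longer takes an isDaemon flag, Flush is decided on the OutputFactory (disabled in forensic/single-file mode), and the autocommit goroutine must now be started explicitly via StartAutoCommit. A minimal, hypothetical sketch of that call pattern (helper name and signature are illustrative, not part of the commit):

package sketch

import (
	"github.com/crowdsecurity/crowdsec/pkg/outputs"
	"github.com/pkg/errors"
)

// loadOutputs shows the new call pattern only; it is not the code added by this commit.
func loadOutputs(cfg *outputs.OutputFactory, singleFile string) (*outputs.Output, error) {
	// forensic mode (processing a single file) must not flush expired records
	cfg.Flush = singleFile == ""
	out, err := outputs.NewOutput(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "output plugins initialization error")
	}
	if err := out.StartAutoCommit(); err != nil {
		return nil, errors.Wrap(err, "failed to start autocommit")
	}
	return out, nil
}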


@ -129,8 +129,6 @@ func termHandler(sig os.Signal) error {
}
func serveOneTimeRun(outputRunner outputs.Output) error {
log.Infof("waiting for acquisition to finish")
if err := acquisTomb.Wait(); err != nil {
log.Warningf("acquisition returned error : %s", err)
}


@ -6,8 +6,12 @@ cscli_dir: "./config/crowdsec-cli"
log_dir: "./logs"
log_mode: "stdout"
log_level: info
prometheus: true
profiling: false
sqlite_path: "./test.db"
apimode: false
plugin:
backend: "./config/plugins/backend"
max_records: 10000
#30 days = 720 hours
max_records_age: 720h


@ -2,4 +2,3 @@ name: sqlite
path: /usr/local/lib/crowdsec/plugins/backend/sqlite.so
config:
db_path: /var/lib/crowdsec/data/crowdsec.db
flush: true


@ -7,7 +7,6 @@ cscli_dir: ${CFG}/cscli
log_mode: file
log_level: info
profiling: false
sqlite_path: ${DATA}/crowdsec.db
apimode: true
daemon: true
prometheus: true
@ -15,3 +14,4 @@ prometheus: true
http_listen: 127.0.0.1:6060
plugin:
backend: "/etc/crowdsec/plugins/backend"
max_records_age: 720h


@ -7,7 +7,6 @@ cscli_dir: ${CFG}/cscli
log_mode: stdout
log_level: info
profiling: false
sqlite_path: ${DATA}/crowdsec.db
apimode: false
daemon: false
prometheus: false


@ -25,14 +25,13 @@ type CrowdSec struct {
SingleFileLabel string //for forensic mode
PIDFolder string `yaml:"pid_dir,omitempty"`
LogFolder string `yaml:"log_dir,omitempty"`
LogMode string `yaml:"log_mode,omitempty"` //like file, syslog or stdout ?
LogLevel log.Level `yaml:"log_level,omitempty"` //trace,debug,info,warning,error
Daemonize bool `yaml:"daemon,omitempty"` //true -> go background
Profiling bool `yaml:"profiling,omitempty"` //true -> enable runtime profiling
SQLiteFile string `yaml:"sqlite_path,omitempty"` //path to sqlite output
APIMode bool `yaml:"apimode,omitempty"` //true -> enable api push
CsCliFolder string `yaml:"cscli_dir"` //cscli folder
NbParsers int `yaml:"parser_routines"` //the number of go routines to start for parsing
LogMode string `yaml:"log_mode,omitempty"` //like file, syslog or stdout ?
LogLevel log.Level `yaml:"log_level,omitempty"` //trace,debug,info,warning,error
Daemonize bool `yaml:"daemon,omitempty"` //true -> go background
Profiling bool `yaml:"profiling,omitempty"` //true -> enable runtime profiling
APIMode bool `yaml:"apimode,omitempty"` //true -> enable api push
CsCliFolder string `yaml:"cscli_dir"` //cscli folder
NbParsers int `yaml:"parser_routines"` //the number of go routines to start for parsing
Linter bool
Prometheus bool
HTTPListen string `yaml:"http_listen,omitempty"`
@ -53,7 +52,6 @@ func NewCrowdSecConfig() *CrowdSec {
PIDFolder: "/var/run/",
LogFolder: "/var/log/",
LogMode: "stdout",
SQLiteFile: "/var/lib/crowdsec/data/crowdsec.db",
APIMode: false,
NbParsers: 1,
Prometheus: false,
@ -89,7 +87,6 @@ func (c *CrowdSec) GetOPT() error {
printInfo := flag.Bool("info", false, "print info-level on stdout")
printVersion := flag.Bool("version", false, "display version")
APIMode := flag.Bool("api", false, "perform pushes to api")
SQLiteMode := flag.Bool("sqlite", true, "write overflows to sqlite")
profileMode := flag.Bool("profile", false, "Enable performance profiling")
catFile := flag.String("file", "", "Process a single file in time-machine")
catFileType := flag.String("type", "", "Labels.type for file in time-machine")
@ -156,9 +153,6 @@ func (c *CrowdSec) GetOPT() error {
if *printTrace {
c.LogLevel = log.TraceLevel
}
if !*SQLiteMode {
c.SQLiteFile = ""
}
if *APIMode {
c.APIMode = true
}


@ -123,7 +123,7 @@ func parser_visit(path string, f os.FileInfo, err error) error {
subs := strings.Split(path, "/")
log.Debugf("path:%s, hubdir:%s, installdir:%s", path, Hubdir, Installdir)
log.Tracef("path:%s, hubdir:%s, installdir:%s", path, Hubdir, Installdir)
/*we're in hub (~/.cscli/hub/)*/
if strings.HasPrefix(path, Hubdir) {
inhub = true
@ -137,7 +137,7 @@ func parser_visit(path string, f os.FileInfo, err error) error {
fauthor = subs[len(subs)-2]
stage = subs[len(subs)-3]
ftype = subs[len(subs)-4]
log.Debugf("HUBB check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype)
log.Tracef("HUBB check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype)
} else if strings.HasPrefix(path, Installdir) { /*we're in install /etc/crowdsec/<type>/... */
if len(subs) < 3 {
@ -151,7 +151,7 @@ func parser_visit(path string, f os.FileInfo, err error) error {
stage = subs[len(subs)-2]
ftype = subs[len(subs)-3]
fauthor = ""
log.Debugf("INSTALL check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype)
log.Tracef("INSTALL check [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype)
}
//log.Printf("%s -> name:%s stage:%s", path, fname, stage)
@ -165,7 +165,7 @@ func parser_visit(path string, f os.FileInfo, err error) error {
return fmt.Errorf("unknown prefix in %s : fname:%s, fauthor:%s, stage:%s, ftype:%s", path, fname, fauthor, stage, ftype)
}
log.Debugf("CORRECTED [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype)
log.Tracef("CORRECTED [%s] by [%s] in stage [%s] of type [%s]", fname, fauthor, stage, ftype)
/*
we can encounter 'collections' in the form of a symlink :
@ -176,7 +176,7 @@ func parser_visit(path string, f os.FileInfo, err error) error {
if f.Mode()&os.ModeSymlink == 0 {
local = true
skippedLocal++
log.Debugf("%s isn't a symlink", path)
log.Tracef("%s isn't a symlink", path)
} else {
hubpath, err = os.Readlink(path)
if err != nil {
@ -192,7 +192,7 @@ func parser_visit(path string, f os.FileInfo, err error) error {
}
return nil
}
log.Debugf("%s points to %s", path, hubpath)
log.Tracef("%s points to %s", path, hubpath)
}
//if it's not a symlink and not in hub, it's a local file, don't bother
@ -214,13 +214,13 @@ func parser_visit(path string, f os.FileInfo, err error) error {
return nil
}
//try to find which configuration item it is
log.Debugf("check [%s] of %s", fname, ftype)
log.Tracef("check [%s] of %s", fname, ftype)
match := false
for k, v := range HubIdx[ftype] {
log.Debugf("check [%s] vs [%s] : %s", fname, v.RemotePath, ftype+"/"+stage+"/"+fname+".yaml")
log.Tracef("check [%s] vs [%s] : %s", fname, v.RemotePath, ftype+"/"+stage+"/"+fname+".yaml")
if fname != v.FileName {
log.Debugf("%s != %s (filename)", fname, v.FileName)
log.Tracef("%s != %s (filename)", fname, v.FileName)
continue
}
//wrong stage
@ -238,7 +238,7 @@ func parser_visit(path string, f os.FileInfo, err error) error {
continue
}
if path == Hubdir+"/"+v.RemotePath {
log.Debugf("marking %s as downloaded", v.Name)
log.Tracef("marking %s as downloaded", v.Name)
v.Downloaded = true
}
} else {
@ -274,10 +274,8 @@ func parser_visit(path string, f os.FileInfo, err error) error {
target.FileName = x[len(x)-1]
}
if version == v.Version {
log.Debugf("%s is up-to-date", v.Name)
log.Tracef("%s is up-to-date", v.Name)
v.UpToDate = true
} else {
log.Debugf("%s is outdated", v.Name)
}
match = true
@ -310,18 +308,18 @@ func parser_visit(path string, f os.FileInfo, err error) error {
func CollecDepsCheck(v *Item) error {
/*if it's a collection, ensure all the items are installed, or tag it as tainted*/
if v.Type == COLLECTIONS {
log.Debugf("checking submembers of %s installed:%t", v.Name, v.Installed)
log.Tracef("checking submembers of %s installed:%t", v.Name, v.Installed)
var tmp = [][]string{v.Parsers, v.PostOverflows, v.Scenarios, v.Collections}
for idx, ptr := range tmp {
ptrtype := ItemTypes[idx]
for _, p := range ptr {
if val, ok := HubIdx[ptrtype][p]; ok {
log.Debugf("check %s installed:%t", val.Name, val.Installed)
log.Tracef("check %s installed:%t", val.Name, val.Installed)
if !v.Installed {
continue
}
if val.Type == COLLECTIONS {
log.Debugf("collec, recurse.")
log.Tracef("collec, recurse.")
if err := CollecDepsCheck(&val); err != nil {
return fmt.Errorf("sub collection %s is broken : %s", val.Name, err)
}
@ -341,7 +339,7 @@ func CollecDepsCheck(v *Item) error {
}
val.BelongsToCollections = append(val.BelongsToCollections, v.Name)
HubIdx[ptrtype][p] = val
log.Debugf("checking for %s - tainted:%t uptodate:%t", p, v.Tainted, v.UpToDate)
log.Tracef("checking for %s - tainted:%t uptodate:%t", p, v.Tainted, v.UpToDate)
} else {
log.Fatalf("Referred %s %s in collection %s doesn't exist.", ptrtype, p, v.Name)
}
@ -653,7 +651,7 @@ func DownloadLatest(target Item, tdir string, overwrite bool, dataFolder string)
log.Debugf("Download %s sub-item : %s %s", target.Name, ptrtype, p)
//recurse as it's a collection
if ptrtype == COLLECTIONS {
log.Debugf("collection, recurse")
log.Tracef("collection, recurse")
HubIdx[ptrtype][p], err = DownloadLatest(val, tdir, overwrite, dataFolder)
if err != nil {
log.Errorf("Encountered error while downloading sub-item %s %s : %s.", ptrtype, p, err)


@ -22,24 +22,35 @@ type Backend interface {
Flush() error
Shutdown() error
DeleteAll() error
StartAutoCommit() error
}
type BackendPlugin struct {
Name string `yaml:"name"`
Path string `yaml:"path"`
ConfigFilePath string
Config map[string]string `yaml:"config"`
ID string
funcs Backend
//Config is passed to the backend plugin.
//It contains specific plugin config + plugin config from main yaml file
Config map[string]string `yaml:"config"`
ID string
funcs Backend
}
type BackendManager struct {
backendPlugins map[string]BackendPlugin
}
func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) {
func NewBackendPlugin(outputConfig map[string]string) (*BackendManager, error) {
var files []string
var backendManager = &BackendManager{}
var path string
if v, ok := outputConfig["backend"]; ok {
path = v
} else {
return nil, fmt.Errorf("missing 'backend' (path to backend plugins)")
}
//var path = output.BackendFolder
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if filepath.Ext(path) == ".yaml" {
files = append(files, path)
@ -88,17 +99,28 @@ func NewBackendPlugin(path string, isDaemon bool) (*BackendManager, error) {
// Add the interface and Init()
newPlugin.funcs = bInterface
if isDaemon {
newPlugin.Config["flush"] = "true"
} else {
newPlugin.Config["flush"] = "false"
// Merge backend config from main config file
if v, ok := outputConfig["debug"]; ok {
newPlugin.Config["debug"] = v
}
if v, ok := outputConfig["max_records"]; ok {
newPlugin.Config["max_records"] = v
}
if v, ok := outputConfig["max_records_age"]; ok {
newPlugin.Config["max_records_age"] = v
}
if v, ok := outputConfig["flush"]; ok {
newPlugin.Config["flush"] = v
}
err = newPlugin.funcs.Init(newPlugin.Config)
if err != nil {
return nil, fmt.Errorf("plugin '%s' init error : %s", newPlugin.Name, err)
}
log.Infof("backend plugin '%s' loaded", newPlugin.Name)
log.Debugf("backend plugin '%s' loaded", newPlugin.Name)
backendManager.backendPlugins[newPlugin.Name] = newPlugin
}
@ -175,6 +197,17 @@ func (b *BackendManager) IsBackendPlugin(plugin string) bool {
return false
}
func (b *BackendManager) StartAutoCommit() error {
var err error
for _, plugin := range b.backendPlugins {
err = plugin.funcs.StartAutoCommit()
if err != nil {
return err
}
}
return nil
}
func (b *BackendManager) ReadAT(timeAT time.Time) ([]map[string]string, error) {
var ret []map[string]string
var err error


@ -44,7 +44,6 @@ func GetExprEnv(ctx map[string]interface{}) map[string]interface{} {
}
func Init() error {
log.Infof("Expr helper initiated")
dataFile = make(map[string][]string)
dataFileRegex = make(map[string][]*regexp.Regexp)
return nil


@ -10,6 +10,7 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/cwplugin"
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/crowdsecurity/crowdsec/pkg/cwapi"
@ -18,10 +19,19 @@ import (
"gopkg.in/yaml.v2"
)
//OutputFactory is part of the main yaml configuration file, and holds generic backend config
type OutputFactory struct {
BackendFolder string `yaml:"backend"`
BackendFolder string `yaml:"backend,omitempty"`
//For the db GC : how many records can we keep at most
MaxRecords string `yaml:"max_records,omitempty"`
//For the db GC : the maximum age of records we tolerate
MaxRecordsAge string `yaml:"max_records_age,omitempty"`
//Should we automatically flush expired bans
Flush bool
Debug bool `yaml:"debug"`
}
//Output holds the runtime objects of backend
type Output struct {
API *cwapi.ApiCtx
bManager *cwplugin.BackendManager
@ -86,6 +96,10 @@ func OvflwToOrder(sig types.SignalOccurence, prof types.Profile) (*types.BanOrde
return &ordr, nil, warn
}
func (o *Output) StartAutoCommit() error {
return o.bManager.StartAutoCommit()
}
func (o *Output) Shutdown() error {
var reterr error
if o.API != nil {
@ -100,8 +114,6 @@ func (o *Output) Shutdown() error {
reterr = err
}
}
//bManager
//TBD : the backend(s) should be stopped in the same way
return reterr
}
@ -286,19 +298,6 @@ func (o *Output) LoadAPIConfig(configFile string) error {
return nil
}
func (o *Output) load(config *OutputFactory, isDaemon bool) error {
var err error
if config == nil {
return fmt.Errorf("missing output plugin configuration")
}
log.Debugf("loading backend plugins ...")
o.bManager, err = cwplugin.NewBackendPlugin(config.BackendFolder, isDaemon)
if err != nil {
return err
}
return nil
}
func (o *Output) Delete(target string) (int, error) {
nbDel, err := o.bManager.Delete(target)
return nbDel, err
@ -327,11 +326,30 @@ func (o *Output) ReadAT(timeAT time.Time) ([]map[string]string, error) {
return ret, nil
}
func NewOutput(config *OutputFactory, isDaemon bool) (*Output, error) {
func NewOutput(config *OutputFactory) (*Output, error) {
var output Output
err := output.load(config, isDaemon)
var err error
if config == nil {
return nil, fmt.Errorf("missing output plugin configuration")
}
log.Debugf("loading backend plugins ...")
//turn the *OutputFactory into a map[string]string for less constraint
backendConfig := map[string]string{
"backend": config.BackendFolder,
"flush": strconv.FormatBool(config.Flush),
"debug": strconv.FormatBool(config.Debug)}
if config.MaxRecords != "" {
backendConfig["max_records"] = config.MaxRecords
}
if config.MaxRecordsAge != "" {
backendConfig["max_records_age"] = config.MaxRecordsAge
}
output.bManager, err = cwplugin.NewBackendPlugin(backendConfig)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to load backend plugin")
}
return &output, nil
}


@ -133,7 +133,8 @@ func LoadStages(stageFiles []Stagefile, pctx *UnixParserCtx) ([]Node, error) {
pctx.Stages = append(pctx.Stages, k)
}
sort.Strings(pctx.Stages)
log.Infof("Stages loaded: %+v", pctx.Stages)
log.Infof("Loaded %d nodes, %d stages", len(nodes), len(pctx.Stages))
return nodes, nil
}


@ -6,9 +6,21 @@ import (
"time"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
func (c *Context) DeleteExpired() error {
//Delete the expired records
if c.flush {
retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
if retx.RowsAffected > 0 {
log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
}
}
return nil
}
func (c *Context) Flush() error {
c.lock.Lock()
defer c.lock.Unlock()
@ -21,18 +33,120 @@ func (c *Context) Flush() error {
}
c.tx = c.Db.Begin()
c.lastCommit = time.Now()
//Delete the expired records
if c.flush {
retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{})
if retx.RowsAffected > 0 {
log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected)
return nil
}
func (c *Context) CleanUpRecordsByAge() error {
//let's fetch all expired records that are more than XX days old
sos := []types.BanApplication{}
if c.maxDurationRetention == 0 {
return nil
}
//look for soft-deleted events that are OLDER than maxDurationRetention
ret := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").
Where(fmt.Sprintf("deleted_at > date('now','-%d minutes')", int(c.maxDurationRetention.Minutes()))).
Order("updated_at desc").Find(&sos)
if ret.Error != nil {
return errors.Wrap(ret.Error, "failed to get count of old records")
}
//no events eligible
if len(sos) == 0 || ret.RowsAffected == 0 {
log.Debugf("no event older than %s", c.maxDurationRetention.String())
return nil
}
//let's do it in a single transaction
delTx := c.Db.Unscoped().Begin()
delRecords := 0
for _, record := range sos {
copy := record
delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
delTx.Unscoped().Table("ban_applications").Delete(&copy)
//we need to delete associations : event_sequences, signal_occurences
delRecords++
}
ret = delTx.Unscoped().Commit()
if ret.Error != nil {
return errors.Wrap(ret.Error, "failed to delete records")
}
log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention)
return nil
}
func (c *Context) CleanUpRecordsByCount() error {
var count int
if c.maxEventRetention <= 0 {
return nil
}
ret := c.Db.Unscoped().Table("ban_applications").Order("updated_at desc").Count(&count)
if ret.Error != nil {
return errors.Wrap(ret.Error, "failed to get bans count")
}
if count < c.maxEventRetention {
log.Debugf("%d < %d, don't cleanup", count, c.maxEventRetention)
return nil
}
sos := []types.BanApplication{}
/*get soft deleted records oldest to youngest*/
records := c.Db.Unscoped().Table("ban_applications").Where("deleted_at is not NULL").Where(`strftime("%s", deleted_at) < strftime("%s", "now")`).Find(&sos)
if records.Error != nil {
return errors.Wrap(records.Error, "failed to list expired bans for flush")
}
//let's do it in a single transaction
delTx := c.Db.Unscoped().Begin()
delRecords := 0
for _, ld := range sos {
copy := ld
delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{})
delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{})
delTx.Unscoped().Table("ban_applications").Delete(&copy)
//we need to delete associations : event_sequences, signal_occurences
delRecords++
//let's delete as well the associated event_sequence
if count-delRecords <= c.maxEventRetention {
break
}
}
if len(sos) > 0 {
//log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos))
log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos))
ret = delTx.Unscoped().Commit()
if ret.Error != nil {
return errors.Wrap(ret.Error, "failed to delete records")
}
} else {
log.Debugf("didn't find any record to clean")
}
return nil
}
func (c *Context) AutoCommit() {
func (c *Context) StartAutoCommit() error {
//TBD : we shouldn't start auto-commit if we are in cli mode ?
c.PusherTomb.Go(func() error {
c.autoCommit()
return nil
})
return nil
}
func (c *Context) autoCommit() {
log.Debugf("starting autocommit")
ticker := time.NewTicker(200 * time.Millisecond)
cleanUpTicker := time.NewTicker(1 * time.Minute)
expireTicker := time.NewTicker(1 * time.Second)
if !c.flush {
log.Debugf("flush is disabled")
}
for {
select {
case <-c.PusherTomb.Dying():
@ -51,12 +165,25 @@ func (c *Context) AutoCommit() {
log.Errorf("error while closing db : %s", err)
}
return
case <-expireTicker.C:
if err := c.DeleteExpired(); err != nil {
log.Errorf("Error while deleting expired records: %s", err)
}
case <-ticker.C:
if atomic.LoadInt32(&c.count) != 0 &&
(atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) {
if err := c.Flush(); err != nil {
log.Errorf("failed to flush : %s", err)
}
}
case <-cleanUpTicker.C:
if err := c.CleanUpRecordsByCount(); err != nil {
log.Errorf("error in max records cleanup : %s", err)
}
if err := c.CleanUpRecordsByAge(); err != nil {
log.Errorf("error in old records cleanup : %s", err)
}
}
}


@ -7,6 +7,7 @@ import (
"time"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/jinzhu/gorm"
@ -23,12 +24,27 @@ type Context struct {
count int32
lock sync.Mutex //booboo
PusherTomb tomb.Tomb
//to manage auto cleanup : max number of records *or* oldest
maxEventRetention int
maxDurationRetention time.Duration
}
func NewSQLite(cfg map[string]string) (*Context, error) {
var err error
c := &Context{}
if v, ok := cfg["max_records"]; ok {
c.maxEventRetention, err = strconv.Atoi(v)
if err != nil {
log.Errorf("Ignoring invalid max_records '%s' : %s", v, err)
}
}
if v, ok := cfg["max_records_age"]; ok {
c.maxDurationRetention, err = time.ParseDuration(v)
if err != nil {
log.Errorf("Ignoring invalid duration '%s' : %s", v, err)
}
}
if _, ok := cfg["db_path"]; !ok {
return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
}
@ -36,6 +52,7 @@ func NewSQLite(cfg map[string]string) (*Context, error) {
if cfg["db_path"] == "" {
return nil, fmt.Errorf("please specify a 'db_path' to SQLite db in the configuration")
}
log.Debugf("Starting SQLite backend, path:%s", cfg["db_path"])
c.Db, err = gorm.Open("sqlite3", cfg["db_path"]+"?_busy_timeout=1000")
if err != nil {
@ -47,7 +64,10 @@ func NewSQLite(cfg map[string]string) (*Context, error) {
c.Db.LogMode(true)
}
c.flush, _ = strconv.ParseBool(cfg["flush"])
c.flush, err = strconv.ParseBool(cfg["flush"])
if err != nil {
return nil, errors.Wrap(err, "Unable to parse 'flush' flag")
}
// Migrate the schema
c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{})
c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{})
@ -64,9 +84,5 @@ func NewSQLite(cfg map[string]string) (*Context, error) {
if c.tx == nil {
return nil, fmt.Errorf("failed to begin sqlite transac : %s", err)
}
c.PusherTomb.Go(func() error {
c.AutoCommit()
return nil
})
return c, nil
}


@ -23,6 +23,10 @@ func (p *pluginDB) Shutdown() error {
return nil
}
func (p *pluginDB) StartAutoCommit() error {
return p.CTX.StartAutoCommit()
}
func (p *pluginDB) Init(config map[string]string) error {
var err error
log.Debugf("sqlite config : %+v \n", config)