From 151af2d0d857795e57696a30c4301215228649c1 Mon Sep 17 00:00:00 2001 From: "Thibault \"bui\" Koechlin" Date: Mon, 27 Jul 2020 13:42:30 +0200 Subject: [PATCH] No sql transaction + proper time-machine wait (#148) --- cmd/crowdsec/serve.go | 36 +++++++++++++++++++- pkg/database/commit.go | 71 +++++++++++++--------------------------- pkg/database/database.go | 13 +------- pkg/database/write.go | 6 ++-- 4 files changed, 61 insertions(+), 65 deletions(-) diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index f1f6f2a32..be076be2d 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -139,7 +139,41 @@ func serveOneTimeRun(outputRunner outputs.Output) error { } log.Infof("acquisition is finished, wait for parser/bucket/ouputs.") - //let's wait more than enough for in-flight events to be parsed. + /* + While it might make sense to want to shut-down parser/buckets/etc. as soon as acquisition is finished, + we might have some pending buckets : buckets that overflowed, but which LeakRoutine are still alive because they + are waiting to be able to "commit" (push to db). This can happens specifically in a context where a lot of logs + are going to trigger overflow (ie. trigger buckets with ~100% of the logs triggering an overflow). + + To avoid this (which would mean that we would "lose" some overflows), let's monitor the number of live buckets. + However, because of the blackhole mechanism, you can't really wait for the number of LeakRoutine to go to zero (we might have to wait $blackhole_duration). + + So : we are waiting for the number of buckets to stop decreasing before returning. "how long" we should wait is a bit of the trick question, + as some operations (ie. reverse dns or such in post-overflow) can take some time :) + */ + + bucketCount := leaky.LeakyRoutineCount + rounds := 0 + successiveStillRounds := 0 + for { + rounds++ + time.Sleep(5 * time.Second) + currBucketCount := leaky.LeakyRoutineCount + if currBucketCount != bucketCount { + if rounds == 0 || rounds%2 == 0 { + log.Printf("Still %d live LeakRoutines, waiting (was %d)", currBucketCount, bucketCount) + } + bucketCount = currBucketCount + successiveStillRounds = 0 + } else { + if successiveStillRounds > 1 { + log.Printf("LeakRoutines commit over.") + break + } + successiveStillRounds++ + } + } + time.Sleep(5 * time.Second) // wait for the parser to parse all events diff --git a/pkg/database/commit.go b/pkg/database/commit.go index b034a4abf..49081aa0f 100644 --- a/pkg/database/commit.go +++ b/pkg/database/commit.go @@ -1,8 +1,6 @@ package database import ( - "fmt" - "sync/atomic" "time" "github.com/crowdsecurity/crowdsec/pkg/types" @@ -11,10 +9,11 @@ import ( ) func (c *Context) DeleteExpired() error { + c.lock.Lock() + defer c.lock.Unlock() //Delete the expired records now := time.Now() if c.flush { - //retx := c.Db.Where(`strftime("%s", until) < strftime("%s", "now")`).Delete(types.BanApplication{}) retx := c.Db.Delete(types.BanApplication{}, "until < ?", now) if retx.RowsAffected > 0 { log.Infof("Flushed %d expired entries from Ban Application", retx.RowsAffected) @@ -23,18 +22,8 @@ func (c *Context) DeleteExpired() error { return nil } +/*Flush doesn't do anything here : we are not using transactions or such, nothing to "flush" per se*/ func (c *Context) Flush() error { - c.lock.Lock() - defer c.lock.Unlock() - - ret := c.tx.Commit() - - if ret.Error != nil { - c.tx = c.Db.Begin() - return fmt.Errorf("failed to commit records : %v", ret.Error) - } - c.tx = c.Db.Begin() - c.lastCommit = time.Now() return nil } @@ -60,22 +49,21 @@ func (c *Context) CleanUpRecordsByAge() error { log.Debugf("no event older than %s", c.maxDurationRetention.String()) return nil } - //let's do it in a single transaction - delTx := c.Db.Unscoped().Begin() - delRecords := 0 + delRecords := 0 for _, record := range sos { copy := record - delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}) - delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}) - delTx.Unscoped().Table("ban_applications").Delete(©) - //we need to delete associations : event_sequences, signal_occurences + if ret := c.Db.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}); ret.Error != nil { + return errors.Wrap(ret.Error, "failed to clean signal_occurences") + } + if ret := c.Db.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}); ret.Error != nil { + return errors.Wrap(ret.Error, "failed to clean event_sequences") + } + if ret := c.Db.Unscoped().Table("ban_applications").Delete(©); ret.Error != nil { + return errors.Wrap(ret.Error, "failed to clean ban_applications") + } delRecords++ } - ret = delTx.Unscoped().Commit() - if ret.Error != nil { - return errors.Wrap(ret.Error, "failed to delete records") - } log.Printf("max_records_age: deleting %d events (max age:%s)", delRecords, c.maxDurationRetention) return nil } @@ -107,13 +95,18 @@ func (c *Context) CleanUpRecordsByCount() error { } //let's do it in a single transaction - delTx := c.Db.Unscoped().Begin() delRecords := 0 for _, ld := range sos { copy := ld - delTx.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}) - delTx.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}) - delTx.Unscoped().Table("ban_applications").Delete(©) + if ret := c.Db.Unscoped().Table("signal_occurences").Where("ID = ?", copy.SignalOccurenceID).Delete(&types.SignalOccurence{}); ret.Error != nil { + return errors.Wrap(ret.Error, "failed to clean signal_occurences") + } + if ret := c.Db.Unscoped().Table("event_sequences").Where("signal_occurence_id = ?", copy.SignalOccurenceID).Delete(&types.EventSequence{}); ret.Error != nil { + return errors.Wrap(ret.Error, "failed to clean event_sequences") + } + if ret := c.Db.Unscoped().Table("ban_applications").Delete(©); ret.Error != nil { + return errors.Wrap(ret.Error, "failed to clean ban_applications") + } //we need to delete associations : event_sequences, signal_occurences delRecords++ //let's delete as well the associated event_sequence @@ -122,12 +115,7 @@ func (c *Context) CleanUpRecordsByCount() error { } } if len(sos) > 0 { - //log.Printf("Deleting %d soft-deleted results out of %d total events (%d soft-deleted)", delRecords, count, len(sos)) log.Printf("max_records: deleting %d events. (%d soft-deleted)", delRecords, len(sos)) - ret = delTx.Unscoped().Commit() - if ret.Error != nil { - return errors.Wrap(ret.Error, "failed to delete records") - } } else { log.Debugf("didn't find any record to clean") } @@ -145,7 +133,6 @@ func (c *Context) StartAutoCommit() error { func (c *Context) autoCommit() { log.Debugf("starting autocommit") - ticker := time.NewTicker(200 * time.Millisecond) cleanUpTicker := time.NewTicker(1 * time.Minute) expireTicker := time.NewTicker(1 * time.Second) if !c.flush { @@ -159,12 +146,6 @@ func (c *Context) autoCommit() { if err := c.Flush(); err != nil { log.Errorf("error while flushing records: %s", err) } - if ret := c.tx.Commit(); ret.Error != nil { - log.Errorf("failed to commit records : %v", ret.Error) - } - if err := c.tx.Close(); err != nil { - log.Errorf("error while closing tx : %s", err) - } if err := c.Db.Close(); err != nil { log.Errorf("error while closing db : %s", err) } @@ -173,14 +154,6 @@ func (c *Context) autoCommit() { if err := c.DeleteExpired(); err != nil { log.Errorf("Error while deleting expired records: %s", err) } - case <-ticker.C: - if atomic.LoadInt32(&c.count) != 0 && - (atomic.LoadInt32(&c.count)%100 == 0 || time.Since(c.lastCommit) >= 500*time.Millisecond) { - if err := c.Flush(); err != nil { - log.Errorf("failed to flush : %s", err) - } - - } case <-cleanUpTicker.C: if err := c.CleanUpRecordsByCount(); err != nil { log.Errorf("error in max records cleanup : %s", err) diff --git a/pkg/database/database.go b/pkg/database/database.go index b152ab168..f6bf48457 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -18,7 +18,6 @@ import ( type Context struct { Db *gorm.DB //Pointer to database - tx *gorm.DB //Pointer to current transaction (flushed on a regular basis) lastCommit time.Time flush bool count int32 @@ -107,17 +106,7 @@ func NewDatabase(cfg map[string]string) (*Context, error) { c.Db.AutoMigrate(&types.EventSequence{}, &types.SignalOccurence{}, &types.BanApplication{}) c.Db.Model(&types.SignalOccurence{}).Related(&types.EventSequence{}) c.Db.Model(&types.SignalOccurence{}).Related(&types.BanApplication{}) - c.tx = c.Db.Begin() + c.lastCommit = time.Now() - ret := c.tx.Commit() - - if ret.Error != nil { - return nil, fmt.Errorf("failed to commit records : %v", ret.Error) - - } - c.tx = c.Db.Begin() - if c.tx == nil { - return nil, fmt.Errorf("failed to begin %s transac : %s", cfg["type"], err) - } return c, nil } diff --git a/pkg/database/write.go b/pkg/database/write.go index b0431487a..10a39133b 100644 --- a/pkg/database/write.go +++ b/pkg/database/write.go @@ -15,7 +15,7 @@ func (c *Context) WriteBanApplication(ban types.BanApplication) error { c.lock.Lock() defer c.lock.Unlock() log.Debugf("Ban application being called : %s %s", ban.Scenario, ban.IpText) - ret := c.tx.Where(types.BanApplication{IpText: ban.IpText}).Assign(types.BanApplication{Until: ban.Until}).Assign(types.BanApplication{Reason: ban.Reason}).Assign(types.BanApplication{MeasureType: ban.MeasureType}).FirstOrCreate(&ban) + ret := c.Db.Where(types.BanApplication{IpText: ban.IpText}).Assign(types.BanApplication{Until: ban.Until}).Assign(types.BanApplication{Reason: ban.Reason}).Assign(types.BanApplication{MeasureType: ban.MeasureType}).FirstOrCreate(&ban) if ret.Error != nil { return fmt.Errorf("failed to write ban record : %v", ret.Error) } @@ -28,14 +28,14 @@ func (c *Context) WriteSignal(sig types.SignalOccurence) error { defer c.lock.Unlock() /*let's ensure we only have one ban active for a given scope*/ for _, ba := range sig.BanApplications { - ret := c.tx.Where("ip_text = ?", ba.IpText).Delete(types.BanApplication{}) + ret := c.Db.Where("ip_text = ?", ba.IpText).Delete(types.BanApplication{}) if ret.Error != nil { log.Errorf("While delete overlaping bans : %s", ret.Error) return fmt.Errorf("failed to write signal occurrence : %v", ret.Error) } } /*and add the new one(s)*/ - ret := c.tx.Create(&sig) + ret := c.Db.Create(&sig) if ret.Error != nil { log.Errorf("While creating new bans : %s", ret.Error) return fmt.Errorf("failed to write signal occurrence : %s", ret.Error)