Compare commits

...

9 commits

Author SHA1 Message Date
Sebastien Blot 56f457d4e3 up 2022-08-18 09:43:40 +02:00
Sebastien Blot 5ae97d582c don't run blackhole cleanup routine when replaying logs 2022-08-16 15:12:56 +02:00
Sebastien Blot 662764929d remove timing debug 2022-08-16 10:48:02 +02:00
Sebastien Blot 7a00123ed0 up 2022-08-10 15:30:21 +02:00
Sebastien Blot cb1a86fdc8 fix tests 2022-08-10 12:35:56 +02:00
Sebastien Blot 51fdc38789 fix blackhole test 2022-08-10 12:13:23 +02:00
Sebastien Blot 9f676844d9 properly handle timemachine buckets 2022-08-10 12:12:55 +02:00
Sebastien Blot 10c97e46af do not create bucket if the overflow would be blackholed 2022-08-08 12:41:55 +02:00
Sebastien Blot 9f72bbe725 allow to use WAL with sqlite 2022-08-08 12:40:42 +02:00
21 changed files with 126 additions and 60 deletions

View file

@@ -11,6 +11,7 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -47,6 +48,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
inputLineChan := make(chan types.Event)
inputEventChan := make(chan types.Event)
leaky.BlackholeTracking = &sync.Map{}
//start go-routines for parsing, buckets pour and outputs.
parserWg := &sync.WaitGroup{}
parsersTomb.Go(func() error {
@@ -67,6 +70,16 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
parserWg.Wait()
bucketWg := &sync.WaitGroup{}
//Only start the blackhole GC routine if buckets GC is not enabled (buckets GC is only enabled when replaying a file)
//This is because the blackhole GC routine expects events to happen in real time, which is not the case during a replay.
if !cConfig.Crowdsec.BucketsGCEnabled {
bucketsTomb.Go(func() error {
bucketWg.Add(1)
leakybucket.BlackholeGC(&bucketsTomb)
bucketWg.Done()
return nil
})
}
bucketsTomb.Go(func() error {
bucketWg.Add(1)
/*restore previous state as well if present*/

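The hunk above gates the new garbage collector on the acquisition mode. For reference, a minimal, self-contained sketch of the tomb + ticker lifecycle pattern it relies on (assuming gopkg.in/tomb.v2; illustrative only, not the actual crowdsec code):

package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

func gcLoop(t *tomb.Tomb) error {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-t.Dying(): // the tomb was killed: exit cleanly
			return nil
		case <-ticker.C: // periodic cleanup pass, only meaningful in real time
			fmt.Println("cleanup pass")
		}
	}
}

func main() {
	var t tomb.Tomb
	t.Go(func() error { return gcLoop(&t) })
	time.Sleep(time.Second)
	t.Kill(nil)  // request shutdown
	_ = t.Wait() // block until gcLoop returns
}
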
View file

@@ -149,6 +149,8 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
flags.Labels = labels
flags.Labels["type"] = flags.SingleFileType
cConfig.Crowdsec.BucketsGCEnabled = true
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels)
if err != nil {
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)

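BucketsGCEnabled is only ever set on this one-shot path, i.e. when the whole engine replays a single file passed as a DSN. A hedged usage sketch of that replay mode (flag names from crowdsec's cold-log analysis; exact syntax may vary by version):

crowdsec -dsn file:///var/log/auth.log -type syslog

In that mode the buckets are garbage-collected against log timestamps, so the real-time blackhole GC routine from the previous file stays disabled.
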
View file

@@ -114,7 +114,6 @@ func (c *Controller) sendAlertToPluginChannel(alert *models.Alert, profileID uin
// CreateAlert : write received alerts in body to the database
func (c *Controller) CreateAlert(gctx *gin.Context) {
var input models.AddAlertsRequest
claims := jwt.ExtractClaims(gctx)

View file

@@ -22,6 +22,7 @@ type DatabaseCfg struct {
Flush *FlushDBCfg `yaml:"flush"`
LogLevel *log.Level `yaml:"log_level"`
MaxOpenConns *int `yaml:"max_open_conns,omitempty"`
UseWal *bool `yaml:"use_wal,omitempty"`
}
type AuthGCCfg struct {

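A sketch of the matching configuration, derived from the yaml tags above (surrounding keys and values are illustrative):

db_config:
  type: sqlite
  db_path: /var/lib/crowdsec/data/crowdsec.db
  use_wal: true

Leaving use_wal unset keeps the old behavior and triggers the warning added in NewClient below; an explicit false silences it.
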
View file

@@ -888,7 +888,12 @@ func (c *Client) DeleteAlertWithFilter(filter map[string][]string) (int, error)
return 0, errors.Wrap(DeleteFail, "alert query failed")
}
for _, alertItem := range alertsToDelete {
c.Log.Debugf("Deleting %d alerts", len(alertsToDelete))
for p, alertItem := range alertsToDelete {
if p%100 == 0 {
c.Log.Debugf("Deleting alert %d", p)
}
err = c.DeleteAlertGraph(alertItem)
if err != nil {
c.Log.Warningf("DeleteAlertWithFilter : %s", err)

View file

@@ -76,7 +76,16 @@ func NewClient(config *csconfig.DatabaseCfg) (*Client, error) {
return &Client{}, fmt.Errorf("unable to set perms on %s: %v", config.DbPath, err)
}
}
drv, err := getEntDriver("sqlite3", dialect.SQLite, fmt.Sprintf("file:%s?_busy_timeout=100000&_fk=1", config.DbPath), config)
if config.UseWal == nil {
entLogger.Warn("you are using sqlite without WAL, this can have an impact on performance. If you do not store the database on a network share, set db_config.use_wal to true. Set it explicitly to false to disable this warning")
}
var sqliteConnectionStringParameters string
if config.UseWal != nil && *config.UseWal {
sqliteConnectionStringParameters = "_busy_timeout=100000&_fk=1&_journal_mode=WAL"
} else {
sqliteConnectionStringParameters = "_busy_timeout=100000&_fk=1"
}
drv, err := getEntDriver("sqlite3", dialect.SQLite, fmt.Sprintf("file:%s?%s", config.DbPath, sqliteConnectionStringParameters), config)
if err != nil {
return &Client{}, errors.Wrapf(err, "failed opening connection to sqlite: %v", config.DbPath)
}

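With use_wal set, the connection string assembled above becomes

file:/path/to/crowdsec.db?_busy_timeout=100000&_fk=1&_journal_mode=WAL

and without it the _journal_mode parameter is simply omitted. WAL lets readers and the writer proceed concurrently instead of serializing on a single lock, which is why it helps here; it is not safe on network filesystems, hence the warning about network shares.
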
View file

@@ -5,17 +5,17 @@ import (
"time"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)
type HiddenKey struct {
key string
expiration time.Time
type Blackhole struct {
duration time.Duration
DumbProcessor
}
type Blackhole struct {
duration time.Duration
hiddenKeys []HiddenKey
DumbProcessor
type BlackholeExpiration struct {
blExpiration time.Time
}
func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
@@ -26,43 +26,59 @@ func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
}
return &Blackhole{
duration: duration,
hiddenKeys: []HiddenKey{},
DumbProcessor: DumbProcessor{},
}, nil
}
func CleanupBlackhole(lastEvent time.Time) {
BlackholeTracking.Range(func(key, value interface{}) bool {
cleanupDate := value.(BlackholeExpiration).blExpiration
if cleanupDate.Before(lastEvent) {
log.Debugf("Expiring blackhole for %s", key)
BlackholeTracking.Delete(key)
}
return true
})
}
func BlackholeGC(bucketsTomb *tomb.Tomb) error {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-bucketsTomb.Dying():
ticker.Stop()
BlackholeTracking.Range(func(key, value interface{}) bool {
BlackholeTracking.Delete(key)
return true
})
return nil
case <-ticker.C:
CleanupBlackhole(time.Now().UTC())
}
}
}
func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue) {
return func(leaky *Leaky, alert types.RuntimeAlert, queue *Queue) (types.RuntimeAlert, *Queue) {
var blackholed bool = false
var tmp []HiddenKey
// search if we are blackholed and refresh the slice
for _, element := range bl.hiddenKeys {
if element.key == leaky.Mapkey {
if element.expiration.After(leaky.Ovflw_ts) {
leaky.logger.Debugf("Overflow discarded, still blackholed for %s", element.expiration.Sub(leaky.Ovflw_ts))
blackholed = true
}
}
if element.expiration.After(leaky.Ovflw_ts) {
tmp = append(tmp, element)
if expiration, ok := BlackholeTracking.Load(leaky.Mapkey); ok {
x := expiration.(BlackholeExpiration)
if x.blExpiration.After(leaky.Ovflw_ts) {
leaky.logger.Debugf("Blackhole already triggered for %s (remaining : %s", leaky.Mapkey, x.blExpiration.Sub(time.Now().UTC()))
return types.RuntimeAlert{
Mapkey: leaky.Mapkey,
}, nil
} else {
leaky.logger.Debugf("%s left blackhole %s ago", element.key, leaky.Ovflw_ts.Sub(element.expiration))
leaky.logger.Debugf("Blackhole expired for %s", leaky.Mapkey)
BlackholeTracking.Delete(leaky.Mapkey)
}
}
bl.hiddenKeys = tmp
if blackholed {
leaky.logger.Tracef("Event is blackholed (%s)", leaky.First_ts)
return types.RuntimeAlert{
Mapkey: leaky.Mapkey,
}, nil
}
bl.hiddenKeys = append(bl.hiddenKeys, HiddenKey{leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration)})
leaky.logger.Debugf("Adding overflow to blackhole (%s)", leaky.First_ts)
BlackholeTracking.Store(leaky.Mapkey, BlackholeExpiration{
blExpiration: leaky.Ovflw_ts.Add(bl.duration),
})
leaky.logger.Debugf("Blackhole triggered for %s (expiration : %s)", leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration))
return alert, queue
}
}

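The per-instance hiddenKeys slice is gone: blackhole state now lives in the process-wide BlackholeTracking sync.Map, so it survives the bucket instance and can be cleaned centrally. A self-contained sketch of the same expire-on-read pattern (names are illustrative, not the crowdsec API):

package main

import (
	"fmt"
	"sync"
	"time"
)

// per-key expiration value, mirroring BlackholeExpiration above
type expiration struct{ until time.Time }

var tracking sync.Map // map[string]expiration

// blackholed reports whether key is still suppressed at time now,
// deleting the entry once it has expired (expire-on-read).
func blackholed(key string, now time.Time) bool {
	v, ok := tracking.Load(key)
	if !ok {
		return false
	}
	if v.(expiration).until.After(now) {
		return true
	}
	tracking.Delete(key) // expired: forget the key
	return false
}

func main() {
	tracking.Store("1.2.3.4", expiration{until: time.Now().Add(time.Minute)})
	fmt.Println(blackholed("1.2.3.4", time.Now()))                    // true
	fmt.Println(blackholed("1.2.3.4", time.Now().Add(2*time.Minute))) // false
}
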
View file

@@ -220,7 +220,6 @@ func LeakRoutine(leaky *Leaky) error {
}
}
leaky.logger.Debugf("Leaky routine starting, lifetime : %s", leaky.Duration)
for {
select {
/*receiving an event*/

View file

@@ -38,6 +38,8 @@ func TestBucket(t *testing.T) {
log.Fatalf("exprhelpers init failed: %s", err)
}
BlackholeTracking = &sync.Map{}
if envSetting != "" {
if err := testOneBucket(t, envSetting, tomb); err != nil {
t.Fatalf("Test '%s' failed : %s", envSetting, err)
@@ -64,8 +66,8 @@ func TestBucket(t *testing.T) {
}
}
//during tests, we're likely to have only one scenario, and thus only one holder.
//we want to avoid the death of the tomb because all existing buckets have been destroyed.
// during tests, we're likely to have only one scenario, and thus only one holder.
// we want to avoid the death of the tomb because all existing buckets have been destroyed.
func watchTomb(tomb *tomb.Tomb) {
for {
if tomb.Alive() == false {

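Since BlackholeTracking is a package-level *sync.Map that runCrowdsec normally initializes, any test entry point that pours events must set it up first, or the Load/Store calls would panic on a nil pointer. A minimal sketch of such an init (hypothetical test scaffolding, assuming the usual testing, os, and sync imports; the diff instead assigns it directly inside TestBucket):

func TestMain(m *testing.M) {
	BlackholeTracking = &sync.Map{} // mirror what runCrowdsec does at startup
	os.Exit(m.Run())
}
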
View file

@@ -6,6 +6,7 @@ import (
"io/ioutil"
"math"
"os"
"sync"
"time"
"github.com/pkg/errors"
@@ -23,9 +24,13 @@ var serialized map[string]Leaky
var BucketPourCache map[string][]types.Event
var BucketPourTrack bool
/*The leaky routines lifecycle are based on "real" time.
var BlackholeTracking *sync.Map
/*
The leaky routines' lifecycle is based on "real" time.
But when we are running in time-machine mode, the reference time is in logs and not "real" time.
Thus we need to garbage collect them to avoid a skyrocketing memory usage.*/
Thus we need to garbage collect them to avoid skyrocketing memory usage.
*/
func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error {
buckets.wgPour.Wait()
buckets.wgDumpState.Add(1)
@@ -272,7 +277,7 @@ func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder B
//once the created goroutine is ready to process event, we can return it
<-fresh_bucket.Signal
} else {
holder.logger.Debugf("Unexpectedly found exisint bucket for %s", partitionKey)
holder.logger.Debugf("Unexpectedly found existing bucket for %s", partitionKey)
biface = actual
}
holder.logger.Debugf("Created new bucket %s", partitionKey)
@@ -340,7 +345,25 @@ func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buc
}
}
buckey := GetKey(holders[idx], groupby)
if x, ok := BlackholeTracking.Load(buckey); ok {
holders[idx].logger.Debugf("Checking if blackhole has expired for %s", buckey)
blackholeExp := x.(BlackholeExpiration)
t := time.Now().UTC()
if parsed.ExpectMode == TIMEMACHINE {
//This is not optimal at all, date enrichment should also set parsed.Time to avoid parsing the date twice
err := t.UnmarshalText([]byte(parsed.MarshaledTime))
if err != nil {
holders[idx].logger.Errorf("failed parsing time : %v", err)
}
holders[idx].logger.Debugf("Found TIMEMACHINE bucket, using %s as time, comparing against %s ", t, blackholeExp.blExpiration)
}
if blackholeExp.blExpiration.After(t) {
holders[idx].logger.Debugf("Event is blackholed: %s (remaining: %s)", buckey, blackholeExp.blExpiration.Sub(t))
continue
}
holders[idx].logger.Debugf("Event is no longer blackholed: %s", buckey)
BlackholeTracking.Delete(buckey)
}
//we need to either find the existing bucket, or create a new one (if it's the first event to hit it for this partition key)
bucket, err := LoadOrStoreBucketFromHolder(buckey, buckets, holders[idx], parsed.ExpectMode)
if err != nil {

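The expiry check above compares against wall-clock time for live events but re-parses the event's own timestamp in time-machine mode, as the in-line comment concedes, twice per event. A self-contained sketch of that clock selection (constant values and names are placeholders, not crowdsec's):

package main

import (
	"fmt"
	"time"
)

// placeholder event modes; crowdsec defines its own LIVE/TIMEMACHINE constants
const (
	LIVE = iota
	TIMEMACHINE
)

// referenceTime picks the clock that blackhole expirations are compared
// against: the event's own timestamp when replaying, else the wall clock.
func referenceTime(mode int, marshaledTime string) time.Time {
	t := time.Now().UTC()
	if mode == TIMEMACHINE && marshaledTime != "" {
		// MarshaledTime holds an RFC3339 timestamp set by date enrichment
		if err := t.UnmarshalText([]byte(marshaledTime)); err != nil {
			fmt.Printf("failed parsing time: %v\n", err)
		}
	}
	return t
}

func main() {
	fmt.Println(referenceTime(LIVE, ""))
	fmt.Println(referenceTime(TIMEMACHINE, "2020-01-01T10:00:04Z"))
}
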
View file

@@ -1,7 +1,7 @@
# ssh bruteforce
type: leaky
debug: true
name: test/simple-leaky
name: test/simple-leaky-blackhole
description: "Simple leaky"
filter: "evt.Line.Labels.type =='testlog'"
leakspeed: "10s"

View file

@@ -91,16 +91,12 @@
}
},
"Alert" : {
"scenario": "test/simple-leaky",
"scenario": "test/simple-leaky-blackhole",
"events_count": 2
}
}
},
{
"Alert": {
}
},
{
"Alert": {
"sources": {
@@ -112,7 +108,7 @@
}
},
"Alert" : {
"scenario": "test/simple-leaky",
"scenario": "test/simple-leaky-blackhole",
"events_count": 2
}

View file

@@ -1,6 +1,6 @@
type: leaky
debug: true
name: test/simple-leaky
name: test/simple-leaky-ovfl
description: "Simple leaky"
filter: "evt.Line.Labels.type =='testlog'"
leakspeed: "10s"

View file

@@ -36,7 +36,7 @@
}
},
"Alert" : {
"scenario": "test/simple-leaky",
"scenario": "test/simple-leaky-ovfl",
"events_count": 2
}
}

View file

@@ -1,7 +1,7 @@
# ssh bruteforce
type: leaky
debug: true
name: test/simple-leaky
name: test/simple-leaky-underflow
description: "Simple leaky"
filter: "evt.Line.Labels.type =='testlog'"
leakspeed: "0.5s"

View file

@@ -1,7 +1,7 @@
# ssh bruteforce
type: leaky
debug: true
name: test/simple-leaky
name: test/simple-leaky-state
description: "Simple leaky"
filter: "evt.Line.Labels.type =='testlog'"
leakspeed: "10s"

View file

@@ -1,6 +1,6 @@
{
"cdf58e6ae48e79ac3ae0f006e1a2e627eccd8b63": {
"Name": "test/simple-leaky",
"4fccb3db1e4c2e1c94bdb747336c8749bbc470ef": {
"Name": "test/simple-leaky-state",
"Mode": 1,
"SerializedState": {
"Limit": 0.1,
@@ -74,7 +74,7 @@
},
"Capacity": 3,
"CacheSize": 0,
"Mapkey": "cdf58e6ae48e79ac3ae0f006e1a2e627eccd8b63",
"Mapkey": "4fccb3db1e4c2e1c94bdb747336c8749bbc470ef",
"Reprocess": false,
"Uuid": "dark-bush",
"First_ts": "2020-01-01T10:00:04Z",

View file

@@ -52,7 +52,7 @@
}
},
"Alert" : {
"scenario": "test/simple-leaky",
"scenario": "test/simple-leaky-state",
"events_count": 4
}

View file

@@ -1,7 +1,7 @@
# ssh bruteforce
type: leaky
debug: true
name: test/simple-leaky
name: test/simple-leaky-uniq
description: "Simple leaky"
filter: "evt.Line.Labels.type =='testlog'"
leakspeed: "10s"

View file

@@ -52,7 +52,7 @@
}
},
"Alert" : {
"scenario": "test/simple-leaky",
"scenario": "test/simple-leaky-uniq",
"events_count": 2
}

View file

@@ -81,6 +81,7 @@ config_generate() {
.config_paths.plugin_dir=strenv(PLUGIN_DIR) |
.crowdsec_service.acquisition_path=strenv(CONFIG_DIR)+"/acquis.yaml" |
.db_config.db_path=strenv(DATA_DIR)+"/crowdsec.db" |
.db_config.use_wal=true |
.api.client.credentials_path=strenv(CONFIG_DIR)+"/local_api_credentials.yaml" |
.api.server.profiles_path=strenv(CONFIG_DIR)+"/profiles.yaml" |
.api.server.console_path=strenv(CONFIG_DIR)+"/console.yaml" |