don't run blackhole cleanup routine when replaying logs
This commit is contained in:
parent
662764929d
commit
5ae97d582c
|
@ -70,12 +70,15 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
|
||||||
parserWg.Wait()
|
parserWg.Wait()
|
||||||
|
|
||||||
bucketWg := &sync.WaitGroup{}
|
bucketWg := &sync.WaitGroup{}
|
||||||
bucketsTomb.Go(func() error {
|
log.Infof("BucketsGCEnabled: %v", cConfig.Crowdsec.BucketsGCEnabled)
|
||||||
bucketWg.Add(1)
|
if !cConfig.Crowdsec.BucketsGCEnabled {
|
||||||
leakybucket.CleanupBlackhole(&bucketsTomb)
|
bucketsTomb.Go(func() error {
|
||||||
bucketWg.Done()
|
bucketWg.Add(1)
|
||||||
return nil
|
leakybucket.CleanupBlackhole(&bucketsTomb)
|
||||||
})
|
bucketWg.Done()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
bucketsTomb.Go(func() error {
|
bucketsTomb.Go(func() error {
|
||||||
bucketWg.Add(1)
|
bucketWg.Add(1)
|
||||||
/*restore previous state as well if present*/
|
/*restore previous state as well if present*/
|
||||||
|
|
|
@ -149,6 +149,8 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
|
||||||
flags.Labels = labels
|
flags.Labels = labels
|
||||||
flags.Labels["type"] = flags.SingleFileType
|
flags.Labels["type"] = flags.SingleFileType
|
||||||
|
|
||||||
|
cConfig.Crowdsec.BucketsGCEnabled = true
|
||||||
|
|
||||||
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels)
|
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
|
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
|
||||||
|
|
|
@ -15,8 +15,7 @@ type Blackhole struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type BlackholeExpiration struct {
|
type BlackholeExpiration struct {
|
||||||
blExpiration time.Time
|
blExpiration time.Time
|
||||||
cleanupExpiration time.Time //we need a separate expiration for the cleanup to properly handle timemachine buckets
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
|
func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error) {
|
||||||
|
@ -44,7 +43,7 @@ func CleanupBlackhole(bucketsTomb *tomb.Tomb) error {
|
||||||
return nil
|
return nil
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
BlackholeTracking.Range(func(key, value interface{}) bool {
|
BlackholeTracking.Range(func(key, value interface{}) bool {
|
||||||
cleanupDate := value.(BlackholeExpiration).cleanupExpiration
|
cleanupDate := value.(BlackholeExpiration).blExpiration
|
||||||
if cleanupDate.Before(time.Now().UTC()) {
|
if cleanupDate.Before(time.Now().UTC()) {
|
||||||
log.Debugf("Expiring blackhole for %s", key)
|
log.Debugf("Expiring blackhole for %s", key)
|
||||||
BlackholeTracking.Delete(key)
|
BlackholeTracking.Delete(key)
|
||||||
|
@ -72,8 +71,7 @@ func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky,
|
||||||
}
|
}
|
||||||
|
|
||||||
BlackholeTracking.Store(leaky.Mapkey, BlackholeExpiration{
|
BlackholeTracking.Store(leaky.Mapkey, BlackholeExpiration{
|
||||||
blExpiration: leaky.Ovflw_ts.Add(bl.duration),
|
blExpiration: leaky.Ovflw_ts.Add(bl.duration),
|
||||||
cleanupExpiration: time.Now().UTC().Add(bl.duration),
|
|
||||||
})
|
})
|
||||||
|
|
||||||
leaky.logger.Debugf("Blackhole triggered for %s (expiration : %s)", leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration))
|
leaky.logger.Debugf("Blackhole triggered for %s (expiration : %s)", leaky.Mapkey, leaky.Ovflw_ts.Add(bl.duration))
|
||||||
|
|
Loading…
Reference in a new issue