diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 940faff70..10ac48aba 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -44,8 +44,8 @@ func initCrowdsec(cConfig *csconfig.Config) (*parser.Parsers, error) { } func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error { - inputLineChan := make(chan types.Event) - inputEventChan := make(chan types.Event) + inputEventChan = make(chan types.Event) + inputLineChan = make(chan types.Event) //start go-routines for parsing, buckets pour and outputs. parserWg := &sync.WaitGroup{} diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index d012d24fd..67f0f9967 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -40,8 +40,11 @@ var ( /*the state of acquisition*/ dataSources []acquisition.DataSource /*the state of the buckets*/ - holders []leakybucket.BucketFactory - buckets *leakybucket.Buckets + holders []leakybucket.BucketFactory + buckets *leakybucket.Buckets + + inputLineChan chan types.Event + inputEventChan chan types.Event outputEventChan chan types.Event // the buckets init returns its own chan that is used for multiplexing /*settings*/ lastProcessedItem time.Time /*keep track of last item timestamp in time-machine. it is used to GC buckets when we dump them.*/ @@ -283,7 +286,6 @@ func LoadConfig(cConfig *csconfig.Config) error { return nil } - // exitWithCode must be called right before the program termination, // to allow measuring functional test coverage in case of abnormal exit. // diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index 46b28ab36..db6401190 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -115,7 +115,7 @@ func ShutdownCrowdsecRoutines() error { if len(dataSources) > 0 { acquisTomb.Kill(nil) log.Debugf("waiting for acquisition to finish") - + drainChan(inputLineChan) if err := acquisTomb.Wait(); err != nil { log.Warningf("Acquisition returned error : %s", err) reterr = err @@ -124,6 +124,7 @@ func ShutdownCrowdsecRoutines() error { log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.") parsersTomb.Kill(nil) + drainChan(inputEventChan) if err := parsersTomb.Wait(); err != nil { log.Warningf("Parsers returned error : %s", err) reterr = err @@ -194,6 +195,19 @@ func shutdown(sig os.Signal, cConfig *csconfig.Config) error { return nil } +func drainChan(c chan types.Event) { + for { + select { + case _, ok := <-c: + if !ok { //closed + return + } + default: + return + } + } +} + func HandleSignals(cConfig *csconfig.Config) error { var ( newConfig *csconfig.Config