diff --git a/.golangci.yml b/.golangci.yml index 3161b2c0a..e605ac079 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -11,7 +11,7 @@ run: linters-settings: cyclop: # lower this after refactoring - max-complexity: 70 + max-complexity: 53 gci: sections: @@ -26,7 +26,7 @@ linters-settings: gocyclo: # lower this after refactoring - min-complexity: 70 + min-complexity: 49 funlen: # Checks the number of lines in a function. @@ -46,7 +46,7 @@ linters-settings: maintidx: # raise this after refactoring - under: 9 + under: 11 misspell: locale: US @@ -263,6 +263,10 @@ issues: - perfsprint text: "fmt.Sprintf can be replaced .*" + - linters: + - perfsprint + text: "fmt.Errorf can be replaced with errors.New" + # # Will fix, easy but some neurons required # diff --git a/cmd/crowdsec/api.go b/cmd/crowdsec/api.go index a1e933cba..4ac5c3ce9 100644 --- a/cmd/crowdsec/api.go +++ b/cmd/crowdsec/api.go @@ -56,7 +56,8 @@ func initAPIServer(cConfig *csconfig.Config) (*apiserver.APIServer, error) { return apiServer, nil } -func serveAPIServer(apiServer *apiserver.APIServer, apiReady chan bool) { +func serveAPIServer(apiServer *apiserver.APIServer) { + apiReady := make(chan bool, 1) apiTomb.Go(func() error { defer trace.CatchPanic("crowdsec/serveAPIServer") go func() { @@ -80,6 +81,7 @@ func serveAPIServer(apiServer *apiserver.APIServer, apiReady chan bool) { } return nil }) + <-apiReady } func hasPlugins(profiles []*csconfig.ProfileCfg) bool { diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 774b9d381..d4cd2d3cf 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "path/filepath" @@ -13,8 +14,8 @@ import ( "github.com/crowdsecurity/go-cs-lib/trace" "github.com/crowdsecurity/crowdsec/pkg/acquisition" - "github.com/crowdsecurity/crowdsec/pkg/appsec" "github.com/crowdsecurity/crowdsec/pkg/alertcontext" + "github.com/crowdsecurity/crowdsec/pkg/appsec" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/cwhub" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" @@ -56,63 +57,86 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H //start go-routines for parsing, buckets pour and outputs. parserWg := &sync.WaitGroup{} + parsersTomb.Go(func() error { parserWg.Add(1) + for i := 0; i < cConfig.Crowdsec.ParserRoutinesCount; i++ { parsersTomb.Go(func() error { defer trace.CatchPanic("crowdsec/runParse") + if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { //this error will never happen as parser.Parse is not able to return errors log.Fatalf("starting parse error : %s", err) return err } + return nil }) } parserWg.Done() + return nil }) parserWg.Wait() bucketWg := &sync.WaitGroup{} + bucketsTomb.Go(func() error { bucketWg.Add(1) /*restore previous state as well if present*/ if cConfig.Crowdsec.BucketStateFile != "" { log.Warningf("Restoring buckets state from %s", cConfig.Crowdsec.BucketStateFile) + if err := leaky.LoadBucketsState(cConfig.Crowdsec.BucketStateFile, buckets, holders); err != nil { - return fmt.Errorf("unable to restore buckets : %s", err) + return fmt.Errorf("unable to restore buckets: %w", err) } } for i := 0; i < cConfig.Crowdsec.BucketsRoutinesCount; i++ { bucketsTomb.Go(func() error { defer trace.CatchPanic("crowdsec/runPour") + if err := runPour(inputEventChan, holders, buckets, cConfig); err != nil { log.Fatalf("starting pour error : %s", err) return err } + return nil }) } bucketWg.Done() + return nil }) bucketWg.Wait() + apiClient, err := AuthenticatedLAPIClient(*cConfig.API.Client.Credentials, hub) + if err != nil { + return err + } + + log.Debugf("Starting HeartBeat service") + apiClient.HeartBeat.StartHeartBeat(context.Background(), &outputsTomb) + outputWg := &sync.WaitGroup{} + outputsTomb.Go(func() error { outputWg.Add(1) + for i := 0; i < cConfig.Crowdsec.OutputRoutinesCount; i++ { outputsTomb.Go(func() error { defer trace.CatchPanic("crowdsec/runOutput") - if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, *cConfig.API.Client.Credentials, hub); err != nil { + + if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, apiClient); err != nil { log.Fatalf("starting outputs error : %s", err) return err } + return nil }) } outputWg.Done() + return nil }) outputWg.Wait() @@ -122,16 +146,16 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H if cConfig.Prometheus.Level == "aggregated" { aggregated = true } + if err := acquisition.GetMetrics(dataSources, aggregated); err != nil { return fmt.Errorf("while fetching prometheus metrics for datasources: %w", err) } - } + log.Info("Starting processing data") if err := acquisition.StartAcquisition(dataSources, inputLineChan, &acquisTomb); err != nil { - log.Fatalf("starting acquisition error : %s", err) - return err + return fmt.Errorf("starting acquisition error: %w", err) } return nil @@ -140,11 +164,13 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) { crowdsecTomb.Go(func() error { defer trace.CatchPanic("crowdsec/serveCrowdsec") + go func() { defer trace.CatchPanic("crowdsec/runCrowdsec") // this logs every time, even at config reload log.Debugf("running agent after %s ms", time.Since(crowdsecT0)) agentReady <- true + if err := runCrowdsec(cConfig, parsers, hub); err != nil { log.Fatalf("unable to start crowdsec routines: %s", err) } @@ -156,16 +182,20 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub */ waitOnTomb() log.Debugf("Shutting down crowdsec routines") + if err := ShutdownCrowdsecRoutines(); err != nil { log.Fatalf("unable to shutdown crowdsec routines: %s", err) } + log.Debugf("everything is dead, return crowdsecTomb") + if dumpStates { dumpParserState() dumpOverflowState() dumpBucketsPour() os.Exit(0) } + return nil }) } @@ -175,55 +205,65 @@ func dumpBucketsPour() { if err != nil { log.Fatalf("open: %s", err) } + out, err := yaml.Marshal(leaky.BucketPourCache) if err != nil { log.Fatalf("marshal: %s", err) } + b, err := fd.Write(out) if err != nil { log.Fatalf("write: %s", err) } + log.Tracef("wrote %d bytes", b) + if err := fd.Close(); err != nil { log.Fatalf(" close: %s", err) } } func dumpParserState() { - fd, err := os.OpenFile(filepath.Join(parser.DumpFolder, "parser-dump.yaml"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { log.Fatalf("open: %s", err) } + out, err := yaml.Marshal(parser.StageParseCache) if err != nil { log.Fatalf("marshal: %s", err) } + b, err := fd.Write(out) if err != nil { log.Fatalf("write: %s", err) } + log.Tracef("wrote %d bytes", b) + if err := fd.Close(); err != nil { log.Fatalf(" close: %s", err) } } func dumpOverflowState() { - fd, err := os.OpenFile(filepath.Join(parser.DumpFolder, "bucket-dump.yaml"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { log.Fatalf("open: %s", err) } + out, err := yaml.Marshal(bucketOverflows) if err != nil { log.Fatalf("marshal: %s", err) } + b, err := fd.Write(out) if err != nil { log.Fatalf("write: %s", err) } + log.Tracef("wrote %d bytes", b) + if err := fd.Close(); err != nil { log.Fatalf(" close: %s", err) } diff --git a/cmd/crowdsec/lapiclient.go b/cmd/crowdsec/lapiclient.go new file mode 100644 index 000000000..fd29aa9d9 --- /dev/null +++ b/cmd/crowdsec/lapiclient.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/go-openapi/strfmt" + + "github.com/crowdsecurity/go-cs-lib/version" + + "github.com/crowdsecurity/crowdsec/pkg/apiclient" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" + "github.com/crowdsecurity/crowdsec/pkg/cwhub" + "github.com/crowdsecurity/crowdsec/pkg/models" +) + +func AuthenticatedLAPIClient(credentials csconfig.ApiCredentialsCfg, hub *cwhub.Hub) (*apiclient.ApiClient, error) { + scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS) + if err != nil { + return nil, fmt.Errorf("loading list of installed hub scenarios: %w", err) + } + + appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES) + if err != nil { + return nil, fmt.Errorf("loading list of installed hub appsec rules: %w", err) + } + + installedScenariosAndAppsecRules := make([]string, 0, len(scenarios)+len(appsecRules)) + installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, scenarios...) + installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, appsecRules...) + + apiURL, err := url.Parse(credentials.URL) + if err != nil { + return nil, fmt.Errorf("parsing api url ('%s'): %w", credentials.URL, err) + } + + papiURL, err := url.Parse(credentials.PapiURL) + if err != nil { + return nil, fmt.Errorf("parsing polling api url ('%s'): %w", credentials.PapiURL, err) + } + + password := strfmt.Password(credentials.Password) + + client, err := apiclient.NewClient(&apiclient.Config{ + MachineID: credentials.Login, + Password: password, + Scenarios: installedScenariosAndAppsecRules, + UserAgent: fmt.Sprintf("crowdsec/%s", version.String()), + URL: apiURL, + PapiURL: papiURL, + VersionPrefix: "v1", + UpdateScenario: func() ([]string, error) { + scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS) + if err != nil { + return nil, err + } + appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES) + if err != nil { + return nil, err + } + ret := make([]string, 0, len(scenarios)+len(appsecRules)) + ret = append(ret, scenarios...) + ret = append(ret, appsecRules...) + + return ret, nil + }, + }) + if err != nil { + return nil, fmt.Errorf("new client api: %w", err) + } + + authResp, _, err := client.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{ + MachineID: &credentials.Login, + Password: &password, + Scenarios: installedScenariosAndAppsecRules, + }) + if err != nil { + return nil, fmt.Errorf("authenticate watcher (%s): %w", credentials.Login, err) + } + + var expiration time.Time + if err := expiration.UnmarshalText([]byte(authResp.Expire)); err != nil { + return nil, fmt.Errorf("unable to parse jwt expiration: %w", err) + } + + client.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token + client.GetClient().Transport.(*apiclient.JWTTransport).Expiration = expiration + + return client, nil +} diff --git a/cmd/crowdsec/metrics.go b/cmd/crowdsec/metrics.go index fa2d8d5de..1199af0fe 100644 --- a/cmd/crowdsec/metrics.go +++ b/cmd/crowdsec/metrics.go @@ -114,13 +114,17 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha } decisionsFilters := make(map[string][]string, 0) + decisions, err := dbClient.QueryDecisionCountByScenario(decisionsFilters) if err != nil { log.Errorf("Error querying decisions for metrics: %v", err) next.ServeHTTP(w, r) + return } + globalActiveDecisions.Reset() + for _, d := range decisions { globalActiveDecisions.With(prometheus.Labels{"reason": d.Scenario, "origin": d.Origin, "action": d.Type}).Set(float64(d.Count)) } @@ -136,6 +140,7 @@ func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.Ha if err != nil { log.Errorf("Error querying alerts for metrics: %v", err) next.ServeHTTP(w, r) + return } @@ -173,11 +178,12 @@ func registerPrometheus(config *csconfig.PrometheusCfg) { globalActiveDecisions, globalAlerts, parser.NodesWlHitsOk, parser.NodesWlHits, cache.CacheMetrics, exprhelpers.RegexpCacheMetrics, ) - } } -func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client, apiReady chan bool, agentReady chan bool) { +func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client, agentReady chan bool) { + <-agentReady + if !config.Enabled { return } @@ -185,9 +191,8 @@ func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client, defer trace.CatchPanic("crowdsec/servePrometheus") http.Handle("/metrics", computeDynamicMetrics(promhttp.Handler(), dbClient)) - <-apiReady - <-agentReady log.Debugf("serving metrics after %s ms", time.Since(crowdsecT0)) + if err := http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ListenPort), nil); err != nil { log.Warningf("prometheus: %s", err) } diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index ad53ce4c8..c4a2c0b6a 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -3,18 +3,12 @@ package main import ( "context" "fmt" - "net/url" "sync" "time" - "github.com/go-openapi/strfmt" log "github.com/sirupsen/logrus" - "github.com/crowdsecurity/go-cs-lib/version" - "github.com/crowdsecurity/crowdsec/pkg/apiclient" - "github.com/crowdsecurity/crowdsec/pkg/csconfig" - "github.com/crowdsecurity/crowdsec/pkg/cwhub" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/models" "github.com/crowdsecurity/crowdsec/pkg/parser" @@ -22,7 +16,6 @@ import ( ) func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) { - var dedupCache []*models.Alert for idx, alert := range alerts { @@ -32,16 +25,21 @@ func dedupAlerts(alerts []types.RuntimeAlert) ([]*models.Alert, error) { dedupCache = append(dedupCache, alert.Alert) continue } + for k, src := range alert.Sources { refsrc := *alert.Alert //copy + log.Tracef("source[%s]", k) + refsrc.Source = &src dedupCache = append(dedupCache, &refsrc) } } + if len(dedupCache) != len(alerts) { log.Tracef("went from %d to %d alerts", len(alerts), len(dedupCache)) } + return dedupCache, nil } @@ -52,93 +50,25 @@ func PushAlerts(alerts []types.RuntimeAlert, client *apiclient.ApiClient) error if err != nil { return fmt.Errorf("failed to transform alerts for api: %w", err) } + _, _, err = client.Alerts.Add(ctx, alertsToPush) if err != nil { return fmt.Errorf("failed sending alert to LAPI: %w", err) } + return nil } var bucketOverflows []types.Event -func runOutput(input chan types.Event, overflow chan types.Event, buckets *leaky.Buckets, - postOverflowCTX parser.UnixParserCtx, postOverflowNodes []parser.Node, - apiConfig csconfig.ApiCredentialsCfg, hub *cwhub.Hub) error { +func runOutput(input chan types.Event, overflow chan types.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx, + postOverflowNodes []parser.Node, client *apiclient.ApiClient) error { + var ( + cache []types.RuntimeAlert + cacheMutex sync.Mutex + ) - var err error ticker := time.NewTicker(1 * time.Second) - - var cache []types.RuntimeAlert - var cacheMutex sync.Mutex - - scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS) - if err != nil { - return fmt.Errorf("loading list of installed hub scenarios: %w", err) - } - - appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES) - if err != nil { - return fmt.Errorf("loading list of installed hub appsec rules: %w", err) - } - - installedScenariosAndAppsecRules := make([]string, 0, len(scenarios)+len(appsecRules)) - installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, scenarios...) - installedScenariosAndAppsecRules = append(installedScenariosAndAppsecRules, appsecRules...) - - apiURL, err := url.Parse(apiConfig.URL) - if err != nil { - return fmt.Errorf("parsing api url ('%s'): %w", apiConfig.URL, err) - } - papiURL, err := url.Parse(apiConfig.PapiURL) - if err != nil { - return fmt.Errorf("parsing polling api url ('%s'): %w", apiConfig.PapiURL, err) - } - password := strfmt.Password(apiConfig.Password) - - Client, err := apiclient.NewClient(&apiclient.Config{ - MachineID: apiConfig.Login, - Password: password, - Scenarios: installedScenariosAndAppsecRules, - UserAgent: fmt.Sprintf("crowdsec/%s", version.String()), - URL: apiURL, - PapiURL: papiURL, - VersionPrefix: "v1", - UpdateScenario: func() ([]string, error) { - scenarios, err := hub.GetInstalledItemNames(cwhub.SCENARIOS) - if err != nil { - return nil, err - } - appsecRules, err := hub.GetInstalledItemNames(cwhub.APPSEC_RULES) - if err != nil { - return nil, err - } - ret := make([]string, 0, len(scenarios)+len(appsecRules)) - ret = append(ret, scenarios...) - ret = append(ret, appsecRules...) - return ret, nil - }, - }) - if err != nil { - return fmt.Errorf("new client api: %w", err) - } - authResp, _, err := Client.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{ - MachineID: &apiConfig.Login, - Password: &password, - Scenarios: installedScenariosAndAppsecRules, - }) - if err != nil { - return fmt.Errorf("authenticate watcher (%s): %w", apiConfig.Login, err) - } - - if err := Client.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil { - return fmt.Errorf("unable to parse jwt expiration: %w", err) - } - - Client.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token - - //start the heartbeat service - log.Debugf("Starting HeartBeat service") - Client.HeartBeat.StartHeartBeat(context.Background(), &outputsTomb) LOOP: for { select { @@ -149,7 +79,7 @@ LOOP: newcache := make([]types.RuntimeAlert, 0) cache = newcache cacheMutex.Unlock() - if err := PushAlerts(cachecopy, Client); err != nil { + if err := PushAlerts(cachecopy, client); err != nil { log.Errorf("while pushing to api : %s", err) //just push back the events to the queue cacheMutex.Lock() @@ -162,10 +92,11 @@ LOOP: cacheMutex.Lock() cachecopy := cache cacheMutex.Unlock() - if err := PushAlerts(cachecopy, Client); err != nil { + if err := PushAlerts(cachecopy, client); err != nil { log.Errorf("while pushing leftovers to api : %s", err) } } + break LOOP case event := <-overflow: /*if alert is empty and mapKey is present, the overflow is just to cleanup bucket*/ @@ -176,7 +107,7 @@ LOOP: /* process post overflow parser nodes */ event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes) if err != nil { - return fmt.Errorf("postoverflow failed : %s", err) + return fmt.Errorf("postoverflow failed: %w", err) } log.Printf("%s", *event.Overflow.Alert.Message) //if the Alert is nil, it's to signal bucket is ready for GC, don't track this @@ -206,6 +137,6 @@ LOOP: } ticker.Stop() - return nil + return nil } diff --git a/cmd/crowdsec/run_in_svc.go b/cmd/crowdsec/run_in_svc.go index 202053790..5a8bc9a6c 100644 --- a/cmd/crowdsec/run_in_svc.go +++ b/cmd/crowdsec/run_in_svc.go @@ -33,7 +33,6 @@ func StartRunSvc() error { log.Infof("Crowdsec %s", version.String()) - apiReady := make(chan bool, 1) agentReady := make(chan bool, 1) // Enable profiling early @@ -46,14 +45,19 @@ func StartRunSvc() error { dbClient, err = database.NewClient(cConfig.DbConfig) if err != nil { - return fmt.Errorf("unable to create database client: %s", err) + return fmt.Errorf("unable to create database client: %w", err) } } registerPrometheus(cConfig.Prometheus) - go servePrometheus(cConfig.Prometheus, dbClient, apiReady, agentReady) + go servePrometheus(cConfig.Prometheus, dbClient, agentReady) + } else { + // avoid leaking the channel + go func() { + <-agentReady + }() } - return Serve(cConfig, apiReady, agentReady) + return Serve(cConfig, agentReady) } diff --git a/cmd/crowdsec/run_in_svc_windows.go b/cmd/crowdsec/run_in_svc_windows.go index 991f7ae44..7845e9c58 100644 --- a/cmd/crowdsec/run_in_svc_windows.go +++ b/cmd/crowdsec/run_in_svc_windows.go @@ -73,7 +73,6 @@ func WindowsRun() error { log.Infof("Crowdsec %s", version.String()) - apiReady := make(chan bool, 1) agentReady := make(chan bool, 1) // Enable profiling early @@ -85,11 +84,11 @@ func WindowsRun() error { dbClient, err = database.NewClient(cConfig.DbConfig) if err != nil { - return fmt.Errorf("unable to create database client: %s", err) + return fmt.Errorf("unable to create database client: %w", err) } } registerPrometheus(cConfig.Prometheus) - go servePrometheus(cConfig.Prometheus, dbClient, apiReady, agentReady) + go servePrometheus(cConfig.Prometheus, dbClient, agentReady) } - return Serve(cConfig, apiReady, agentReady) + return Serve(cConfig, agentReady) } diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index a5c8e24cf..22f65b927 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -42,7 +42,9 @@ func debugHandler(sig os.Signal, cConfig *csconfig.Config) error { if err := leaky.ShutdownAllBuckets(buckets); err != nil { log.Warningf("Failed to shut down routines : %s", err) } + log.Printf("Shutdown is finished, buckets are in %s", tmpFile) + return nil } @@ -66,15 +68,16 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) { if !cConfig.DisableAPI { if flags.DisableCAPI { log.Warningf("Communication with CrowdSec Central API disabled from args") + cConfig.API.Server.OnlineClient = nil } + apiServer, err := initAPIServer(cConfig) if err != nil { return nil, fmt.Errorf("unable to init api server: %w", err) } - apiReady := make(chan bool, 1) - serveAPIServer(apiServer, apiReady) + serveAPIServer(apiServer) } if !cConfig.DisableAgent { @@ -110,6 +113,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) { log.Warningf("Failed to delete temp file (%s) : %s", tmpFile, err) } } + return cConfig, nil } @@ -117,10 +121,12 @@ func ShutdownCrowdsecRoutines() error { var reterr error log.Debugf("Shutting down crowdsec sub-routines") + if len(dataSources) > 0 { acquisTomb.Kill(nil) log.Debugf("waiting for acquisition to finish") drainChan(inputLineChan) + if err := acquisTomb.Wait(); err != nil { log.Warningf("Acquisition returned error : %s", err) reterr = err @@ -130,6 +136,7 @@ func ShutdownCrowdsecRoutines() error { log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.") parsersTomb.Kill(nil) drainChan(inputEventChan) + if err := parsersTomb.Wait(); err != nil { log.Warningf("Parsers returned error : %s", err) reterr = err @@ -160,6 +167,7 @@ func ShutdownCrowdsecRoutines() error { log.Warningf("Outputs returned error : %s", err) reterr = err } + log.Debugf("outputs are done") case <-time.After(3 * time.Second): // this can happen if outputs are stuck in a http retry loop @@ -181,6 +189,7 @@ func shutdownAPI() error { } log.Debugf("done") + return nil } @@ -193,6 +202,7 @@ func shutdownCrowdsec() error { } log.Debugf("done") + return nil } @@ -292,10 +302,11 @@ func HandleSignals(cConfig *csconfig.Config) error { if err == nil { log.Warning("Crowdsec service shutting down") } + return err } -func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) error { +func Serve(cConfig *csconfig.Config, agentReady chan bool) error { acquisTomb = tomb.Tomb{} parsersTomb = tomb.Tomb{} bucketsTomb = tomb.Tomb{} @@ -325,6 +336,7 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e if cConfig.API.CTI != nil && *cConfig.API.CTI.Enabled { log.Infof("Crowdsec CTI helper enabled") + if err := exprhelpers.InitCrowdsecCTI(cConfig.API.CTI.Key, cConfig.API.CTI.CacheTimeout, cConfig.API.CTI.CacheSize, cConfig.API.CTI.LogLevel); err != nil { return fmt.Errorf("failed to init crowdsec cti: %w", err) } @@ -337,6 +349,7 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e if flags.DisableCAPI { log.Warningf("Communication with CrowdSec Central API disabled from args") + cConfig.API.Server.OnlineClient = nil } @@ -346,10 +359,8 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e } if !flags.TestMode { - serveAPIServer(apiServer, apiReady) + serveAPIServer(apiServer) } - } else { - apiReady <- true } if !cConfig.DisableAgent { @@ -366,6 +377,8 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e // if it's just linting, we're done if !flags.TestMode { serveCrowdsec(csParsers, cConfig, hub, agentReady) + } else { + agentReady <- true } } else { agentReady <- true @@ -395,6 +408,7 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e for _, ch := range waitChans { <-ch + switch ch { case apiTomb.Dead(): log.Infof("api shutdown") @@ -402,5 +416,6 @@ func Serve(cConfig *csconfig.Config, apiReady chan bool, agentReady chan bool) e log.Infof("crowdsec shutdown") } } + return nil } diff --git a/test/bats/01_crowdsec.bats b/test/bats/01_crowdsec.bats index be06ac926..a585930e3 100644 --- a/test/bats/01_crowdsec.bats +++ b/test/bats/01_crowdsec.bats @@ -75,6 +75,9 @@ teardown() { rune -0 ./instance-crowdsec start-pid PID="$output" + + sleep .5 + assert_file_exists "$log_old" assert_file_contains "$log_old" "Starting processing data" diff --git a/test/bats/40_live-ban.bats b/test/bats/40_live-ban.bats index c6b8ddf15..a544f67be 100644 --- a/test/bats/40_live-ban.bats +++ b/test/bats/40_live-ban.bats @@ -41,10 +41,23 @@ teardown() { echo -e "---\nfilename: ${tmpfile}\nlabels:\n type: syslog\n" >>"${ACQUIS_YAML}" ./instance-crowdsec start + + sleep 0.2 + fake_log >>"${tmpfile}" - sleep 2 + + sleep 0.2 + rm -f -- "${tmpfile}" - rune -0 cscli decisions list -o json - rune -0 jq -r '.[].decisions[0].value' <(output) - assert_output '1.1.1.172' + + found=0 + # this may take some time in CI + for _ in $(seq 1 10); do + if cscli decisions list -o json | jq -r '.[].decisions[0].value' | grep -q '1.1.1.172'; then + found=1 + break + fi + sleep 0.2 + done + assert_equal 1 "${found}" }