diff --git a/cmd/crowdsec-cli/metrics.go b/cmd/crowdsec-cli/metrics.go
index 02ca56f55..d8710df9a 100644
--- a/cmd/crowdsec-cli/metrics.go
+++ b/cmd/crowdsec-cli/metrics.go
@@ -124,6 +124,8 @@ func ShowPrometheus(url string) {
 	lapi_stats := map[string]map[string]int{}
 	lapi_machine_stats := map[string]map[string]map[string]int{}
 	lapi_bouncer_stats := map[string]map[string]map[string]int{}
+	decisions_stats := map[string]map[string]map[string]int{}
+	alerts_stats := map[string]int{}
 
 	for idx, fam := range result {
 		if !strings.HasPrefix(fam.Name, "cs_") {
@@ -131,7 +133,11 @@ func ShowPrometheus(url string) {
 		}
 		log.Tracef("round %d", idx)
 		for _, m := range fam.Metrics {
-			metric := m.(prom2json.Metric)
+			metric, ok := m.(prom2json.Metric)
+			if !ok {
+				log.Debugf("failed to convert metric to prom2json.Metric")
+				continue
+			}
 			name, ok := metric.Labels["name"]
 			if !ok {
 				log.Debugf("no name in Metric %v", metric.Labels)
@@ -152,6 +158,10 @@ func ShowPrometheus(url string) {
 			route := metric.Labels["route"]
 			method := metric.Labels["method"]
 
+			reason := metric.Labels["reason"]
+			origin := metric.Labels["origin"]
+			action := metric.Labels["action"]
+
 			fval, err := strconv.ParseFloat(value, 32)
 			if err != nil {
 				log.Errorf("Unexpected int value %s : %s", value, err)
@@ -254,6 +264,19 @@ func ShowPrometheus(url string) {
 					x.NonEmpty += ival
 				}
 				lapi_decisions_stats[bouncer] = x
+			case "cs_active_decisions":
+				if _, ok := decisions_stats[reason]; !ok {
+					decisions_stats[reason] = make(map[string]map[string]int)
+				}
+				if _, ok := decisions_stats[reason][origin]; !ok {
+					decisions_stats[reason][origin] = make(map[string]int)
+				}
+				decisions_stats[reason][origin][action] += ival
+			case "cs_alerts":
+				/*if _, ok := alerts_stats[scenario]; !ok {
+					alerts_stats[scenario] = make(map[string]int)
+				}*/
+				alerts_stats[reason] += ival
 			default:
 				continue
 			}
@@ -329,6 +352,30 @@ func ShowPrometheus(url string) {
 			}
 		}
 
+		decisionsTable := tablewriter.NewWriter(os.Stdout)
+		decisionsTable.SetHeader([]string{"Reason", "Origin", "Action", "Count"})
+		for reason, origins := range decisions_stats {
+			for origin, actions := range origins {
+				for action, hits := range actions {
+					row := []string{}
+					row = append(row, reason)
+					row = append(row, origin)
+					row = append(row, action)
+					row = append(row, fmt.Sprintf("%d", hits))
+					decisionsTable.Append(row)
+				}
+			}
+		}
+
+		alertsTable := tablewriter.NewWriter(os.Stdout)
+		alertsTable.SetHeader([]string{"Reason", "Count"})
+		for scenario, hits := range alerts_stats {
+			row := []string{}
+			row = append(row, scenario)
+			row = append(row, fmt.Sprintf("%d", hits))
+			alertsTable.Append(row)
+		}
+
 		if bucketsTable.NumLines() > 0 {
 			log.Printf("Buckets Metrics:")
 			bucketsTable.SetAlignment(tablewriter.ALIGN_LEFT)
@@ -366,8 +413,20 @@ func ShowPrometheus(url string) {
 			lapiDecisionsTable.Render()
 		}
 
+		if decisionsTable.NumLines() > 0 {
+			log.Printf("Local Api Decisions:")
+			decisionsTable.SetAlignment(tablewriter.ALIGN_LEFT)
+			decisionsTable.Render()
+		}
+
+		if alertsTable.NumLines() > 0 {
+			log.Printf("Local Api Alerts:")
+			alertsTable.SetAlignment(tablewriter.ALIGN_LEFT)
+			alertsTable.Render()
+		}
+
 	} else if csConfig.Cscli.Output == "json" {
-		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats} {
+		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats} {
 			x, err := json.MarshalIndent(val, "", " ")
 			if err != nil {
 				log.Fatalf("failed to unmarshal metrics : %v", err)
@@ -375,7 +434,7 @@ func ShowPrometheus(url string) {
 			fmt.Printf("%s\n", string(x))
 		}
 	} else if csConfig.Cscli.Output == "raw" {
-		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats} {
+		for _, val := range []interface{}{acquis_stats, parsers_stats, buckets_stats, lapi_stats, lapi_bouncer_stats, lapi_machine_stats, lapi_decisions_stats, decisions_stats, alerts_stats} {
 			x, err := yaml.Marshal(val)
 			if err != nil {
 				log.Fatalf("failed to unmarshal metrics : %v", err)
diff --git a/cmd/crowdsec/metrics.go b/cmd/crowdsec/metrics.go
index 83d131cb1..ed927aff3 100644
--- a/cmd/crowdsec/metrics.go
+++ b/cmd/crowdsec/metrics.go
@@ -6,6 +6,7 @@ import (
 	v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/parser"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
@@ -62,6 +63,81 @@ var globalCsInfo = prometheus.NewGauge(
 	},
 )
 
+var globalActiveDecisions = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: "cs_active_decisions",
+		Help: "Number of active decisions.",
+	},
+	[]string{"reason", "origin", "action"},
+)
+
+var globalAlerts = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: "cs_alerts",
+		Help: "Number of alerts (excluding CAPI).",
+	},
+	[]string{"reason"},
+)
+
+var globalParsingHistogram = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Help:    "Time spent parsing a line",
+		Name:    "cs_parsing_time_seconds",
+		Buckets: []float64{0.0005, 0.001, 0.0015, 0.002, 0.0025, 0.003, 0.004, 0.005, 0.0075, 0.01},
+	},
+	[]string{"type", "source"},
+)
+
+var globalPourHistogram = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Name:    "cs_bucket_pour_seconds",
+		Help:    "Time spent pouring an event to buckets.",
+		Buckets: []float64{0.001, 0.002, 0.005, 0.01, 0.015, 0.02, 0.03, 0.04, 0.05},
+	},
+	[]string{"type", "source"},
+)
+
+func computeDynamicMetrics(next http.Handler, dbClient *database.Client) http.HandlerFunc {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if dbClient == nil {
+			next.ServeHTTP(w, r)
+			return
+		}
+
+		decisionsFilters := make(map[string][]string, 0)
+		decisions, err := dbClient.QueryDecisionCountByScenario(decisionsFilters)
+		if err != nil {
+			log.Errorf("Error querying decisions for metrics: %v", err)
+			next.ServeHTTP(w, r)
+			return
+		}
+		globalActiveDecisions.Reset()
+		for _, d := range decisions {
+			globalActiveDecisions.With(prometheus.Labels{"reason": d.Scenario, "origin": d.Origin, "action": d.Type}).Set(float64(d.Count))
+		}
+
+		globalAlerts.Reset()
+
+		alertsFilter := map[string][]string{
+			"include_capi": {"false"},
+		}
+
+		alerts, err := dbClient.AlertsCountPerScenario(alertsFilter)
+
+		if err != nil {
+			log.Errorf("Error querying alerts for metrics: %v", err)
+			next.ServeHTTP(w, r)
+			return
+		}
+
+		for k, v := range alerts {
+			globalAlerts.With(prometheus.Labels{"reason": k}).Set(float64(v))
+		}
+
+		next.ServeHTTP(w, r)
+	})
+}
+
 func registerPrometheus(config *csconfig.PrometheusCfg) {
 	if !config.Enabled {
 		return
@@ -75,13 +151,12 @@
 		config.ListenPort = 6060
 	}
 
-	defer types.CatchPanic("crowdsec/registerPrometheus")
 	/*Registering prometheus*/
 	/*If in aggregated mode, do not register events associated to a source, keeps cardinality low*/
 	if config.Level == "aggregated" {
 		log.Infof("Loading aggregated prometheus collectors")
 		prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
-			globalCsInfo,
+			globalCsInfo, globalParsingHistogram, globalPourHistogram,
 			leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstanciation, leaky.BucketsOverflow,
 			v1.LapiRouteHits,
 			leaky.BucketsCurrentCount)
@@ -89,12 +164,22 @@
 		log.Infof("Loading prometheus collectors")
 		prometheus.MustRegister(globalParserHits, globalParserHitsOk, globalParserHitsKo,
 			parser.NodesHits, parser.NodesHitsOk, parser.NodesHitsKo,
-			globalCsInfo,
-			v1.LapiRouteHits, v1.LapiMachineHits, v1.LapiBouncerHits, v1.LapiNilDecisions, v1.LapiNonNilDecisions,
-			leaky.BucketsPour, leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstanciation, leaky.BucketsOverflow, leaky.BucketsCurrentCount)
+			globalCsInfo, globalParsingHistogram, globalPourHistogram,
+			v1.LapiRouteHits, v1.LapiMachineHits, v1.LapiBouncerHits, v1.LapiNilDecisions, v1.LapiNonNilDecisions, v1.LapiResponseTime,
+			leaky.BucketsPour, leaky.BucketsUnderflow, leaky.BucketsCanceled, leaky.BucketsInstanciation, leaky.BucketsOverflow, leaky.BucketsCurrentCount,
+			globalActiveDecisions, globalAlerts)
 	}
 
-	http.Handle("/metrics", promhttp.Handler())
+}
+
+func servePrometheus(config *csconfig.PrometheusCfg, dbClient *database.Client) {
+	if !config.Enabled {
+		return
+	}
+
+	defer types.CatchPanic("crowdsec/servePrometheus")
+
+	http.Handle("/metrics", computeDynamicMetrics(promhttp.Handler(), dbClient))
 	if err := http.ListenAndServe(fmt.Sprintf("%s:%d", config.ListenAddr, config.ListenPort), nil); err != nil {
 		log.Warningf("prometheus: %s", err)
 	}
diff --git a/cmd/crowdsec/parse.go b/cmd/crowdsec/parse.go
index db6455689..aa93b6ec7 100644
--- a/cmd/crowdsec/parse.go
+++ b/cmd/crowdsec/parse.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"time"
+
 	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
 
@@ -26,11 +28,14 @@ LOOP:
 			}
 			globalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc()
 
+			startParsing := time.Now()
 			/* parse the log using magic */
 			parsed, err := parser.Parse(parserCTX, event, nodes)
 			if err != nil {
 				log.Errorf("failed parsing : %v\n", err)
 			}
+			elapsed := time.Since(startParsing)
+			globalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds())
 			if !parsed.Process {
 				globalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc()
 				log.Debugf("Discarding line %+v", parsed)
diff --git a/cmd/crowdsec/pour.go b/cmd/crowdsec/pour.go
index aa5c7de02..23bdecb35 100644
--- a/cmd/crowdsec/pour.go
+++ b/cmd/crowdsec/pour.go
@@ -7,6 +7,7 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
+	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -21,6 +22,7 @@ func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *lea
 			log.Infof("Bucket routine exiting")
 			return nil
 		case parsed := <-input:
+			startTime := time.Now()
 			count++
 			if count%5000 == 0 {
 				log.Infof("%d existing buckets", leaky.LeakyRoutineCount)
@@ -45,6 +47,8 @@ func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *lea
 				log.Errorf("bucketify failed for: %v", parsed)
 				return fmt.Errorf("process of event failed : %v", err)
 			}
+			elapsed := time.Since(startTime)
+			globalPourHistogram.With(prometheus.Labels{"type": parsed.Line.Module, "source": parsed.Line.Src}).Observe(elapsed.Seconds())
 			if poured {
 				globalBucketPourOk.Inc()
 			} else {
diff --git a/cmd/crowdsec/run_in_svc.go b/cmd/crowdsec/run_in_svc.go
index 5fa837700..17aeeecab 100644
--- a/cmd/crowdsec/run_in_svc.go
+++ b/cmd/crowdsec/run_in_svc.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	log "github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus/hooks/writer"
@@ -48,7 +49,18 @@ func StartRunSvc() error {
 
 	// Enable profiling early
 	if cConfig.Prometheus != nil {
-		go registerPrometheus(cConfig.Prometheus)
+		var dbClient *database.Client
+		var err error
+
+		if cConfig.DbConfig != nil {
+			dbClient, err = database.NewClient(cConfig.DbConfig)
+
+			if err != nil {
+				log.Fatalf("unable to create database client: %s", err)
+			}
+		}
+		registerPrometheus(cConfig.Prometheus)
+		go servePrometheus(cConfig.Prometheus, dbClient)
 	}
 	return Serve(cConfig)
 }
diff --git a/cmd/crowdsec/run_in_svc_windows.go b/cmd/crowdsec/run_in_svc_windows.go
index de4466384..efb0ac5c2 100644
--- a/cmd/crowdsec/run_in_svc_windows.go
+++ b/cmd/crowdsec/run_in_svc_windows.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
 	"github.com/crowdsecurity/crowdsec/pkg/cwversion"
+	"github.com/crowdsecurity/crowdsec/pkg/database"
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
@@ -88,7 +89,18 @@ func WindowsRun() error {
 
 	// Enable profiling early
 	if cConfig.Prometheus != nil {
-		go registerPrometheus(cConfig.Prometheus)
+		var dbClient *database.Client
+		var err error
+
+		if cConfig.DbConfig != nil {
+			dbClient, err = database.NewClient(cConfig.DbConfig)
+
+			if err != nil {
+				log.Fatalf("unable to create database client: %s", err)
+			}
+		}
+		registerPrometheus(cConfig.Prometheus)
+		go servePrometheus(cConfig.Prometheus, dbClient)
 	}
 	return Serve(cConfig)
 }
diff --git a/pkg/apiserver/controllers/v1/metrics.go b/pkg/apiserver/controllers/v1/metrics.go
index c5bd2ecec..0f3bdb6d1 100644
--- a/pkg/apiserver/controllers/v1/metrics.go
+++ b/pkg/apiserver/controllers/v1/metrics.go
@@ -1,6 +1,8 @@
 package v1
 
 import (
+	"time"
+
 	jwt "github.com/appleboy/gin-jwt/v2"
 	"github.com/gin-gonic/gin"
 	"github.com/prometheus/client_golang/prometheus"
@@ -52,6 +54,14 @@ var LapiNonNilDecisions = prometheus.NewCounterVec(
 	[]string{"bouncer"},
 )
 
+var LapiResponseTime = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Name:    "cs_lapi_request_duration_seconds",
+		Help:    "Response time of LAPI",
+		Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1},
+	},
+	[]string{"endpoint", "method"})
+
 func PrometheusBouncersHasEmptyDecision(c *gin.Context) {
 	name, ok := c.Get("BOUNCER_NAME")
 	if ok {
@@ -99,9 +109,12 @@ func PrometheusBouncersMiddleware() gin.HandlerFunc {
 
 func PrometheusMiddleware() gin.HandlerFunc {
 	return func(c *gin.Context) {
+		startTime := time.Now()
 		LapiRouteHits.With(prometheus.Labels{
 			"route":  c.Request.URL.Path,
 			"method": c.Request.Method}).Inc()
 		c.Next()
+		elapsed := time.Since(startTime)
+		LapiResponseTime.With(prometheus.Labels{"method": c.Request.Method, "endpoint": c.Request.URL.Path}).Observe(elapsed.Seconds())
 	}
 }
diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go
index cc4b2d77a..92d83bd8b 100644
--- a/pkg/database/alerts.go
+++ b/pkg/database/alerts.go
@@ -1,6 +1,7 @@
 package database
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"strconv"
@@ -722,6 +723,38 @@ func BuildAlertRequestFromFilter(alerts *ent.AlertQuery, filter map[string][]str
 	return alerts, nil
 }
 
+func (c *Client) AlertsCountPerScenario(filters map[string][]string) (map[string]int, error) {
+
+	var res []struct {
+		Scenario string
+		Count    int
+	}
+
+	ctx := context.Background()
+
+	query := c.Ent.Alert.Query()
+
+	query, err := BuildAlertRequestFromFilter(query, filters)
+
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to build alert request")
+	}
+
+	err = query.GroupBy(alert.FieldScenario).Aggregate(ent.Count()).Scan(ctx, &res)
+
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to count alerts per scenario")
+	}
+
+	counts := make(map[string]int)
+
+	for _, r := range res {
+		counts[r.Scenario] = r.Count
+	}
+
+	return counts, nil
+}
+
 func (c *Client) TotalAlerts() (int, error) {
 	return c.Ent.Alert.Query().Count(c.CTX)
 }
diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go
index 4fddb46a2..710d37b5e 100644
--- a/pkg/database/decisions.go
+++ b/pkg/database/decisions.go
@@ -15,6 +15,13 @@ import (
 	"github.com/pkg/errors"
 )
 
+type DecisionsByScenario struct {
+	Scenario string
+	Count    int
+	Origin   string
+	Type     string
+}
+
 func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string][]string) (*ent.DecisionQuery, []*sql.Predicate, error) {
 
 	var err error
@@ -231,6 +238,29 @@ func (c *Client) QueryAllDecisionsWithFilters(filters map[string][]string) ([]*e
 	return data, nil
 }
 
+func (c *Client) QueryDecisionCountByScenario(filters map[string][]string) ([]*DecisionsByScenario, error) {
+	query := c.Ent.Decision.Query().Where(
+		decision.UntilGT(time.Now().UTC()),
+	)
+	query, _, err := BuildDecisionRequestWithFilter(query, filters)
+
+	if err != nil {
+		c.Log.Warningf("QueryDecisionCountByScenario : %s", err)
+		return nil, errors.Wrap(QueryFail, "count all decisions with filters")
+	}
+
+	var r []*DecisionsByScenario
+
+	err = query.GroupBy(decision.FieldScenario, decision.FieldOrigin, decision.FieldType).Aggregate(ent.Count()).Scan(c.CTX, &r)
+
+	if err != nil {
+		c.Log.Warningf("QueryDecisionCountByScenario : %s", err)
+		return nil, errors.Wrap(QueryFail, "count all decisions with filters")
+	}
+
+	return r, nil
+}
+
 func (c *Client) QueryExpiredDecisionsWithFilters(filters map[string][]string) ([]*ent.Decision, error) {
 	now := time.Now().UTC()
 	query := c.Ent.Decision.Query().Where(
diff --git a/tests/bats/06_crowdsec.bats b/tests/bats/06_crowdsec.bats
index 5446c9042..d57d663f4 100644
--- a/tests/bats/06_crowdsec.bats
+++ b/tests/bats/06_crowdsec.bats
@@ -31,7 +31,7 @@ declare stderr
     run -1 --separate-stderr "${BIN_DIR}/crowdsec"
     refute_output
     run -0 echo "${stderr}"
-    assert_output --partial "api server init: unable to run local API: unable to init database client: unknown database type 'meh'"
+    assert_output --partial "unable to create database client: unknown database type 'meh'"
 }
 
 @test "${FILE} CS_LAPI_SECRET not strong enough" {
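Not part of the patch above: a minimal sketch of how the new metric families could be sanity-checked once crowdsec is running with this change. It assumes the Prometheus endpoint is reachable at 127.0.0.1:6060 (6060 is the port default visible in registerPrometheus; the listen address and the running setup are assumptions) and uses only the Go standard library.

// metrics_check.go - hypothetical helper, not part of the patch.
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Metric families introduced by this change.
	wanted := []string{
		"cs_active_decisions",
		"cs_alerts",
		"cs_parsing_time_seconds",
		"cs_bucket_pour_seconds",
		"cs_lapi_request_duration_seconds",
	}

	// Assumed endpoint: crowdsec's Prometheus listener on its default port.
	resp, err := http.Get("http://127.0.0.1:6060/metrics")
	if err != nil {
		log.Fatalf("scraping metrics: %v", err)
	}
	defer resp.Body.Close()

	// Print only the samples belonging to the new families.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		for _, name := range wanted {
			if strings.HasPrefix(line, name) {
				fmt.Println(line)
				break
			}
		}
	}
	if err := sc.Err(); err != nil {
		log.Fatalf("reading response: %v", err)
	}
}

The same data should also appear in `cscli metrics` under the new "Local Api Decisions" and "Local Api Alerts" tables added in cmd/crowdsec-cli/metrics.go.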