diff --git a/cmd/crowdsec/lpmetrics.go b/cmd/crowdsec/lpmetrics.go index 089614344..19e0ac798 100644 --- a/cmd/crowdsec/lpmetrics.go +++ b/cmd/crowdsec/lpmetrics.go @@ -4,39 +4,40 @@ import ( "context" "errors" "net/http" - "github.com/sirupsen/logrus" - "github.com/blackfireio/osinfo" "time" + "github.com/blackfireio/osinfo" + "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" - "github.com/crowdsecurity/go-cs-lib/ptr" - "github.com/crowdsecurity/go-cs-lib/trace" + "github.com/crowdsecurity/go-cs-lib/ptr" + "github.com/crowdsecurity/go-cs-lib/trace" - "github.com/crowdsecurity/crowdsec/pkg/acquisition" - "github.com/crowdsecurity/crowdsec/pkg/apiclient" - "github.com/crowdsecurity/crowdsec/pkg/cwhub" - "github.com/crowdsecurity/crowdsec/pkg/cwversion" - "github.com/crowdsecurity/crowdsec/pkg/fflag" - "github.com/crowdsecurity/crowdsec/pkg/models" + "github.com/crowdsecurity/crowdsec/pkg/acquisition" + "github.com/crowdsecurity/crowdsec/pkg/apiclient" + "github.com/crowdsecurity/crowdsec/pkg/cwhub" + "github.com/crowdsecurity/crowdsec/pkg/cwversion" + "github.com/crowdsecurity/crowdsec/pkg/fflag" + "github.com/crowdsecurity/crowdsec/pkg/models" ) // MetricsProvider collects metrics from the LP and sends them to the LAPI type MetricsProvider struct { - apic *apiclient.ApiClient + apic *apiclient.ApiClient interval time.Duration - static staticMetrics - logger *logrus.Entry + static staticMetrics + logger *logrus.Entry } type staticMetrics struct { - osName string - osVersion string - startupTS int64 - featureFlags []string - consoleOptions []string - datasourceMap map[string]int64 - hubState models.HubItems + osName string + osVersion string + startupTS int64 + featureFlags []string + consoleOptions []string + datasourceMap map[string]int64 + hubState models.HubItems } func getHubState(hub *cwhub.Hub) models.HubItems { @@ -96,50 +97,48 @@ func detectOS() (string, string) { return osInfo.Name, osInfo.Version } - func NewMetricsProvider(apic *apiclient.ApiClient, interval time.Duration, logger *logrus.Entry, - consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) *MetricsProvider { + consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) *MetricsProvider { return &MetricsProvider{ - apic: apic, + apic: apic, interval: interval, - logger: logger, - static: newStaticMetrics(consoleOptions, datasources, hub), + logger: logger, + static: newStaticMetrics(consoleOptions, datasources, hub), } } func (m *MetricsProvider) metricsPayload() *models.AllMetrics { meta := &models.MetricsMeta{ UtcStartupTimestamp: m.static.startupTS, - WindowSizeSeconds: int64(m.interval.Seconds()), + WindowSizeSeconds: int64(m.interval.Seconds()), } os := &models.OSversion{ - Name: m.static.osName, + Name: m.static.osName, Version: m.static.osVersion, } base := models.BaseMetrics{ - Meta: meta, - Os: os, - Version: ptr.Of(cwversion.VersionStr()), + Meta: meta, + Os: os, + Version: ptr.Of(cwversion.VersionStr()), FeatureFlags: m.static.featureFlags, } item0 := &models.LogProcessorsMetricsItems0{ - BaseMetrics: base, + BaseMetrics: base, ConsoleOptions: m.static.consoleOptions, - Datasources: m.static.datasourceMap, - HubItems: m.static.hubState, + Datasources: m.static.datasourceMap, + HubItems: m.static.hubState, } // TODO: more metric details... ? return &models.AllMetrics{ - LogProcessors: []models.LogProcessorsMetrics{{item0}}, + LogProcessors: []models.LogProcessorsMetrics{{item0}}, } } - func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error { defer trace.CatchPanic("crowdsec/MetricsProvider.Run") @@ -149,7 +148,7 @@ func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error { met := m.metricsPayload() - ticker := time.NewTicker(m.interval) + ticker := time.NewTicker(1) //Send on start for { select { @@ -177,6 +176,8 @@ func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error { continue } + ticker.Reset(m.interval) + m.logger.Tracef("lp usage metrics sent") case <-myTomb.Dying(): ticker.Stop() diff --git a/pkg/apiserver/apic_metrics.go b/pkg/apiserver/apic_metrics.go index 128ce5a96..75696604d 100644 --- a/pkg/apiserver/apic_metrics.go +++ b/pkg/apiserver/apic_metrics.go @@ -2,11 +2,15 @@ package apiserver import ( "context" + "encoding/json" + "strings" "time" - log "github.com/sirupsen/logrus" "slices" + "github.com/davecgh/go-spew/spew" + log "github.com/sirupsen/logrus" + "github.com/crowdsecurity/go-cs-lib/ptr" "github.com/crowdsecurity/go-cs-lib/trace" "github.com/crowdsecurity/go-cs-lib/version" @@ -14,6 +18,67 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/models" ) +func (a *apic) GetUsageMetrics() (*models.AllMetrics, error) { + lpsMetrics, err := a.dbClient.GetLPsUsageMetrics() + + if err != nil { + return nil, err + } + + spew.Dump(lpsMetrics) + + bouncersMetrics, err := a.dbClient.GetBouncersUsageMetrics() + if err != nil { + return nil, err + } + + spew.Dump(bouncersMetrics) + + allMetrics := &models.AllMetrics{} + + allLps := a.dbClient.ListMachines() + allBouncers := a.dbClient.ListBouncers() + + for _, lpsMetric := range lpsMetrics { + lpName := lpsMetric.GeneratedBy + metrics := models.LogProcessorsMetricsItems0{} + + err := json.Unmarshal([]byte(lpsMetric.Payload), &metrics) + + if err != nil { + log.Errorf("unable to unmarshal LPs metrics (%s)", err) + continue + } + + lp, err := a.dbClient.QueryMachineByID(lpName) + + if err != nil { + log.Errorf("unable to get LP information for %s: %s", lpName, err) + continue + } + + if lp.Hubstate != nil { + metrics.HubItems = *lp.Hubstate + } + + metrics.Os = &models.OSversion{ + Name: lp.Osname, + Version: lp.Osversion, + } + + metrics.FeatureFlags = strings.Split(lp.Featureflags, ",") + metrics.Version = &lp.Version + //TODO: meta + + } + + //bouncerInfos := make(map[string]string) + + //TODO: add LAPI metrics + + return allMetrics, nil +} + func (a *apic) GetMetrics() (*models.Metrics, error) { machines, err := a.dbClient.ListMachines() if err != nil { @@ -160,3 +225,24 @@ func (a *apic) SendMetrics(stop chan (bool)) { } } } + +func (a *apic) SendUsageMetrics() { + defer trace.CatchPanic("lapi/usageMetricsToAPIC") + + ticker := time.NewTicker(5 * time.Second) + + for { + select { + case <-a.metricsTomb.Dying(): + //The normal metrics routine also kills push/pull tombs, does that make sense ? + ticker.Stop() + return + case <-ticker.C: + _, err := a.GetUsageMetrics() + if err != nil { + log.Errorf("unable to get usage metrics (%s)", err) + } + + } + } +} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 7989cfc1d..0c34f5a85 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -25,6 +25,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/csplugin" "github.com/crowdsecurity/crowdsec/pkg/database" + "github.com/crowdsecurity/crowdsec/pkg/fflag" "github.com/crowdsecurity/crowdsec/pkg/types" ) @@ -360,6 +361,15 @@ func (s *APIServer) Run(apiReady chan bool) error { s.apic.SendMetrics(make(chan bool)) return nil }) + + if fflag.CAPIUsageMetrics.IsEnabled() { + log.Infof("CAPI_USAGE_METRICS flag is enabled, starting usage metrics routine") + s.apic.metricsTomb.Go(func() error { + s.apic.SendUsageMetrics() + return nil + }) + } + } s.httpServerTomb.Go(func() error { @@ -368,7 +378,7 @@ func (s *APIServer) Run(apiReady chan bool) error { if err := s.httpServerTomb.Wait(); err != nil { return fmt.Errorf("local API server stopped with error: %w", err) - } + } return nil } diff --git a/pkg/database/metrics.go b/pkg/database/metrics.go index bd5254497..22f699a78 100644 --- a/pkg/database/metrics.go +++ b/pkg/database/metrics.go @@ -15,7 +15,6 @@ import ( // RemoveOldMetrics // avoid errors.Wrapf - func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy string, collectedAt time.Time, payload string) (*ent.Metric, error) { metric, err := c.Ent.Metric. Create(). @@ -38,3 +37,35 @@ func (c *Client) CreateMetric(generatedType metric.GeneratedType, generatedBy st return metric, nil } + +func (c *Client) GetLPsUsageMetrics() ([]*ent.Metric, error) { + metrics, err := c.Ent.Metric.Query(). + Where( + metric.GeneratedTypeEQ(metric.GeneratedTypeLP), + metric.PushedAtIsNil(), + ). + Order(ent.Desc(metric.FieldCollectedAt)). + All(c.CTX) + if err != nil { + c.Log.Warningf("GetLPsUsageMetrics: %s", err) + return nil, fmt.Errorf("getting LPs usage metrics: %w", err) + } + + return metrics, nil +} + +func (c *Client) GetBouncersUsageMetrics() ([]*ent.Metric, error) { + metrics, err := c.Ent.Metric.Query(). + Where( + metric.GeneratedTypeEQ(metric.GeneratedTypeRC), + metric.PushedAtIsNil(), + ). + Order(ent.Desc(metric.FieldCollectedAt)). + All(c.CTX) + if err != nil { + c.Log.Warningf("GetBouncersUsageMetrics: %s", err) + return nil, fmt.Errorf("getting bouncers usage metrics: %w", err) + } + + return metrics, nil +} diff --git a/pkg/fflag/crowdsec.go b/pkg/fflag/crowdsec.go index d42d6a05e..e5c80a19b 100644 --- a/pkg/fflag/crowdsec.go +++ b/pkg/fflag/crowdsec.go @@ -8,6 +8,7 @@ var ChunkedDecisionsStream = &Feature{Name: "chunked_decisions_stream", Descript var PapiClient = &Feature{Name: "papi_client", Description: "Enable Polling API client", State: DeprecatedState} var Re2GrokSupport = &Feature{Name: "re2_grok_support", Description: "Enable RE2 support for GROK patterns"} var Re2RegexpInfileSupport = &Feature{Name: "re2_regexp_in_file_support", Description: "Enable RE2 support for RegexpInFile expr helper"} +var CAPIUsageMetrics = &Feature{Name: "capi_usage_metrics", Description: "Enable usage metrics push to CAPI"} func RegisterAllFeatures() error { err := Crowdsec.RegisterFeature(CscliSetup) @@ -40,5 +41,10 @@ func RegisterAllFeatures() error { return err } + err = Crowdsec.RegisterFeature(CAPIUsageMetrics) + if err != nil { + return err + } + return nil }