From 507da49b5ad6f4f7c4eda328ae17b9ff3e627a17 Mon Sep 17 00:00:00 2001 From: mmetc <92726601+mmetc@users.noreply.github.com> Date: Fri, 23 Jun 2023 14:06:04 +0200 Subject: [PATCH] send metrics immediately if agents are added or removed (#2296) --- pkg/apiserver/apic.go | 84 ++--------------- pkg/apiserver/apic_metrics.go | 145 +++++++++++++++++++++++++++++ pkg/apiserver/apic_metrics_test.go | 101 ++++++++++++++++++++ pkg/apiserver/apic_test.go | 84 ----------------- 4 files changed, 254 insertions(+), 160 deletions(-) create mode 100644 pkg/apiserver/apic_metrics.go create mode 100644 pkg/apiserver/apic_metrics_test.go diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index 3791a9a3c..b15cf21d6 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -33,7 +33,8 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/types" ) -var ( +const ( + // delta values must be smaller than the interval pullIntervalDefault = time.Hour * 2 pullIntervalDelta = 5 * time.Minute pushIntervalDefault = time.Second * 10 @@ -71,7 +72,12 @@ type apic struct { // randomDuration returns a duration value between d-delta and d+delta func randomDuration(d time.Duration, delta time.Duration) time.Duration { - return time.Duration(float64(d) + float64(delta)*(-1.0+2.0*rand.Float64())) + ret := d + time.Duration(rand.Int63n(int64(2*delta))) - delta + // ticker interval must be > 0 (nanoseconds) + if ret <= 0 { + return 1 + } + return ret } func (a *apic) FetchScenariosListFromDB() ([]string, error) { @@ -822,80 +828,6 @@ func (a *apic) Pull() error { } } -func (a *apic) GetMetrics() (*models.Metrics, error) { - metric := &models.Metrics{ - ApilVersion: ptr.Of(version.String()), - Machines: make([]*models.MetricsAgentInfo, 0), - Bouncers: make([]*models.MetricsBouncerInfo, 0), - } - machines, err := a.dbClient.ListMachines() - if err != nil { - return metric, err - } - bouncers, err := a.dbClient.ListBouncers() - if err != nil { - return metric, err - } - var lastpush string - for _, machine := range machines { - if machine.LastPush == nil { - lastpush = time.Time{}.String() - } else { - lastpush = machine.LastPush.String() - } - m := &models.MetricsAgentInfo{ - Version: machine.Version, - Name: machine.MachineId, - LastUpdate: machine.UpdatedAt.String(), - LastPush: lastpush, - } - metric.Machines = append(metric.Machines, m) - } - - for _, bouncer := range bouncers { - m := &models.MetricsBouncerInfo{ - Version: bouncer.Version, - CustomName: bouncer.Name, - Name: bouncer.Type, - LastPull: bouncer.LastPull.String(), - } - metric.Bouncers = append(metric.Bouncers, m) - } - return metric, nil -} - -func (a *apic) SendMetrics(stop chan (bool)) { - defer trace.CatchPanic("lapi/metricsToAPIC") - - ticker := time.NewTicker(a.metricsIntervalFirst) - - log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval) - - for { - metrics, err := a.GetMetrics() - if err != nil { - log.Errorf("unable to get metrics (%s), will retry", err) - } - _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics) - if err != nil { - log.Errorf("capi metrics: failed: %s", err) - } else { - log.Infof("capi metrics: metrics sent successfully") - } - - select { - case <-stop: - return - case <-ticker.C: - ticker.Reset(a.metricsInterval) - case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others? - a.pullTomb.Kill(nil) - a.pushTomb.Kill(nil) - return - } - } -} - func (a *apic) Shutdown() { a.pushTomb.Kill(nil) a.pullTomb.Kill(nil) diff --git a/pkg/apiserver/apic_metrics.go b/pkg/apiserver/apic_metrics.go new file mode 100644 index 000000000..4befcf50c --- /dev/null +++ b/pkg/apiserver/apic_metrics.go @@ -0,0 +1,145 @@ +package apiserver + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + "golang.org/x/exp/slices" + + "github.com/crowdsecurity/go-cs-lib/pkg/ptr" + "github.com/crowdsecurity/go-cs-lib/pkg/trace" + "github.com/crowdsecurity/go-cs-lib/pkg/version" + + "github.com/crowdsecurity/crowdsec/pkg/models" +) + +func (a *apic) GetMetrics() (*models.Metrics, error) { + machines, err := a.dbClient.ListMachines() + if err != nil { + return nil, err + } + + machinesInfo := make([]*models.MetricsAgentInfo, len(machines)) + + for i, machine := range machines { + machinesInfo[i] = &models.MetricsAgentInfo{ + Version: machine.Version, + Name: machine.MachineId, + LastUpdate: machine.UpdatedAt.String(), + LastPush: ptr.OrEmpty(machine.LastPush).String(), + } + } + + bouncers, err := a.dbClient.ListBouncers() + if err != nil { + return nil, err + } + + bouncersInfo := make([]*models.MetricsBouncerInfo, len(bouncers)) + + for i, bouncer := range bouncers { + bouncersInfo[i] = &models.MetricsBouncerInfo{ + Version: bouncer.Version, + CustomName: bouncer.Name, + Name: bouncer.Type, + LastPull: bouncer.LastPull.String(), + } + } + + return &models.Metrics{ + ApilVersion: ptr.Of(version.String()), + Machines: machinesInfo, + Bouncers: bouncersInfo, + }, nil +} + +func (a *apic) fetchMachineIDs() ([]string, error) { + machines, err := a.dbClient.ListMachines() + if err != nil { + return nil, err + } + + ret := make([]string, len(machines)) + for i, machine := range machines { + ret[i] = machine.MachineId + } + // sorted slices are required for the slices.Equal comparison + slices.Sort(ret) + return ret, nil +} + +// SendMetrics sends metrics to the API server until it receives a stop signal. +// +// Metrics are sent at start, then at the randomized metricsIntervalFirst, +// then at regular metricsInterval. If a change is detected in the list +// of machines, the next metrics are sent immediately. +func (a *apic) SendMetrics(stop chan (bool)) { + defer trace.CatchPanic("lapi/metricsToAPIC") + + // verify the list of machines every interval + const checkInt = 20 * time.Second + + // intervals must always be > 0 + metInts := []time.Duration{1, a.metricsIntervalFirst, a.metricsInterval} + + log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", + metInts[1].Round(time.Second), metInts[2]) + + count := -1 + nextMetInt := func() time.Duration { + if count < len(metInts)-1 { + count++ + } + return metInts[count] + } + + // store the list of machine IDs to compare + // with the next list + machineIDs := []string{} + + reloadMachineIDs := func() { + ids, err := a.fetchMachineIDs() + if err != nil { + log.Debugf("unable to get machines (%s), will retry", err) + return + } + machineIDs = ids + } + + checkTicker := time.NewTicker(checkInt) + metTicker := time.NewTicker(nextMetInt()) + + for { + select { + case <-stop: + checkTicker.Stop() + metTicker.Stop() + return + case <-checkTicker.C: + oldIDs := machineIDs + reloadMachineIDs() + if !slices.Equal(oldIDs, machineIDs) { + log.Infof("capi metrics: machines changed, immediate send") + metTicker.Reset(1) + } + case <-metTicker.C: + metrics, err := a.GetMetrics() + if err != nil { + log.Errorf("unable to get metrics (%s), will retry", err) + } + log.Info("capi metrics: sending") + _, _, err = a.apiClient.Metrics.Add(context.Background(), metrics) + if err != nil { + log.Errorf("capi metrics: failed: %s", err) + } + metTicker.Reset(nextMetInt()) + case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others? + checkTicker.Stop() + metTicker.Stop() + a.pullTomb.Kill(nil) + a.pushTomb.Kill(nil) + return + } + } +} diff --git a/pkg/apiserver/apic_metrics_test.go b/pkg/apiserver/apic_metrics_test.go new file mode 100644 index 000000000..7e37ea1e9 --- /dev/null +++ b/pkg/apiserver/apic_metrics_test.go @@ -0,0 +1,101 @@ +package apiserver + +import ( + "context" + "fmt" + "net/url" + "testing" + "time" + + "github.com/jarcoal/httpmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/crowdsecurity/go-cs-lib/pkg/version" + + "github.com/crowdsecurity/crowdsec/pkg/apiclient" +) + +func TestAPICSendMetrics(t *testing.T) { + tests := []struct { + name string + duration time.Duration + expectedCalls int + setUp func(*apic) + metricsInterval time.Duration + }{ + { + name: "basic", + duration: time.Millisecond * 30, + metricsInterval: time.Millisecond * 5, + expectedCalls: 5, + setUp: func(api *apic) {}, + }, + { + name: "with some metrics", + duration: time.Millisecond * 30, + metricsInterval: time.Millisecond * 5, + expectedCalls: 5, + setUp: func(api *apic) { + api.dbClient.Ent.Machine.Delete().ExecX(context.Background()) + api.dbClient.Ent.Machine.Create(). + SetMachineId("1234"). + SetPassword(testPassword.String()). + SetIpAddress("1.2.3.4"). + SetScenarios("crowdsecurity/test"). + SetLastPush(time.Time{}). + SetUpdatedAt(time.Time{}). + ExecX(context.Background()) + + api.dbClient.Ent.Bouncer.Delete().ExecX(context.Background()) + api.dbClient.Ent.Bouncer.Create(). + SetIPAddress("1.2.3.6"). + SetName("someBouncer"). + SetAPIKey("foobar"). + SetRevoked(false). + SetLastPull(time.Time{}). + ExecX(context.Background()) + }, + }, + } + + httpmock.RegisterResponder("POST", "http://api.crowdsec.net/api/metrics/", httpmock.NewBytesResponder(200, []byte{})) + httpmock.Activate() + defer httpmock.Deactivate() + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + url, err := url.ParseRequestURI("http://api.crowdsec.net/") + require.NoError(t, err) + + apiClient, err := apiclient.NewDefaultClient( + url, + "/api", + fmt.Sprintf("crowdsec/%s", version.String()), + nil, + ) + require.NoError(t, err) + + api := getAPIC(t) + api.pushInterval = time.Millisecond + api.pushIntervalFirst = time.Millisecond + api.apiClient = apiClient + api.metricsInterval = tc.metricsInterval + api.metricsIntervalFirst = tc.metricsInterval + tc.setUp(api) + + stop := make(chan bool) + httpmock.ZeroCallCounters() + go api.SendMetrics(stop) + time.Sleep(tc.duration) + stop <- true + + info := httpmock.GetCallCountInfo() + noResponderCalls := info["NO_RESPONDER"] + responderCalls := info["POST http://api.crowdsec.net/api/metrics/"] + assert.LessOrEqual(t, absDiff(tc.expectedCalls, responderCalls), 2) + assert.Zero(t, noResponderCalls) + }) + } +} diff --git a/pkg/apiserver/apic_test.go b/pkg/apiserver/apic_test.go index 65ca29991..8aeb092cd 100644 --- a/pkg/apiserver/apic_test.go +++ b/pkg/apiserver/apic_test.go @@ -1057,90 +1057,6 @@ func TestAPICPush(t *testing.T) { } } -func TestAPICSendMetrics(t *testing.T) { - tests := []struct { - name string - duration time.Duration - expectedCalls int - setUp func(*apic) - metricsInterval time.Duration - }{ - { - name: "basic", - duration: time.Millisecond * 30, - metricsInterval: time.Millisecond * 5, - expectedCalls: 5, - setUp: func(api *apic) {}, - }, - { - name: "with some metrics", - duration: time.Millisecond * 30, - metricsInterval: time.Millisecond * 5, - expectedCalls: 5, - setUp: func(api *apic) { - api.dbClient.Ent.Machine.Delete().ExecX(context.Background()) - api.dbClient.Ent.Machine.Create(). - SetMachineId("1234"). - SetPassword(testPassword.String()). - SetIpAddress("1.2.3.4"). - SetScenarios("crowdsecurity/test"). - SetLastPush(time.Time{}). - SetUpdatedAt(time.Time{}). - ExecX(context.Background()) - - api.dbClient.Ent.Bouncer.Delete().ExecX(context.Background()) - api.dbClient.Ent.Bouncer.Create(). - SetIPAddress("1.2.3.6"). - SetName("someBouncer"). - SetAPIKey("foobar"). - SetRevoked(false). - SetLastPull(time.Time{}). - ExecX(context.Background()) - }, - }, - } - - httpmock.RegisterResponder("POST", "http://api.crowdsec.net/api/metrics/", httpmock.NewBytesResponder(200, []byte{})) - httpmock.Activate() - defer httpmock.Deactivate() - - for _, tc := range tests { - tc := tc - t.Run(tc.name, func(t *testing.T) { - url, err := url.ParseRequestURI("http://api.crowdsec.net/") - require.NoError(t, err) - - apiClient, err := apiclient.NewDefaultClient( - url, - "/api", - fmt.Sprintf("crowdsec/%s", version.String()), - nil, - ) - require.NoError(t, err) - - api := getAPIC(t) - api.pushInterval = time.Millisecond - api.pushIntervalFirst = time.Millisecond - api.apiClient = apiClient - api.metricsInterval = tc.metricsInterval - api.metricsIntervalFirst = tc.metricsInterval - tc.setUp(api) - - stop := make(chan bool) - httpmock.ZeroCallCounters() - go api.SendMetrics(stop) - time.Sleep(tc.duration) - stop <- true - - info := httpmock.GetCallCountInfo() - noResponderCalls := info["NO_RESPONDER"] - responderCalls := info["POST http://api.crowdsec.net/api/metrics/"] - assert.LessOrEqual(t, absDiff(tc.expectedCalls, responderCalls), 2) - assert.Zero(t, noResponderCalls) - }) - } -} - func TestAPICPull(t *testing.T) { api := getAPIC(t) tests := []struct {