send metrics immediately if agents are added or removed (#2296)

This commit is contained in:
mmetc 2023-06-23 14:06:04 +02:00 committed by GitHub
parent 9beb5388cb
commit 507da49b5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 254 additions and 160 deletions

View file

@ -33,7 +33,8 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)
var (
const (
// delta values must be smaller than the interval
pullIntervalDefault = time.Hour * 2
pullIntervalDelta = 5 * time.Minute
pushIntervalDefault = time.Second * 10
@ -71,7 +72,12 @@ type apic struct {
// randomDuration returns a duration value between d-delta and d+delta
func randomDuration(d time.Duration, delta time.Duration) time.Duration {
return time.Duration(float64(d) + float64(delta)*(-1.0+2.0*rand.Float64()))
ret := d + time.Duration(rand.Int63n(int64(2*delta))) - delta
// ticker interval must be > 0 (nanoseconds)
if ret <= 0 {
return 1
}
return ret
}
func (a *apic) FetchScenariosListFromDB() ([]string, error) {
@ -822,80 +828,6 @@ func (a *apic) Pull() error {
}
}
func (a *apic) GetMetrics() (*models.Metrics, error) {
metric := &models.Metrics{
ApilVersion: ptr.Of(version.String()),
Machines: make([]*models.MetricsAgentInfo, 0),
Bouncers: make([]*models.MetricsBouncerInfo, 0),
}
machines, err := a.dbClient.ListMachines()
if err != nil {
return metric, err
}
bouncers, err := a.dbClient.ListBouncers()
if err != nil {
return metric, err
}
var lastpush string
for _, machine := range machines {
if machine.LastPush == nil {
lastpush = time.Time{}.String()
} else {
lastpush = machine.LastPush.String()
}
m := &models.MetricsAgentInfo{
Version: machine.Version,
Name: machine.MachineId,
LastUpdate: machine.UpdatedAt.String(),
LastPush: lastpush,
}
metric.Machines = append(metric.Machines, m)
}
for _, bouncer := range bouncers {
m := &models.MetricsBouncerInfo{
Version: bouncer.Version,
CustomName: bouncer.Name,
Name: bouncer.Type,
LastPull: bouncer.LastPull.String(),
}
metric.Bouncers = append(metric.Bouncers, m)
}
return metric, nil
}
func (a *apic) SendMetrics(stop chan (bool)) {
defer trace.CatchPanic("lapi/metricsToAPIC")
ticker := time.NewTicker(a.metricsIntervalFirst)
log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)", a.metricsIntervalFirst.Round(time.Second), a.metricsInterval)
for {
metrics, err := a.GetMetrics()
if err != nil {
log.Errorf("unable to get metrics (%s), will retry", err)
}
_, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
if err != nil {
log.Errorf("capi metrics: failed: %s", err)
} else {
log.Infof("capi metrics: metrics sent successfully")
}
select {
case <-stop:
return
case <-ticker.C:
ticker.Reset(a.metricsInterval)
case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.pullTomb.Kill(nil)
a.pushTomb.Kill(nil)
return
}
}
}
func (a *apic) Shutdown() {
a.pushTomb.Kill(nil)
a.pullTomb.Kill(nil)

View file

@ -0,0 +1,145 @@
package apiserver
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"github.com/crowdsecurity/go-cs-lib/pkg/ptr"
"github.com/crowdsecurity/go-cs-lib/pkg/trace"
"github.com/crowdsecurity/go-cs-lib/pkg/version"
"github.com/crowdsecurity/crowdsec/pkg/models"
)
func (a *apic) GetMetrics() (*models.Metrics, error) {
machines, err := a.dbClient.ListMachines()
if err != nil {
return nil, err
}
machinesInfo := make([]*models.MetricsAgentInfo, len(machines))
for i, machine := range machines {
machinesInfo[i] = &models.MetricsAgentInfo{
Version: machine.Version,
Name: machine.MachineId,
LastUpdate: machine.UpdatedAt.String(),
LastPush: ptr.OrEmpty(machine.LastPush).String(),
}
}
bouncers, err := a.dbClient.ListBouncers()
if err != nil {
return nil, err
}
bouncersInfo := make([]*models.MetricsBouncerInfo, len(bouncers))
for i, bouncer := range bouncers {
bouncersInfo[i] = &models.MetricsBouncerInfo{
Version: bouncer.Version,
CustomName: bouncer.Name,
Name: bouncer.Type,
LastPull: bouncer.LastPull.String(),
}
}
return &models.Metrics{
ApilVersion: ptr.Of(version.String()),
Machines: machinesInfo,
Bouncers: bouncersInfo,
}, nil
}
func (a *apic) fetchMachineIDs() ([]string, error) {
machines, err := a.dbClient.ListMachines()
if err != nil {
return nil, err
}
ret := make([]string, len(machines))
for i, machine := range machines {
ret[i] = machine.MachineId
}
// sorted slices are required for the slices.Equal comparison
slices.Sort(ret)
return ret, nil
}
// SendMetrics sends metrics to the API server until it receives a stop signal.
//
// Metrics are sent at start, then at the randomized metricsIntervalFirst,
// then at regular metricsInterval. If a change is detected in the list
// of machines, the next metrics are sent immediately.
func (a *apic) SendMetrics(stop chan (bool)) {
defer trace.CatchPanic("lapi/metricsToAPIC")
// verify the list of machines every <checkInt> interval
const checkInt = 20 * time.Second
// intervals must always be > 0
metInts := []time.Duration{1, a.metricsIntervalFirst, a.metricsInterval}
log.Infof("Start send metrics to CrowdSec Central API (interval: %s once, then %s)",
metInts[1].Round(time.Second), metInts[2])
count := -1
nextMetInt := func() time.Duration {
if count < len(metInts)-1 {
count++
}
return metInts[count]
}
// store the list of machine IDs to compare
// with the next list
machineIDs := []string{}
reloadMachineIDs := func() {
ids, err := a.fetchMachineIDs()
if err != nil {
log.Debugf("unable to get machines (%s), will retry", err)
return
}
machineIDs = ids
}
checkTicker := time.NewTicker(checkInt)
metTicker := time.NewTicker(nextMetInt())
for {
select {
case <-stop:
checkTicker.Stop()
metTicker.Stop()
return
case <-checkTicker.C:
oldIDs := machineIDs
reloadMachineIDs()
if !slices.Equal(oldIDs, machineIDs) {
log.Infof("capi metrics: machines changed, immediate send")
metTicker.Reset(1)
}
case <-metTicker.C:
metrics, err := a.GetMetrics()
if err != nil {
log.Errorf("unable to get metrics (%s), will retry", err)
}
log.Info("capi metrics: sending")
_, _, err = a.apiClient.Metrics.Add(context.Background(), metrics)
if err != nil {
log.Errorf("capi metrics: failed: %s", err)
}
metTicker.Reset(nextMetInt())
case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
checkTicker.Stop()
metTicker.Stop()
a.pullTomb.Kill(nil)
a.pushTomb.Kill(nil)
return
}
}
}

View file

@ -0,0 +1,101 @@
package apiserver
import (
"context"
"fmt"
"net/url"
"testing"
"time"
"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/crowdsecurity/go-cs-lib/pkg/version"
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
)
func TestAPICSendMetrics(t *testing.T) {
tests := []struct {
name string
duration time.Duration
expectedCalls int
setUp func(*apic)
metricsInterval time.Duration
}{
{
name: "basic",
duration: time.Millisecond * 30,
metricsInterval: time.Millisecond * 5,
expectedCalls: 5,
setUp: func(api *apic) {},
},
{
name: "with some metrics",
duration: time.Millisecond * 30,
metricsInterval: time.Millisecond * 5,
expectedCalls: 5,
setUp: func(api *apic) {
api.dbClient.Ent.Machine.Delete().ExecX(context.Background())
api.dbClient.Ent.Machine.Create().
SetMachineId("1234").
SetPassword(testPassword.String()).
SetIpAddress("1.2.3.4").
SetScenarios("crowdsecurity/test").
SetLastPush(time.Time{}).
SetUpdatedAt(time.Time{}).
ExecX(context.Background())
api.dbClient.Ent.Bouncer.Delete().ExecX(context.Background())
api.dbClient.Ent.Bouncer.Create().
SetIPAddress("1.2.3.6").
SetName("someBouncer").
SetAPIKey("foobar").
SetRevoked(false).
SetLastPull(time.Time{}).
ExecX(context.Background())
},
},
}
httpmock.RegisterResponder("POST", "http://api.crowdsec.net/api/metrics/", httpmock.NewBytesResponder(200, []byte{}))
httpmock.Activate()
defer httpmock.Deactivate()
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
url, err := url.ParseRequestURI("http://api.crowdsec.net/")
require.NoError(t, err)
apiClient, err := apiclient.NewDefaultClient(
url,
"/api",
fmt.Sprintf("crowdsec/%s", version.String()),
nil,
)
require.NoError(t, err)
api := getAPIC(t)
api.pushInterval = time.Millisecond
api.pushIntervalFirst = time.Millisecond
api.apiClient = apiClient
api.metricsInterval = tc.metricsInterval
api.metricsIntervalFirst = tc.metricsInterval
tc.setUp(api)
stop := make(chan bool)
httpmock.ZeroCallCounters()
go api.SendMetrics(stop)
time.Sleep(tc.duration)
stop <- true
info := httpmock.GetCallCountInfo()
noResponderCalls := info["NO_RESPONDER"]
responderCalls := info["POST http://api.crowdsec.net/api/metrics/"]
assert.LessOrEqual(t, absDiff(tc.expectedCalls, responderCalls), 2)
assert.Zero(t, noResponderCalls)
})
}
}

View file

@ -1057,90 +1057,6 @@ func TestAPICPush(t *testing.T) {
}
}
func TestAPICSendMetrics(t *testing.T) {
tests := []struct {
name string
duration time.Duration
expectedCalls int
setUp func(*apic)
metricsInterval time.Duration
}{
{
name: "basic",
duration: time.Millisecond * 30,
metricsInterval: time.Millisecond * 5,
expectedCalls: 5,
setUp: func(api *apic) {},
},
{
name: "with some metrics",
duration: time.Millisecond * 30,
metricsInterval: time.Millisecond * 5,
expectedCalls: 5,
setUp: func(api *apic) {
api.dbClient.Ent.Machine.Delete().ExecX(context.Background())
api.dbClient.Ent.Machine.Create().
SetMachineId("1234").
SetPassword(testPassword.String()).
SetIpAddress("1.2.3.4").
SetScenarios("crowdsecurity/test").
SetLastPush(time.Time{}).
SetUpdatedAt(time.Time{}).
ExecX(context.Background())
api.dbClient.Ent.Bouncer.Delete().ExecX(context.Background())
api.dbClient.Ent.Bouncer.Create().
SetIPAddress("1.2.3.6").
SetName("someBouncer").
SetAPIKey("foobar").
SetRevoked(false).
SetLastPull(time.Time{}).
ExecX(context.Background())
},
},
}
httpmock.RegisterResponder("POST", "http://api.crowdsec.net/api/metrics/", httpmock.NewBytesResponder(200, []byte{}))
httpmock.Activate()
defer httpmock.Deactivate()
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
url, err := url.ParseRequestURI("http://api.crowdsec.net/")
require.NoError(t, err)
apiClient, err := apiclient.NewDefaultClient(
url,
"/api",
fmt.Sprintf("crowdsec/%s", version.String()),
nil,
)
require.NoError(t, err)
api := getAPIC(t)
api.pushInterval = time.Millisecond
api.pushIntervalFirst = time.Millisecond
api.apiClient = apiClient
api.metricsInterval = tc.metricsInterval
api.metricsIntervalFirst = tc.metricsInterval
tc.setUp(api)
stop := make(chan bool)
httpmock.ZeroCallCounters()
go api.SendMetrics(stop)
time.Sleep(tc.duration)
stop <- true
info := httpmock.GetCallCountInfo()
noResponderCalls := info["NO_RESPONDER"]
responderCalls := info["POST http://api.crowdsec.net/api/metrics/"]
assert.LessOrEqual(t, absDiff(tc.expectedCalls, responderCalls), 2)
assert.Zero(t, noResponderCalls)
})
}
}
func TestAPICPull(t *testing.T) {
api := getAPIC(t)
tests := []struct {