crowdsec/pkg/acquisition/acquisition_test.go
Thibault "bui" Koechlin b1c09f7512
acquisition : take prometheus level into account (#2885)
* properly take into account the aggregation level of prometheus metrics in acquisition
2024-03-13 14:57:19 +01:00

560 lines
15 KiB
Go

package acquisition
import (
"fmt"
"strings"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tomb "gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/go-cs-lib/cstest"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
type MockSource struct {
configuration.DataSourceCommonCfg `yaml:",inline"`
Toto string `yaml:"toto"`
logger *log.Entry
}
func (f *MockSource) UnmarshalConfig(cfg []byte) error {
err := yaml.UnmarshalStrict(cfg, &f)
if err != nil {
return err
}
return nil
}
func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
f.logger = logger
if err := f.UnmarshalConfig(cfg); err != nil {
return err
}
if f.Mode == "" {
f.Mode = configuration.CAT_MODE
}
if f.Mode != configuration.CAT_MODE && f.Mode != configuration.TAIL_MODE {
return fmt.Errorf("mode %s is not supported", f.Mode)
}
if f.Toto == "" {
return fmt.Errorf("expect non-empty toto")
}
return nil
}
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) GetName() string { return "mock" }
func (f *MockSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
return fmt.Errorf("not supported")
}
func (f *MockSource) GetUuid() string { return "" }
// copy the mocksource, but this one can't run
type MockSourceCantRun struct {
MockSource
}
func (f *MockSourceCantRun) CanRun() error { return fmt.Errorf("can't run bro") }
func (f *MockSourceCantRun) GetName() string { return "mock_cant_run" }
// appendMockSource is only used to add mock source for tests
func appendMockSource() {
if GetDataSourceIface("mock") == nil {
AcquisitionSources["mock"] = func() DataSource { return &MockSource{} }
}
if GetDataSourceIface("mock_cant_run") == nil {
AcquisitionSources["mock_cant_run"] = func() DataSource { return &MockSourceCantRun{} }
}
}
func TestDataSourceConfigure(t *testing.T) {
appendMockSource()
tests := []struct {
TestName string
String string
ExpectedError string
}{
{
TestName: "basic_valid_config",
String: `
mode: cat
labels:
test: foobar
log_level: info
source: mock
toto: test_value1
`,
},
{
TestName: "basic_debug_config",
String: `
mode: cat
labels:
test: foobar
log_level: debug
source: mock
toto: test_value1
`,
},
{
TestName: "basic_tailmode_config",
String: `
mode: tail
labels:
test: foobar
log_level: debug
source: mock
toto: test_value1
`,
},
{
TestName: "bad_mode_config",
String: `
mode: ratata
labels:
test: foobar
log_level: debug
source: mock
toto: test_value1
`,
ExpectedError: "failed to configure datasource mock: mode ratata is not supported",
},
{
TestName: "bad_type_config",
String: `
mode: cat
labels:
test: foobar
log_level: debug
source: tutu
`,
ExpectedError: "cannot find source tutu",
},
{
TestName: "mismatch_config",
String: `
mode: cat
labels:
test: foobar
log_level: debug
source: mock
wowo: ajsajasjas
`,
ExpectedError: "field wowo not found in type acquisition.MockSource",
},
{
TestName: "cant_run_error",
String: `
mode: cat
labels:
test: foobar
log_level: debug
source: mock_cant_run
wowo: ajsajasjas
`,
ExpectedError: "datasource 'mock_cant_run' is not available: can't run bro",
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.TestName, func(t *testing.T) {
common := configuration.DataSourceCommonCfg{}
yaml.Unmarshal([]byte(tc.String), &common)
ds, err := DataSourceConfigure(common, configuration.METRICS_NONE)
cstest.RequireErrorContains(t, err, tc.ExpectedError)
if tc.ExpectedError != "" {
return
}
switch tc.TestName {
case "basic_valid_config":
mock := (*ds).Dump().(*MockSource)
assert.Equal(t, "test_value1", mock.Toto)
assert.Equal(t, "cat", mock.Mode)
assert.Equal(t, log.InfoLevel, mock.logger.Logger.Level)
assert.Equal(t, map[string]string{"test": "foobar"}, mock.Labels)
case "basic_debug_config":
mock := (*ds).Dump().(*MockSource)
assert.Equal(t, "test_value1", mock.Toto)
assert.Equal(t, "cat", mock.Mode)
assert.Equal(t, log.DebugLevel, mock.logger.Logger.Level)
assert.Equal(t, map[string]string{"test": "foobar"}, mock.Labels)
case "basic_tailmode_config":
mock := (*ds).Dump().(*MockSource)
assert.Equal(t, "test_value1", mock.Toto)
assert.Equal(t, "tail", mock.Mode)
assert.Equal(t, log.DebugLevel, mock.logger.Logger.Level)
assert.Equal(t, map[string]string{"test": "foobar"}, mock.Labels)
}
})
}
}
func TestLoadAcquisitionFromFile(t *testing.T) {
appendMockSource()
tests := []struct {
TestName string
Config csconfig.CrowdsecServiceCfg
ExpectedError string
ExpectedLen int
}{
{
TestName: "non_existent_file",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"does_not_exist"},
},
ExpectedError: "open does_not_exist: " + cstest.FileNotFoundMessage,
ExpectedLen: 0,
},
{
TestName: "invalid_yaml_file",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/badyaml.yaml"},
},
ExpectedError: "failed to yaml decode test_files/badyaml.yaml: yaml: unmarshal errors",
ExpectedLen: 0,
},
{
TestName: "invalid_empty_yaml",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/emptyitem.yaml"},
},
ExpectedLen: 0,
},
{
TestName: "basic_valid",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/basic_filemode.yaml"},
},
ExpectedLen: 2,
},
{
TestName: "missing_labels",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/missing_labels.yaml"},
},
ExpectedError: "missing labels in test_files/missing_labels.yaml",
},
{
TestName: "backward_compat",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/backward_compat.yaml"},
},
ExpectedLen: 2,
},
{
TestName: "bad_type",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/bad_source.yaml"},
},
ExpectedError: "unknown data source does_not_exist in test_files/bad_source.yaml",
},
{
TestName: "invalid_filetype_config",
Config: csconfig.CrowdsecServiceCfg{
AcquisitionFiles: []string{"test_files/bad_filetype.yaml"},
},
ExpectedError: "while configuring datasource of type file from test_files/bad_filetype.yaml",
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.TestName, func(t *testing.T) {
dss, err := LoadAcquisitionFromFile(&tc.Config, nil)
cstest.RequireErrorContains(t, err, tc.ExpectedError)
if tc.ExpectedError != "" {
return
}
assert.Len(t, dss, tc.ExpectedLen)
})
}
}
/*
test start acquisition :
- create mock parser in cat mode : start acquisition, check it returns, count items in chan
- create mock parser in tail mode : start acquisition, sleep, check item count, tomb kill it, wait for it to return
*/
type MockCat struct {
configuration.DataSourceCommonCfg `yaml:",inline"`
logger *log.Entry
}
func (f *MockCat) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
f.logger = logger
if f.Mode == "" {
f.Mode = configuration.CAT_MODE
}
if f.Mode != configuration.CAT_MODE {
return fmt.Errorf("mode %s is not supported", f.Mode)
}
return nil
}
func (f *MockCat) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockCat) GetName() string { return "mock_cat" }
func (f *MockCat) GetMode() string { return "cat" }
func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
for i := 0; i < 10; i++ {
evt := types.Event{}
evt.Line.Src = "test"
out <- evt
}
return nil
}
func (f *MockCat) StreamingAcquisition(chan types.Event, *tomb.Tomb) error {
return fmt.Errorf("can't run in tail")
}
func (f *MockCat) CanRun() error { return nil }
func (f *MockCat) GetMetrics() []prometheus.Collector { return nil }
func (f *MockCat) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockCat) Dump() interface{} { return f }
func (f *MockCat) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
return fmt.Errorf("not supported")
}
func (f *MockCat) GetUuid() string { return "" }
//----
type MockTail struct {
configuration.DataSourceCommonCfg `yaml:",inline"`
logger *log.Entry
}
func (f *MockTail) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
f.logger = logger
if f.Mode == "" {
f.Mode = configuration.TAIL_MODE
}
if f.Mode != configuration.TAIL_MODE {
return fmt.Errorf("mode %s is not supported", f.Mode)
}
return nil
}
func (f *MockTail) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockTail) GetName() string { return "mock_tail" }
func (f *MockTail) GetMode() string { return "tail" }
func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
return fmt.Errorf("can't run in cat mode")
}
func (f *MockTail) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
for i := 0; i < 10; i++ {
evt := types.Event{}
evt.Line.Src = "test"
out <- evt
}
<-t.Dying()
return nil
}
func (f *MockTail) CanRun() error { return nil }
func (f *MockTail) GetMetrics() []prometheus.Collector { return nil }
func (f *MockTail) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockTail) Dump() interface{} { return f }
func (f *MockTail) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
return fmt.Errorf("not supported")
}
func (f *MockTail) GetUuid() string { return "" }
//func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
func TestStartAcquisitionCat(t *testing.T) {
sources := []DataSource{
&MockCat{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}
go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
count := 0
READLOOP:
for {
select {
case <-out:
count++
case <-time.After(1 * time.Second):
break READLOOP
}
}
assert.Equal(t, 10, count)
}
func TestStartAcquisitionTail(t *testing.T) {
sources := []DataSource{
&MockTail{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}
go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
count := 0
READLOOP:
for {
select {
case <-out:
count++
case <-time.After(1 * time.Second):
break READLOOP
}
}
assert.Equal(t, 10, count)
acquisTomb.Kill(nil)
time.Sleep(1 * time.Second)
require.NoError(t, acquisTomb.Err(), "tomb is not dead")
}
type MockTailError struct {
MockTail
}
func (f *MockTailError) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
for i := 0; i < 10; i++ {
evt := types.Event{}
evt.Line.Src = "test"
out <- evt
}
t.Kill(fmt.Errorf("got error (tomb)"))
return fmt.Errorf("got error")
}
func TestStartAcquisitionTailError(t *testing.T) {
sources := []DataSource{
&MockTailError{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}
go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil && err.Error() != "got error (tomb)" {
t.Errorf("expected error, got '%s'", err)
}
}()
count := 0
READLOOP:
for {
select {
case <-out:
count++
case <-time.After(1 * time.Second):
break READLOOP
}
}
assert.Equal(t, 10, count)
//acquisTomb.Kill(nil)
time.Sleep(1 * time.Second)
cstest.RequireErrorContains(t, acquisTomb.Err(), "got error (tomb)")
}
type MockSourceByDSN struct {
configuration.DataSourceCommonCfg `yaml:",inline"`
Toto string `yaml:"toto"`
logger *log.Entry //nolint: unused
}
func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
return nil
}
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) CanRun() error { return nil }
func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) Dump() interface{} { return f }
func (f *MockSourceByDSN) GetName() string { return "mockdsn" }
func (f *MockSourceByDSN) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
dsn = strings.TrimPrefix(dsn, "mockdsn://")
if dsn != "test_expect" {
return fmt.Errorf("unexpected value")
}
return nil
}
func (f *MockSourceByDSN) GetUuid() string { return "" }
func TestConfigureByDSN(t *testing.T) {
tests := []struct {
dsn string
ExpectedError string
ExpectedResLen int
}{
{
dsn: "baddsn",
ExpectedError: "baddsn isn't valid dsn (no protocol)",
},
{
dsn: "foobar://toto",
ExpectedError: "no acquisition for protocol foobar://",
},
{
dsn: "mockdsn://test_expect",
ExpectedResLen: 1,
},
{
dsn: "mockdsn://bad",
ExpectedError: "unexpected value",
},
}
if GetDataSourceIface("mockdsn") == nil {
AcquisitionSources["mockdsn"] = func() DataSource { return &MockSourceByDSN{} }
}
for _, tc := range tests {
tc := tc
t.Run(tc.dsn, func(t *testing.T) {
srcs, err := LoadAcquisitionFromDSN(tc.dsn, map[string]string{"type": "test_label"}, "")
cstest.RequireErrorContains(t, err, tc.ExpectedError)
assert.Len(t, srcs, tc.ExpectedResLen)
})
}
}