acquisition: validate datasources before configuration (static checks) (#1841)

* acquisition: validate datasources before configuration (allow static configuration checks)

* remove comment

* import reviser, format

* error wrap
This commit is contained in:
mmetc 2022-11-30 17:36:56 +01:00 committed by GitHub
parent f2528f3e29
commit 4a6a9c4355
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 361 additions and 251 deletions

View file

@ -1,11 +1,17 @@
package acquisition
import (
"errors"
"fmt"
"io"
"os"
"strings"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
tomb "gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
cloudwatchacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/cloudwatch"
dockeracquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker"
@ -17,19 +23,14 @@ import (
wineventlogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/wineventlog"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
tomb "gopkg.in/tomb.v2"
)
// The interface each datasource must implement
type DataSource interface {
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
Configure([]byte, *log.Entry) error // Configure the datasource
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
Configure([]byte, *log.Entry) error // Complete the YAML datasource configuration and perform runtime checks.
ConfigureByDSN(string, map[string]string, *log.Entry) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
@ -39,66 +40,37 @@ type DataSource interface {
Dump() interface{}
}
var AcquisitionSources = []struct {
name string
iface func() DataSource
}{
{
name: "file",
iface: func() DataSource { return &fileacquisition.FileSource{} },
},
{
name: "journalctl",
iface: func() DataSource { return &journalctlacquisition.JournalCtlSource{} },
},
{
name: "cloudwatch",
iface: func() DataSource { return &cloudwatchacquisition.CloudwatchSource{} },
},
{
name: "syslog",
iface: func() DataSource { return &syslogacquisition.SyslogSource{} },
},
{
name: "docker",
iface: func() DataSource { return &dockeracquisition.DockerSource{} },
},
{
name: "kinesis",
iface: func() DataSource { return &kinesisacquisition.KinesisSource{} },
},
{
name: "wineventlog",
iface: func() DataSource { return &wineventlogacquisition.WinEventLogSource{} },
},
{
name: "kafka",
iface: func() DataSource { return &kafkaacquisition.KafkaSource{} },
},
var AcquisitionSources = map[string]func() DataSource{
"file": func() DataSource { return &fileacquisition.FileSource{} },
"journalctl": func() DataSource { return &journalctlacquisition.JournalCtlSource{} },
"cloudwatch": func() DataSource { return &cloudwatchacquisition.CloudwatchSource{} },
"syslog": func() DataSource { return &syslogacquisition.SyslogSource{} },
"docker": func() DataSource { return &dockeracquisition.DockerSource{} },
"kinesis": func() DataSource { return &kinesisacquisition.KinesisSource{} },
"wineventlog": func() DataSource { return &wineventlogacquisition.WinEventLogSource{} },
"kafka": func() DataSource { return &kafkaacquisition.KafkaSource{} },
}
func GetDataSourceIface(dataSourceType string) DataSource {
for _, source := range AcquisitionSources {
if source.name == dataSourceType {
return source.iface()
}
source := AcquisitionSources[dataSourceType]
if source == nil {
return nil
}
return nil
return source()
}
func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) {
//we dump it back to []byte, because we want to decode the yaml blob twice :
//once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
// we dump it back to []byte, because we want to decode the yaml blob twice:
// once to DataSourceCommonCfg, and then later to the dedicated type of the datasource
yamlConfig, err := yaml.Marshal(commonConfig)
if err != nil {
return nil, errors.Wrap(err, "unable to marshal back interface")
return nil, fmt.Errorf("unable to marshal back interface: %w", err)
}
if dataSrc := GetDataSourceIface(commonConfig.Source); dataSrc != nil {
/* this logger will then be used by the datasource at runtime */
clog := log.New()
if err := types.ConfigureLogger(clog); err != nil {
return nil, errors.Wrap(err, "while configuring datasource logger")
return nil, fmt.Errorf("while configuring datasource logger: %w", err)
}
if commonConfig.LogLevel != nil {
clog.SetLevel(*commonConfig.LogLevel)
@ -112,11 +84,11 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS
subLogger := clog.WithFields(customLog)
/* check eventual dependencies are satisfied (ie. journald will check journalctl availability) */
if err := dataSrc.CanRun(); err != nil {
return nil, errors.Wrapf(err, "datasource %s cannot be run", commonConfig.Source)
return nil, fmt.Errorf("datasource %s cannot be run: %w", commonConfig.Source, err)
}
/* configure the actual datasource */
if err := dataSrc.Configure(yamlConfig, subLogger); err != nil {
return nil, errors.Wrapf(err, "failed to configure datasource %s", commonConfig.Source)
return nil, fmt.Errorf("failed to configure datasource %s: %w", commonConfig.Source, err)
}
return &dataSrc, nil
@ -124,9 +96,8 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS
return nil, fmt.Errorf("cannot find source %s", commonConfig.Source)
}
//detectBackwardCompatAcquis : try to magically detect the type for backward compat (type was not mandatory then)
// detectBackwardCompatAcquis: try to magically detect the type for backward compat (type was not mandatory then)
func detectBackwardCompatAcquis(sub configuration.DataSourceCommonCfg) string {
if _, ok := sub.Config["filename"]; ok {
return "file"
}
@ -153,14 +124,14 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource,
/* this logger will then be used by the datasource at runtime */
clog := log.New()
if err := types.ConfigureLogger(clog); err != nil {
return nil, errors.Wrap(err, "while configuring datasource logger")
return nil, fmt.Errorf("while configuring datasource logger: %w", err)
}
subLogger := clog.WithFields(log.Fields{
"type": dsn,
})
err := dataSrc.ConfigureByDSN(dsn, labels, subLogger)
if err != nil {
return nil, errors.Wrapf(err, "while configuration datasource for %s", dsn)
return nil, fmt.Errorf("while configuration datasource for %s: %w", dsn, err)
}
sources = append(sources, dataSrc)
return sources, nil
@ -168,7 +139,6 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource,
// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) {
var sources []DataSource
for _, acquisFile := range config.AcquisitionFiles {
@ -184,8 +154,8 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
var idx int
err = dec.Decode(&sub)
if err != nil {
if ! errors.Is(err, io.EOF) {
return nil, errors.Wrapf(err, "failed to yaml decode %s", acquisFile)
if !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("failed to yaml decode %s: %w", acquisFile, err)
}
log.Tracef("End of yaml file")
break
@ -212,7 +182,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource,
}
src, err := DataSourceConfigure(sub)
if err != nil {
return nil, errors.Wrapf(err, "while configuring datasource of type %s from %s (position: %d)", sub.Source, acquisFile, idx)
return nil, fmt.Errorf("while configuring datasource of type %s from %s (position: %d): %w", sub.Source, acquisFile, idx, err)
}
sources = append(sources, *src)
idx += 1
@ -232,9 +202,9 @@ func GetMetrics(sources []DataSource, aggregated bool) error {
for _, metric := range metrics {
if err := prometheus.Register(metric); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
return errors.Wrapf(err, "could not register metrics for datasource %s", sources[i].GetName())
return fmt.Errorf("could not register metrics for datasource %s: %w", sources[i].GetName(), err)
}
//ignore the error
// ignore the error
}
}

View file

@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@ -26,10 +25,19 @@ type MockSource struct {
logger *log.Entry
}
func (f *MockSource) UnmarshalConfig(cfg []byte) error {
err := yaml.UnmarshalStrict(cfg, &f)
if err != nil {
return err
}
return nil
}
func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error {
f.logger = logger
if err := yaml.UnmarshalStrict(cfg, &f); err != nil {
return errors.Wrap(err, "while unmarshaling to reader specific config")
if err := f.UnmarshalConfig(cfg); err != nil {
return err
}
if f.Mode == "" {
f.Mode = configuration.CAT_MODE
@ -65,24 +73,10 @@ func (f *MockSourceCantRun) GetName() string { return "mock_cant_run" }
// appendMockSource is only used to add mock source for tests
func appendMockSource() {
if GetDataSourceIface("mock") == nil {
mock := struct {
name string
iface func() DataSource
}{
name: "mock",
iface: func() DataSource { return &MockSource{} },
}
AcquisitionSources = append(AcquisitionSources, mock)
AcquisitionSources["mock"] = func() DataSource { return &MockSource{} }
}
if GetDataSourceIface("mock_cant_run") == nil {
mock := struct {
name string
iface func() DataSource
}{
name: "mock_cant_run",
iface: func() DataSource { return &MockSourceCantRun{} },
}
AcquisitionSources = append(AcquisitionSources, mock)
AcquisitionSources["mock_cant_run"] = func() DataSource { return &MockSourceCantRun{} }
}
}
@ -313,8 +307,10 @@ func (f *MockCat) Configure(cfg []byte, logger *log.Entry) error {
}
return nil
}
func (f *MockCat) GetName() string { return "mock_cat" }
func (f *MockCat) GetMode() string { return "cat" }
func (f *MockCat) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockCat) GetName() string { return "mock_cat" }
func (f *MockCat) GetMode() string { return "cat" }
func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
for i := 0; i < 10; i++ {
evt := types.Event{}
@ -351,8 +347,10 @@ func (f *MockTail) Configure(cfg []byte, logger *log.Entry) error {
}
return nil
}
func (f *MockTail) GetName() string { return "mock_tail" }
func (f *MockTail) GetMode() string { return "tail" }
func (f *MockTail) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockTail) GetName() string { return "mock_tail" }
func (f *MockTail) GetMode() string { return "tail" }
func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
return fmt.Errorf("can't run in cat mode")
}
@ -482,6 +480,7 @@ type MockSourceByDSN struct {
logger *log.Entry //nolint: unused
}
func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry) error { return nil }
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
@ -524,14 +523,7 @@ func TestConfigureByDSN(t *testing.T) {
}
if GetDataSourceIface("mockdsn") == nil {
mock := struct {
name string
iface func() DataSource
}{
name: "mockdsn",
iface: func() DataSource { return &MockSourceByDSN{} },
}
AcquisitionSources = append(AcquisitionSources, mock)
AcquisitionSources["mockdsn"] = func() DataSource { return &MockSourceByDSN{} }
}
for _, tc := range tests {

View file

@ -12,17 +12,17 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/parser"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
var openedStreams = prometheus.NewGaugeVec(
@ -101,61 +101,81 @@ var (
def_AwsConfigDir = ""
)
func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error {
cwConfig := CloudwatchSourceConfiguration{}
targetStream := "*"
if err := yaml.UnmarshalStrict(cfg, &cwConfig); err != nil {
return errors.Wrap(err, "Cannot parse CloudwatchSource configuration")
func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error {
cw.Config = CloudwatchSourceConfiguration{}
if err := yaml.UnmarshalStrict(yamlConfig, &cw.Config); err != nil {
return fmt.Errorf("cannot parse CloudwatchSource configuration: %w", err)
}
cw.Config = cwConfig
if len(cw.Config.GroupName) == 0 {
return fmt.Errorf("group_name is mandatory for CloudwatchSource")
}
cw.logger = logger.WithField("group", cw.Config.GroupName)
if cw.Config.Mode == "" {
cw.Config.Mode = configuration.TAIL_MODE
}
logger.Debugf("Starting configuration for Cloudwatch group %s", cw.Config.GroupName)
if cw.Config.DescribeLogStreamsLimit == nil {
cw.Config.DescribeLogStreamsLimit = &def_DescribeLogStreamsLimit
}
logger.Tracef("describelogstreams_limit set to %d", *cw.Config.DescribeLogStreamsLimit)
if cw.Config.PollNewStreamInterval == nil {
cw.Config.PollNewStreamInterval = &def_PollNewStreamInterval
}
logger.Tracef("poll_new_stream_interval set to %v", *cw.Config.PollNewStreamInterval)
if cw.Config.MaxStreamAge == nil {
cw.Config.MaxStreamAge = &def_MaxStreamAge
}
logger.Tracef("max_stream_age set to %v", *cw.Config.MaxStreamAge)
if cw.Config.PollStreamInterval == nil {
cw.Config.PollStreamInterval = &def_PollStreamInterval
}
logger.Tracef("poll_stream_interval set to %v", *cw.Config.PollStreamInterval)
if cw.Config.StreamReadTimeout == nil {
cw.Config.StreamReadTimeout = &def_StreamReadTimeout
}
logger.Tracef("stream_read_timeout set to %v", *cw.Config.StreamReadTimeout)
if cw.Config.GetLogEventsPagesLimit == nil {
cw.Config.GetLogEventsPagesLimit = &def_GetLogEventsPagesLimit
}
logger.Tracef("getlogeventspages_limit set to %v", *cw.Config.GetLogEventsPagesLimit)
if cw.Config.AwsApiCallTimeout == nil {
cw.Config.AwsApiCallTimeout = &def_AwsApiCallTimeout
}
logger.Tracef("aws_api_timeout set to %v", *cw.Config.AwsApiCallTimeout)
if *cw.Config.MaxStreamAge > *cw.Config.StreamReadTimeout {
logger.Warningf("max_stream_age > stream_read_timeout, stream might keep being opened/closed")
}
if cw.Config.AwsConfigDir == nil {
cw.Config.AwsConfigDir = &def_AwsConfigDir
}
logger.Tracef("aws_config_dir set to %s", *cw.Config.AwsConfigDir)
return nil
}
func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry) error {
err := cw.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
cw.logger = logger.WithField("group", cw.Config.GroupName)
cw.logger.Debugf("Starting configuration for Cloudwatch group %s", cw.Config.GroupName)
cw.logger.Tracef("describelogstreams_limit set to %d", *cw.Config.DescribeLogStreamsLimit)
cw.logger.Tracef("poll_new_stream_interval set to %v", *cw.Config.PollNewStreamInterval)
cw.logger.Tracef("max_stream_age set to %v", *cw.Config.MaxStreamAge)
cw.logger.Tracef("poll_stream_interval set to %v", *cw.Config.PollStreamInterval)
cw.logger.Tracef("stream_read_timeout set to %v", *cw.Config.StreamReadTimeout)
cw.logger.Tracef("getlogeventspages_limit set to %v", *cw.Config.GetLogEventsPagesLimit)
cw.logger.Tracef("aws_api_timeout set to %v", *cw.Config.AwsApiCallTimeout)
if *cw.Config.MaxStreamAge > *cw.Config.StreamReadTimeout {
cw.logger.Warningf("max_stream_age > stream_read_timeout, stream might keep being opened/closed")
}
cw.logger.Tracef("aws_config_dir set to %s", *cw.Config.AwsConfigDir)
if *cw.Config.AwsConfigDir != "" {
_, err := os.Stat(*cw.Config.AwsConfigDir)
if err != nil {
logger.Errorf("can't read aws_config_dir '%s' got err %s", *cw.Config.AwsConfigDir, err)
cw.logger.Errorf("can't read aws_config_dir '%s' got err %s", *cw.Config.AwsConfigDir, err)
return fmt.Errorf("can't read aws_config_dir %s got err %s ", *cw.Config.AwsConfigDir, err)
}
os.Setenv("AWS_SDK_LOAD_CONFIG", "1")
@ -164,7 +184,7 @@ func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error {
os.Setenv("AWS_SHARED_CREDENTIALS_FILE", fmt.Sprintf("%s/credentials", *cw.Config.AwsConfigDir))
} else {
if cw.Config.AwsRegion == nil {
logger.Errorf("aws_region is not specified, specify it or aws_config_dir")
cw.logger.Errorf("aws_region is not specified, specify it or aws_config_dir")
return fmt.Errorf("aws_region is not specified, specify it or aws_config_dir")
}
os.Setenv("AWS_REGION", *cw.Config.AwsRegion)
@ -174,6 +194,8 @@ func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error {
return err
}
cw.streamIndexes = make(map[string]string)
targetStream := "*"
if cw.Config.StreamRegexp != nil {
if _, err := regexp.Compile(*cw.Config.StreamRegexp); err != nil {
return errors.Wrapf(err, "error while compiling regexp '%s'", *cw.Config.StreamRegexp)
@ -183,7 +205,7 @@ func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error {
targetStream = *cw.Config.StreamName
}
logger.Infof("Adding cloudwatch group '%s' (stream:%s) to datasources", cw.Config.GroupName, targetStream)
cw.logger.Infof("Adding cloudwatch group '%s' (stream:%s) to datasources", cw.Config.GroupName, targetStream)
return nil
}

View file

@ -10,18 +10,19 @@ import (
"strings"
"time"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/crowdsecurity/dlog"
dockerTypes "github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/dlog"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
var linesRead = prometheus.NewCounterVec(
@ -67,24 +68,22 @@ type ContainerConfig struct {
Tty bool
}
func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
var err error
func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
d.Config = DockerConfiguration{
FollowStdout: true, // default
FollowStdErr: true, // default
CheckInterval: "1s", // default
}
d.logger = logger
d.runningContainerState = make(map[string]*ContainerConfig)
err = yaml.UnmarshalStrict(Config, &d.Config)
err := yaml.UnmarshalStrict(yamlConfig, &d.Config)
if err != nil {
return errors.Wrap(err, "Cannot parse DockerAcquisition configuration")
}
d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config)
if d.logger != nil {
d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config)
}
if len(d.Config.ContainerName) == 0 && len(d.Config.ContainerID) == 0 && len(d.Config.ContainerIDRegexp) == 0 && len(d.Config.ContainerNameRegexp) == 0 {
return fmt.Errorf("no containers names or containers ID configuration provided")
}
@ -100,7 +99,6 @@ func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE {
return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode)
}
d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
for _, cont := range d.Config.ContainerNameRegexp {
d.compiledContainerName = append(d.compiledContainerName, regexp.MustCompile(cont))
@ -110,11 +108,6 @@ func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
d.compiledContainerID = append(d.compiledContainerID, regexp.MustCompile(cont))
}
dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return err
}
if d.Config.Since == "" {
d.Config.Since = time.Now().UTC().Format(time.RFC3339)
}
@ -130,17 +123,37 @@ func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error {
d.containerLogsOptions.Until = d.Config.Until
}
return nil
}
func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry) error {
d.logger = logger
err := d.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
d.runningContainerState = make(map[string]*ContainerConfig)
d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config)
dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return err
}
if d.Config.DockerHost != "" {
if err := client.WithHost(d.Config.DockerHost)(dockerClient); err != nil {
err = client.WithHost(d.Config.DockerHost)(dockerClient)
if err != nil {
return err
}
}
d.Client = dockerClient
_, err = d.Client.Info(context.Background())
if err != nil {
return errors.Wrapf(err, "failed to configure docker datasource %s", d.Config.DockerHost)
return fmt.Errorf("failed to configure docker datasource %s: %w", d.Config.DockerHost, err)
}
return nil

View file

@ -13,9 +13,6 @@ import (
"strings"
"time"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/fsnotify/fsnotify"
"github.com/nxadm/tail"
"github.com/pkg/errors"
@ -23,6 +20,10 @@ import (
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
var linesRead = prometheus.NewCounterVec(
@ -50,41 +51,62 @@ type FileSource struct {
exclude_regexps []*regexp.Regexp
}
func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
fileConfig := FileConfiguration{}
f.logger = logger
f.watchedDirectories = make(map[string]bool)
f.tails = make(map[string]bool)
err := yaml.UnmarshalStrict(Config, &fileConfig)
func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
f.config = FileConfiguration{}
err := yaml.UnmarshalStrict(yamlConfig, &f.config)
if err != nil {
return errors.Wrap(err, "Cannot parse FileAcquisition configuration")
return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err)
}
f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
if len(fileConfig.Filename) != 0 {
fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
if f.logger != nil {
f.logger.Tracef("FileAcquisition configuration: %+v", f.config)
}
if len(fileConfig.Filenames) == 0 {
if len(f.config.Filename) != 0 {
f.config.Filenames = append(f.config.Filenames, f.config.Filename)
}
if len(f.config.Filenames) == 0 {
return fmt.Errorf("no filename or filenames configuration provided")
}
f.config = fileConfig
if f.config.Mode == "" {
f.config.Mode = configuration.TAIL_MODE
}
if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
}
for _, exclude := range f.config.ExcludeRegexps {
re, err := regexp.Compile(exclude)
if err != nil {
return fmt.Errorf("could not compile regexp %s: %w", exclude, err)
}
f.exclude_regexps = append(f.exclude_regexps, re)
}
return nil
}
func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
f.logger = logger
err := f.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
f.watchedDirectories = make(map[string]bool)
f.tails = make(map[string]bool)
f.watcher, err = fsnotify.NewWatcher()
if err != nil {
return errors.Wrapf(err, "Could not create fsnotify watcher")
}
for _, exclude := range f.config.ExcludeRegexps {
re, err := regexp.Compile(exclude)
if err != nil {
return errors.Wrapf(err, "Could not compile regexp %s", exclude)
}
f.exclude_regexps = append(f.exclude_regexps, re)
}
f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
for _, pattern := range f.config.Filenames {
if f.config.ForceInotify {
directory := filepath.Dir(pattern)

View file

@ -7,14 +7,15 @@ import (
"testing"
"time"
fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
"github.com/crowdsecurity/crowdsec/pkg/cstest"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/tomb.v2"
fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
"github.com/crowdsecurity/crowdsec/pkg/cstest"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
func TestBadConfiguration(t *testing.T) {
@ -42,7 +43,7 @@ func TestBadConfiguration(t *testing.T) {
name: "bad exclude regexp",
config: `filenames: ["asd.log"]
exclude_regexps: ["as[a-$d"]`,
expectedErr: "Could not compile regexp as",
expectedErr: "could not compile regexp as",
},
}

View file

@ -9,15 +9,15 @@ import (
"strings"
"time"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
type JournalCtlConfiguration struct {
@ -163,28 +163,41 @@ func (j *JournalCtlSource) GetAggregMetrics() []prometheus.Collector {
return []prometheus.Collector{linesRead}
}
func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error {
config := JournalCtlConfiguration{}
j.logger = logger
err := yaml.UnmarshalStrict(yamlConfig, &config)
func (j *JournalCtlSource) UnmarshalConfig(yamlConfig []byte) error {
j.config = JournalCtlConfiguration{}
err := yaml.UnmarshalStrict(yamlConfig, &j.config)
if err != nil {
return errors.Wrap(err, "Cannot parse JournalCtlSource configuration")
return fmt.Errorf("cannot parse JournalCtlSource configuration: %w", err)
}
if config.Mode == "" {
config.Mode = configuration.TAIL_MODE
if j.config.Mode == "" {
j.config.Mode = configuration.TAIL_MODE
}
var args []string
if config.Mode == configuration.TAIL_MODE {
if j.config.Mode == configuration.TAIL_MODE {
args = journalctlArgstreaming
} else {
args = journalctlArgsOneShot
}
if len(config.Filters) == 0 {
if len(j.config.Filters) == 0 {
return fmt.Errorf("journalctl_filter is required")
}
j.args = append(args, config.Filters...)
j.src = fmt.Sprintf("journalctl-%s", strings.Join(config.Filters, "."))
j.config = config
j.args = append(args, j.config.Filters...)
j.src = fmt.Sprintf("journalctl-%s", strings.Join(j.config.Filters, "."))
return nil
}
func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error {
j.logger = logger
err := j.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
return nil
}

View file

@ -10,15 +10,16 @@ import (
"strconv"
"time"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/segmentio/kafka-go"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
var (
@ -54,35 +55,51 @@ type KafkaSource struct {
Reader *kafka.Reader
}
func (k *KafkaSource) Configure(Config []byte, logger *log.Entry) error {
var err error
func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
k.Config = KafkaConfiguration{}
k.logger = logger
err = yaml.UnmarshalStrict(Config, &k.Config)
err := yaml.UnmarshalStrict(yamlConfig, &k.Config)
if err != nil {
return errors.Wrapf(err, "cannot parse %s datasource configuration", dataSourceName)
return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err)
}
if len(k.Config.Brokers) == 0 {
return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName)
}
if k.Config.Topic == "" {
return fmt.Errorf("cannot create a %s reader with am empty topic", dataSourceName)
}
if k.Config.Mode == "" {
k.Config.Mode = configuration.TAIL_MODE
}
return err
}
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
k.logger = logger
err := k.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
dialer, err := k.Config.NewDialer()
if err != nil {
return errors.Wrapf(err, "cannot create %s dialer", dataSourceName)
}
k.Reader, err = k.Config.NewReader(dialer)
if err != nil {
return errors.Wrapf(err, "cannote create %s reader", dataSourceName)
}
if k.Reader == nil {
return fmt.Errorf("cannot create %s reader", dataSourceName)
}
return nil
}

View file

@ -13,14 +13,15 @@ import (
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
type KinesisConfiguration struct {
@ -113,17 +114,18 @@ func (k *KinesisSource) GetAggregMetrics() []prometheus.Collector {
return []prometheus.Collector{linesRead, linesReadShards}
}
func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error {
config := KinesisConfiguration{}
k.logger = logger
err := yaml.UnmarshalStrict(yamlConfig, &config)
func (k *KinesisSource) UnmarshalConfig(yamlConfig []byte) error {
k.Config = KinesisConfiguration{}
err := yaml.UnmarshalStrict(yamlConfig, &k.Config)
if err != nil {
return errors.Wrap(err, "Cannot parse kinesis datasource configuration")
}
if config.Mode == "" {
config.Mode = configuration.TAIL_MODE
if k.Config.Mode == "" {
k.Config.Mode = configuration.TAIL_MODE
}
k.Config = config
if k.Config.StreamName == "" && !k.Config.UseEnhancedFanOut {
return fmt.Errorf("stream_name is mandatory when use_enhanced_fanout is false")
}
@ -139,10 +141,23 @@ func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error {
if k.Config.MaxRetries <= 0 {
k.Config.MaxRetries = 10
}
return nil
}
func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error {
k.logger = logger
err := k.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
err = k.newClient()
if err != nil {
return errors.Wrap(err, "Cannot create kinesis client")
return fmt.Errorf("cannot create kinesis client: %w", err)
}
k.shardReaderTomb = &tomb.Tomb{}
return nil
}

View file

@ -6,18 +6,18 @@ import (
"strings"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc3164"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc5424"
syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/server"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
)
type SyslogConfiguration struct {
@ -89,31 +89,43 @@ func validateAddr(addr string) bool {
return net.ParseIP(addr) != nil
}
func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
s.logger = logger
s.logger.Infof("Starting syslog datasource configuration")
syslogConfig := SyslogConfiguration{}
syslogConfig.Mode = configuration.TAIL_MODE
err := yaml.UnmarshalStrict(yamlConfig, &syslogConfig)
func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error {
s.config = SyslogConfiguration{}
s.config.Mode = configuration.TAIL_MODE
err := yaml.UnmarshalStrict(yamlConfig, &s.config)
if err != nil {
return errors.Wrap(err, "Cannot parse syslog configuration")
}
if syslogConfig.Addr == "" {
syslogConfig.Addr = "127.0.0.1" //do we want a usable or secure default ?
if s.config.Addr == "" {
s.config.Addr = "127.0.0.1" //do we want a usable or secure default ?
}
if syslogConfig.Port == 0 {
syslogConfig.Port = 514
if s.config.Port == 0 {
s.config.Port = 514
}
if syslogConfig.MaxMessageLen == 0 {
syslogConfig.MaxMessageLen = 2048
if s.config.MaxMessageLen == 0 {
s.config.MaxMessageLen = 2048
}
if !validatePort(syslogConfig.Port) {
return fmt.Errorf("invalid port %d", syslogConfig.Port)
if !validatePort(s.config.Port) {
return fmt.Errorf("invalid port %d", s.config.Port)
}
if !validateAddr(syslogConfig.Addr) {
return fmt.Errorf("invalid listen IP %s", syslogConfig.Addr)
if !validateAddr(s.config.Addr) {
return fmt.Errorf("invalid listen IP %s", s.config.Addr)
}
s.config = syslogConfig
return nil
}
func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
s.logger = logger
s.logger.Infof("Starting syslog datasource configuration")
err := s.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
return nil
}

View file

@ -5,15 +5,20 @@ package wineventlogacquisition
import (
"errors"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
type WinEventLogSource struct{}
func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
return nil
}

View file

@ -9,9 +9,6 @@ import (
"syscall"
"time"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/google/winops/winlog"
"github.com/google/winops/winlog/wevtapi"
"github.com/prometheus/client_golang/prometheus"
@ -19,6 +16,10 @@ import (
"golang.org/x/sys/windows"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
type WinEventLogConfiguration struct {
@ -228,29 +229,26 @@ func (w *WinEventLogSource) generateConfig(query string) (*winlog.SubscribeConfi
return &config, nil
}
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
config := WinEventLogConfiguration{}
w.logger = logger
err := yaml.UnmarshalStrict(yamlConfig, &config)
func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error {
w.config = WinEventLogConfiguration{}
err := yaml.UnmarshalStrict(yamlConfig, &w.config)
if err != nil {
return fmt.Errorf("unable to parse configuration: %v", err)
}
if config.EventChannel != "" && config.XPathQuery != "" {
if w.config.EventChannel != "" && w.config.XPathQuery != "" {
return fmt.Errorf("event_channel and xpath_query are mutually exclusive")
}
if config.EventChannel == "" && config.XPathQuery == "" {
if w.config.EventChannel == "" && w.config.XPathQuery == "" {
return fmt.Errorf("event_channel or xpath_query must be set")
}
config.Mode = configuration.TAIL_MODE
w.config = config
w.config.Mode = configuration.TAIL_MODE
if config.XPathQuery != "" {
w.query = config.XPathQuery
if w.config.XPathQuery != "" {
w.query = w.config.XPathQuery
} else {
w.query, err = w.buildXpathQuery()
if err != nil {
@ -258,15 +256,26 @@ func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) erro
}
}
w.evtConfig, err = w.generateConfig(w.query)
if w.config.PrettyName != "" {
w.name = w.config.PrettyName
} else {
w.name = w.query
}
return nil
}
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error {
w.logger = logger
err := w.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
if config.PrettyName != "" {
w.name = config.PrettyName
} else {
w.name = w.query
w.evtConfig, err = w.generateConfig(w.query)
if err != nil {
return err
}
return nil

View file

@ -1,7 +1,9 @@
package cstest
import (
"strings"
"testing"
"text/template"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -28,3 +30,20 @@ func RequireErrorContains(t *testing.T, err error, expectedErr string) {
require.NoError(t, err)
}
// Interpolate fills a string template with the given values, can be map or struct.
// example: Interpolate("{{.Name}}", map[string]string{"Name": "JohnDoe"})
func Interpolate(s string, data interface{}) (string, error) {
tmpl, err := template.New("").Parse(s)
if err != nil {
return "", err
}
var b strings.Builder
err = tmpl.Execute(&b, data)
if err != nil {
return "", err
}
return b.String(), nil
}