diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 43093a503..e0df38ff2 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -1,11 +1,17 @@ package acquisition import ( + "errors" "fmt" "io" "os" "strings" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + tomb "gopkg.in/tomb.v2" + "gopkg.in/yaml.v2" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" cloudwatchacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/cloudwatch" dockeracquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker" @@ -17,19 +23,14 @@ import ( wineventlogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/wineventlog" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/types" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" - "gopkg.in/yaml.v2" - - tomb "gopkg.in/tomb.v2" ) // The interface each datasource must implement type DataSource interface { GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality) - Configure([]byte, *log.Entry) error // Configure the datasource + UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime + Configure([]byte, *log.Entry) error // Complete the YAML datasource configuration and perform runtime checks. ConfigureByDSN(string, map[string]string, *log.Entry) error // Configure the datasource GetMode() string // Get the mode (TAIL, CAT or SERVER) GetName() string // Get the name of the module @@ -39,66 +40,37 @@ type DataSource interface { Dump() interface{} } -var AcquisitionSources = []struct { - name string - iface func() DataSource -}{ - { - name: "file", - iface: func() DataSource { return &fileacquisition.FileSource{} }, - }, - { - name: "journalctl", - iface: func() DataSource { return &journalctlacquisition.JournalCtlSource{} }, - }, - { - name: "cloudwatch", - iface: func() DataSource { return &cloudwatchacquisition.CloudwatchSource{} }, - }, - { - name: "syslog", - iface: func() DataSource { return &syslogacquisition.SyslogSource{} }, - }, - { - name: "docker", - iface: func() DataSource { return &dockeracquisition.DockerSource{} }, - }, - { - name: "kinesis", - iface: func() DataSource { return &kinesisacquisition.KinesisSource{} }, - }, - { - name: "wineventlog", - iface: func() DataSource { return &wineventlogacquisition.WinEventLogSource{} }, - }, - { - name: "kafka", - iface: func() DataSource { return &kafkaacquisition.KafkaSource{} }, - }, +var AcquisitionSources = map[string]func() DataSource{ + "file": func() DataSource { return &fileacquisition.FileSource{} }, + "journalctl": func() DataSource { return &journalctlacquisition.JournalCtlSource{} }, + "cloudwatch": func() DataSource { return &cloudwatchacquisition.CloudwatchSource{} }, + "syslog": func() DataSource { return &syslogacquisition.SyslogSource{} }, + "docker": func() DataSource { return &dockeracquisition.DockerSource{} }, + "kinesis": func() DataSource { return &kinesisacquisition.KinesisSource{} }, + "wineventlog": func() DataSource { return &wineventlogacquisition.WinEventLogSource{} }, + "kafka": func() DataSource { return &kafkaacquisition.KafkaSource{} }, } func GetDataSourceIface(dataSourceType string) DataSource { - for _, source := range AcquisitionSources { - if source.name == dataSourceType { - return source.iface() - } + source := AcquisitionSources[dataSourceType] + if source == nil { + return nil } - return nil + return source() } func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataSource, error) { - - //we dump it back to []byte, because we want to decode the yaml blob twice : - //once to DataSourceCommonCfg, and then later to the dedicated type of the datasource + // we dump it back to []byte, because we want to decode the yaml blob twice: + // once to DataSourceCommonCfg, and then later to the dedicated type of the datasource yamlConfig, err := yaml.Marshal(commonConfig) if err != nil { - return nil, errors.Wrap(err, "unable to marshal back interface") + return nil, fmt.Errorf("unable to marshal back interface: %w", err) } if dataSrc := GetDataSourceIface(commonConfig.Source); dataSrc != nil { /* this logger will then be used by the datasource at runtime */ clog := log.New() if err := types.ConfigureLogger(clog); err != nil { - return nil, errors.Wrap(err, "while configuring datasource logger") + return nil, fmt.Errorf("while configuring datasource logger: %w", err) } if commonConfig.LogLevel != nil { clog.SetLevel(*commonConfig.LogLevel) @@ -112,11 +84,11 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS subLogger := clog.WithFields(customLog) /* check eventual dependencies are satisfied (ie. journald will check journalctl availability) */ if err := dataSrc.CanRun(); err != nil { - return nil, errors.Wrapf(err, "datasource %s cannot be run", commonConfig.Source) + return nil, fmt.Errorf("datasource %s cannot be run: %w", commonConfig.Source, err) } /* configure the actual datasource */ if err := dataSrc.Configure(yamlConfig, subLogger); err != nil { - return nil, errors.Wrapf(err, "failed to configure datasource %s", commonConfig.Source) + return nil, fmt.Errorf("failed to configure datasource %s: %w", commonConfig.Source, err) } return &dataSrc, nil @@ -124,9 +96,8 @@ func DataSourceConfigure(commonConfig configuration.DataSourceCommonCfg) (*DataS return nil, fmt.Errorf("cannot find source %s", commonConfig.Source) } -//detectBackwardCompatAcquis : try to magically detect the type for backward compat (type was not mandatory then) +// detectBackwardCompatAcquis: try to magically detect the type for backward compat (type was not mandatory then) func detectBackwardCompatAcquis(sub configuration.DataSourceCommonCfg) string { - if _, ok := sub.Config["filename"]; ok { return "file" } @@ -153,14 +124,14 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource, /* this logger will then be used by the datasource at runtime */ clog := log.New() if err := types.ConfigureLogger(clog); err != nil { - return nil, errors.Wrap(err, "while configuring datasource logger") + return nil, fmt.Errorf("while configuring datasource logger: %w", err) } subLogger := clog.WithFields(log.Fields{ "type": dsn, }) err := dataSrc.ConfigureByDSN(dsn, labels, subLogger) if err != nil { - return nil, errors.Wrapf(err, "while configuration datasource for %s", dsn) + return nil, fmt.Errorf("while configuration datasource for %s: %w", dsn, err) } sources = append(sources, dataSrc) return sources, nil @@ -168,7 +139,6 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource, // LoadAcquisitionFromFile unmarshals the configuration item and checks its availability func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, error) { - var sources []DataSource for _, acquisFile := range config.AcquisitionFiles { @@ -184,8 +154,8 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, var idx int err = dec.Decode(&sub) if err != nil { - if ! errors.Is(err, io.EOF) { - return nil, errors.Wrapf(err, "failed to yaml decode %s", acquisFile) + if !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("failed to yaml decode %s: %w", acquisFile, err) } log.Tracef("End of yaml file") break @@ -212,7 +182,7 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, } src, err := DataSourceConfigure(sub) if err != nil { - return nil, errors.Wrapf(err, "while configuring datasource of type %s from %s (position: %d)", sub.Source, acquisFile, idx) + return nil, fmt.Errorf("while configuring datasource of type %s from %s (position: %d): %w", sub.Source, acquisFile, idx, err) } sources = append(sources, *src) idx += 1 @@ -232,9 +202,9 @@ func GetMetrics(sources []DataSource, aggregated bool) error { for _, metric := range metrics { if err := prometheus.Register(metric); err != nil { if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { - return errors.Wrapf(err, "could not register metrics for datasource %s", sources[i].GetName()) + return fmt.Errorf("could not register metrics for datasource %s: %w", sources[i].GetName(), err) } - //ignore the error + // ignore the error } } diff --git a/pkg/acquisition/acquisition_test.go b/pkg/acquisition/acquisition_test.go index a547970a8..db4ad432e 100644 --- a/pkg/acquisition/acquisition_test.go +++ b/pkg/acquisition/acquisition_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -26,10 +25,19 @@ type MockSource struct { logger *log.Entry } +func (f *MockSource) UnmarshalConfig(cfg []byte) error { + err := yaml.UnmarshalStrict(cfg, &f) + if err != nil { + return err + } + + return nil +} + func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error { f.logger = logger - if err := yaml.UnmarshalStrict(cfg, &f); err != nil { - return errors.Wrap(err, "while unmarshaling to reader specific config") + if err := f.UnmarshalConfig(cfg); err != nil { + return err } if f.Mode == "" { f.Mode = configuration.CAT_MODE @@ -65,24 +73,10 @@ func (f *MockSourceCantRun) GetName() string { return "mock_cant_run" } // appendMockSource is only used to add mock source for tests func appendMockSource() { if GetDataSourceIface("mock") == nil { - mock := struct { - name string - iface func() DataSource - }{ - name: "mock", - iface: func() DataSource { return &MockSource{} }, - } - AcquisitionSources = append(AcquisitionSources, mock) + AcquisitionSources["mock"] = func() DataSource { return &MockSource{} } } if GetDataSourceIface("mock_cant_run") == nil { - mock := struct { - name string - iface func() DataSource - }{ - name: "mock_cant_run", - iface: func() DataSource { return &MockSourceCantRun{} }, - } - AcquisitionSources = append(AcquisitionSources, mock) + AcquisitionSources["mock_cant_run"] = func() DataSource { return &MockSourceCantRun{} } } } @@ -313,8 +307,10 @@ func (f *MockCat) Configure(cfg []byte, logger *log.Entry) error { } return nil } -func (f *MockCat) GetName() string { return "mock_cat" } -func (f *MockCat) GetMode() string { return "cat" } + +func (f *MockCat) UnmarshalConfig(cfg []byte) error { return nil } +func (f *MockCat) GetName() string { return "mock_cat" } +func (f *MockCat) GetMode() string { return "cat" } func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error { for i := 0; i < 10; i++ { evt := types.Event{} @@ -351,8 +347,10 @@ func (f *MockTail) Configure(cfg []byte, logger *log.Entry) error { } return nil } -func (f *MockTail) GetName() string { return "mock_tail" } -func (f *MockTail) GetMode() string { return "tail" } + +func (f *MockTail) UnmarshalConfig(cfg []byte) error { return nil } +func (f *MockTail) GetName() string { return "mock_tail" } +func (f *MockTail) GetMode() string { return "tail" } func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error { return fmt.Errorf("can't run in cat mode") } @@ -482,6 +480,7 @@ type MockSourceByDSN struct { logger *log.Entry //nolint: unused } +func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil } func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry) error { return nil } func (f *MockSourceByDSN) GetMode() string { return f.Mode } func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil } @@ -524,14 +523,7 @@ func TestConfigureByDSN(t *testing.T) { } if GetDataSourceIface("mockdsn") == nil { - mock := struct { - name string - iface func() DataSource - }{ - name: "mockdsn", - iface: func() DataSource { return &MockSourceByDSN{} }, - } - AcquisitionSources = append(AcquisitionSources, mock) + AcquisitionSources["mockdsn"] = func() DataSource { return &MockSourceByDSN{} } } for _, tc := range tests { diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch.go b/pkg/acquisition/modules/cloudwatch/cloudwatch.go index 9a3a66d5e..862285b80 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch.go @@ -12,17 +12,17 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - "github.com/crowdsecurity/crowdsec/pkg/parser" - "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" - "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/parser" + "github.com/crowdsecurity/crowdsec/pkg/types" ) var openedStreams = prometheus.NewGaugeVec( @@ -101,61 +101,81 @@ var ( def_AwsConfigDir = "" ) -func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error { - cwConfig := CloudwatchSourceConfiguration{} - targetStream := "*" - if err := yaml.UnmarshalStrict(cfg, &cwConfig); err != nil { - return errors.Wrap(err, "Cannot parse CloudwatchSource configuration") +func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error { + cw.Config = CloudwatchSourceConfiguration{} + if err := yaml.UnmarshalStrict(yamlConfig, &cw.Config); err != nil { + return fmt.Errorf("cannot parse CloudwatchSource configuration: %w", err) } - cw.Config = cwConfig + if len(cw.Config.GroupName) == 0 { return fmt.Errorf("group_name is mandatory for CloudwatchSource") } - cw.logger = logger.WithField("group", cw.Config.GroupName) + if cw.Config.Mode == "" { cw.Config.Mode = configuration.TAIL_MODE } - logger.Debugf("Starting configuration for Cloudwatch group %s", cw.Config.GroupName) if cw.Config.DescribeLogStreamsLimit == nil { cw.Config.DescribeLogStreamsLimit = &def_DescribeLogStreamsLimit } - logger.Tracef("describelogstreams_limit set to %d", *cw.Config.DescribeLogStreamsLimit) + if cw.Config.PollNewStreamInterval == nil { cw.Config.PollNewStreamInterval = &def_PollNewStreamInterval } - logger.Tracef("poll_new_stream_interval set to %v", *cw.Config.PollNewStreamInterval) + if cw.Config.MaxStreamAge == nil { cw.Config.MaxStreamAge = &def_MaxStreamAge } - logger.Tracef("max_stream_age set to %v", *cw.Config.MaxStreamAge) + if cw.Config.PollStreamInterval == nil { cw.Config.PollStreamInterval = &def_PollStreamInterval } - logger.Tracef("poll_stream_interval set to %v", *cw.Config.PollStreamInterval) + if cw.Config.StreamReadTimeout == nil { cw.Config.StreamReadTimeout = &def_StreamReadTimeout } - logger.Tracef("stream_read_timeout set to %v", *cw.Config.StreamReadTimeout) + if cw.Config.GetLogEventsPagesLimit == nil { cw.Config.GetLogEventsPagesLimit = &def_GetLogEventsPagesLimit } - logger.Tracef("getlogeventspages_limit set to %v", *cw.Config.GetLogEventsPagesLimit) + if cw.Config.AwsApiCallTimeout == nil { cw.Config.AwsApiCallTimeout = &def_AwsApiCallTimeout } - logger.Tracef("aws_api_timeout set to %v", *cw.Config.AwsApiCallTimeout) - if *cw.Config.MaxStreamAge > *cw.Config.StreamReadTimeout { - logger.Warningf("max_stream_age > stream_read_timeout, stream might keep being opened/closed") - } + if cw.Config.AwsConfigDir == nil { cw.Config.AwsConfigDir = &def_AwsConfigDir } - logger.Tracef("aws_config_dir set to %s", *cw.Config.AwsConfigDir) + + return nil +} + +func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry) error { + err := cw.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + + cw.logger = logger.WithField("group", cw.Config.GroupName) + + cw.logger.Debugf("Starting configuration for Cloudwatch group %s", cw.Config.GroupName) + cw.logger.Tracef("describelogstreams_limit set to %d", *cw.Config.DescribeLogStreamsLimit) + cw.logger.Tracef("poll_new_stream_interval set to %v", *cw.Config.PollNewStreamInterval) + cw.logger.Tracef("max_stream_age set to %v", *cw.Config.MaxStreamAge) + cw.logger.Tracef("poll_stream_interval set to %v", *cw.Config.PollStreamInterval) + cw.logger.Tracef("stream_read_timeout set to %v", *cw.Config.StreamReadTimeout) + cw.logger.Tracef("getlogeventspages_limit set to %v", *cw.Config.GetLogEventsPagesLimit) + cw.logger.Tracef("aws_api_timeout set to %v", *cw.Config.AwsApiCallTimeout) + + if *cw.Config.MaxStreamAge > *cw.Config.StreamReadTimeout { + cw.logger.Warningf("max_stream_age > stream_read_timeout, stream might keep being opened/closed") + } + cw.logger.Tracef("aws_config_dir set to %s", *cw.Config.AwsConfigDir) + if *cw.Config.AwsConfigDir != "" { _, err := os.Stat(*cw.Config.AwsConfigDir) if err != nil { - logger.Errorf("can't read aws_config_dir '%s' got err %s", *cw.Config.AwsConfigDir, err) + cw.logger.Errorf("can't read aws_config_dir '%s' got err %s", *cw.Config.AwsConfigDir, err) return fmt.Errorf("can't read aws_config_dir %s got err %s ", *cw.Config.AwsConfigDir, err) } os.Setenv("AWS_SDK_LOAD_CONFIG", "1") @@ -164,7 +184,7 @@ func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error { os.Setenv("AWS_SHARED_CREDENTIALS_FILE", fmt.Sprintf("%s/credentials", *cw.Config.AwsConfigDir)) } else { if cw.Config.AwsRegion == nil { - logger.Errorf("aws_region is not specified, specify it or aws_config_dir") + cw.logger.Errorf("aws_region is not specified, specify it or aws_config_dir") return fmt.Errorf("aws_region is not specified, specify it or aws_config_dir") } os.Setenv("AWS_REGION", *cw.Config.AwsRegion) @@ -174,6 +194,8 @@ func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error { return err } cw.streamIndexes = make(map[string]string) + + targetStream := "*" if cw.Config.StreamRegexp != nil { if _, err := regexp.Compile(*cw.Config.StreamRegexp); err != nil { return errors.Wrapf(err, "error while compiling regexp '%s'", *cw.Config.StreamRegexp) @@ -183,7 +205,7 @@ func (cw *CloudwatchSource) Configure(cfg []byte, logger *log.Entry) error { targetStream = *cw.Config.StreamName } - logger.Infof("Adding cloudwatch group '%s' (stream:%s) to datasources", cw.Config.GroupName, targetStream) + cw.logger.Infof("Adding cloudwatch group '%s' (stream:%s) to datasources", cw.Config.GroupName, targetStream) return nil } diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 117eadda2..db9e4ca9e 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -10,18 +10,19 @@ import ( "strings" "time" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - "github.com/crowdsecurity/crowdsec/pkg/types" - "github.com/crowdsecurity/dlog" dockerTypes "github.com/docker/docker/api/types" "github.com/docker/docker/client" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/dlog" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" ) var linesRead = prometheus.NewCounterVec( @@ -67,24 +68,22 @@ type ContainerConfig struct { Tty bool } -func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error { - var err error - +func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { d.Config = DockerConfiguration{ FollowStdout: true, // default FollowStdErr: true, // default CheckInterval: "1s", // default } - d.logger = logger - d.runningContainerState = make(map[string]*ContainerConfig) - - err = yaml.UnmarshalStrict(Config, &d.Config) + err := yaml.UnmarshalStrict(yamlConfig, &d.Config) if err != nil { return errors.Wrap(err, "Cannot parse DockerAcquisition configuration") } - d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config) + if d.logger != nil { + d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config) + } + if len(d.Config.ContainerName) == 0 && len(d.Config.ContainerID) == 0 && len(d.Config.ContainerIDRegexp) == 0 && len(d.Config.ContainerNameRegexp) == 0 { return fmt.Errorf("no containers names or containers ID configuration provided") } @@ -100,7 +99,6 @@ func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error { if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE { return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode) } - d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config) for _, cont := range d.Config.ContainerNameRegexp { d.compiledContainerName = append(d.compiledContainerName, regexp.MustCompile(cont)) @@ -110,11 +108,6 @@ func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error { d.compiledContainerID = append(d.compiledContainerID, regexp.MustCompile(cont)) } - dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return err - } - if d.Config.Since == "" { d.Config.Since = time.Now().UTC().Format(time.RFC3339) } @@ -130,17 +123,37 @@ func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error { d.containerLogsOptions.Until = d.Config.Until } + return nil +} + +func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry) error { + d.logger = logger + + err := d.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + + d.runningContainerState = make(map[string]*ContainerConfig) + + d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config) + + dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + return err + } + if d.Config.DockerHost != "" { - if err := client.WithHost(d.Config.DockerHost)(dockerClient); err != nil { + err = client.WithHost(d.Config.DockerHost)(dockerClient) + if err != nil { return err } } d.Client = dockerClient _, err = d.Client.Info(context.Background()) - if err != nil { - return errors.Wrapf(err, "failed to configure docker datasource %s", d.Config.DockerHost) + return fmt.Errorf("failed to configure docker datasource %s: %w", d.Config.DockerHost, err) } return nil diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index b1b29c930..a98a0fd36 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -13,9 +13,6 @@ import ( "strings" "time" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/fsnotify/fsnotify" "github.com/nxadm/tail" "github.com/pkg/errors" @@ -23,6 +20,10 @@ import ( log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" ) var linesRead = prometheus.NewCounterVec( @@ -50,41 +51,62 @@ type FileSource struct { exclude_regexps []*regexp.Regexp } -func (f *FileSource) Configure(Config []byte, logger *log.Entry) error { - fileConfig := FileConfiguration{} - f.logger = logger - f.watchedDirectories = make(map[string]bool) - f.tails = make(map[string]bool) - err := yaml.UnmarshalStrict(Config, &fileConfig) +func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { + f.config = FileConfiguration{} + err := yaml.UnmarshalStrict(yamlConfig, &f.config) if err != nil { - return errors.Wrap(err, "Cannot parse FileAcquisition configuration") + return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err) } - f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig) - if len(fileConfig.Filename) != 0 { - fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename) + + if f.logger != nil { + f.logger.Tracef("FileAcquisition configuration: %+v", f.config) } - if len(fileConfig.Filenames) == 0 { + + if len(f.config.Filename) != 0 { + f.config.Filenames = append(f.config.Filenames, f.config.Filename) + } + + if len(f.config.Filenames) == 0 { return fmt.Errorf("no filename or filenames configuration provided") } - f.config = fileConfig + if f.config.Mode == "" { f.config.Mode = configuration.TAIL_MODE } + if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE { return fmt.Errorf("unsupported mode %s for file source", f.config.Mode) } + + for _, exclude := range f.config.ExcludeRegexps { + re, err := regexp.Compile(exclude) + if err != nil { + return fmt.Errorf("could not compile regexp %s: %w", exclude, err) + } + f.exclude_regexps = append(f.exclude_regexps, re) + } + + return nil +} + +func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error { + f.logger = logger + + err := f.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + + f.watchedDirectories = make(map[string]bool) + f.tails = make(map[string]bool) + f.watcher, err = fsnotify.NewWatcher() if err != nil { return errors.Wrapf(err, "Could not create fsnotify watcher") } - for _, exclude := range f.config.ExcludeRegexps { - re, err := regexp.Compile(exclude) - if err != nil { - return errors.Wrapf(err, "Could not compile regexp %s", exclude) - } - f.exclude_regexps = append(f.exclude_regexps, re) - } + f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config) + for _, pattern := range f.config.Filenames { if f.config.ForceInotify { directory := filepath.Dir(pattern) diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index 06653fbe1..aa055d471 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -7,14 +7,15 @@ import ( "testing" "time" - fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file" - "github.com/crowdsecurity/crowdsec/pkg/cstest" - "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/tomb.v2" + + fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file" + "github.com/crowdsecurity/crowdsec/pkg/cstest" + "github.com/crowdsecurity/crowdsec/pkg/types" ) func TestBadConfiguration(t *testing.T) { @@ -42,7 +43,7 @@ func TestBadConfiguration(t *testing.T) { name: "bad exclude regexp", config: `filenames: ["asd.log"] exclude_regexps: ["as[a-$d"]`, - expectedErr: "Could not compile regexp as", + expectedErr: "could not compile regexp as", }, } diff --git a/pkg/acquisition/modules/journalctl/journalctl.go b/pkg/acquisition/modules/journalctl/journalctl.go index 674563753..ed4445718 100644 --- a/pkg/acquisition/modules/journalctl/journalctl.go +++ b/pkg/acquisition/modules/journalctl/journalctl.go @@ -9,15 +9,15 @@ import ( "strings" "time" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" - "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" ) type JournalCtlConfiguration struct { @@ -163,28 +163,41 @@ func (j *JournalCtlSource) GetAggregMetrics() []prometheus.Collector { return []prometheus.Collector{linesRead} } -func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error { - config := JournalCtlConfiguration{} - j.logger = logger - err := yaml.UnmarshalStrict(yamlConfig, &config) +func (j *JournalCtlSource) UnmarshalConfig(yamlConfig []byte) error { + j.config = JournalCtlConfiguration{} + err := yaml.UnmarshalStrict(yamlConfig, &j.config) if err != nil { - return errors.Wrap(err, "Cannot parse JournalCtlSource configuration") + return fmt.Errorf("cannot parse JournalCtlSource configuration: %w", err) } - if config.Mode == "" { - config.Mode = configuration.TAIL_MODE + + if j.config.Mode == "" { + j.config.Mode = configuration.TAIL_MODE } + var args []string - if config.Mode == configuration.TAIL_MODE { + if j.config.Mode == configuration.TAIL_MODE { args = journalctlArgstreaming } else { args = journalctlArgsOneShot } - if len(config.Filters) == 0 { + + if len(j.config.Filters) == 0 { return fmt.Errorf("journalctl_filter is required") } - j.args = append(args, config.Filters...) - j.src = fmt.Sprintf("journalctl-%s", strings.Join(config.Filters, ".")) - j.config = config + j.args = append(args, j.config.Filters...) + j.src = fmt.Sprintf("journalctl-%s", strings.Join(j.config.Filters, ".")) + + return nil +} + +func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error { + j.logger = logger + + err := j.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + return nil } diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index b8b8937e5..f3197f192 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -10,15 +10,16 @@ import ( "strconv" "time" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/segmentio/kafka-go" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" ) var ( @@ -54,35 +55,51 @@ type KafkaSource struct { Reader *kafka.Reader } -func (k *KafkaSource) Configure(Config []byte, logger *log.Entry) error { - var err error - +func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error { k.Config = KafkaConfiguration{} - k.logger = logger - err = yaml.UnmarshalStrict(Config, &k.Config) + + err := yaml.UnmarshalStrict(yamlConfig, &k.Config) if err != nil { - return errors.Wrapf(err, "cannot parse %s datasource configuration", dataSourceName) + return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err) } + if len(k.Config.Brokers) == 0 { return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName) } + if k.Config.Topic == "" { return fmt.Errorf("cannot create a %s reader with am empty topic", dataSourceName) } + if k.Config.Mode == "" { k.Config.Mode = configuration.TAIL_MODE } + + return err +} + +func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error { + k.logger = logger + + err := k.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + dialer, err := k.Config.NewDialer() if err != nil { return errors.Wrapf(err, "cannot create %s dialer", dataSourceName) } + k.Reader, err = k.Config.NewReader(dialer) if err != nil { return errors.Wrapf(err, "cannote create %s reader", dataSourceName) } + if k.Reader == nil { return fmt.Errorf("cannot create %s reader", dataSourceName) } + return nil } diff --git a/pkg/acquisition/modules/kinesis/kinesis.go b/pkg/acquisition/modules/kinesis/kinesis.go index cce5ffbfc..48911b372 100644 --- a/pkg/acquisition/modules/kinesis/kinesis.go +++ b/pkg/acquisition/modules/kinesis/kinesis.go @@ -13,14 +13,15 @@ import ( "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" ) type KinesisConfiguration struct { @@ -113,17 +114,18 @@ func (k *KinesisSource) GetAggregMetrics() []prometheus.Collector { return []prometheus.Collector{linesRead, linesReadShards} } -func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error { - config := KinesisConfiguration{} - k.logger = logger - err := yaml.UnmarshalStrict(yamlConfig, &config) +func (k *KinesisSource) UnmarshalConfig(yamlConfig []byte) error { + k.Config = KinesisConfiguration{} + + err := yaml.UnmarshalStrict(yamlConfig, &k.Config) if err != nil { return errors.Wrap(err, "Cannot parse kinesis datasource configuration") } - if config.Mode == "" { - config.Mode = configuration.TAIL_MODE + + if k.Config.Mode == "" { + k.Config.Mode = configuration.TAIL_MODE } - k.Config = config + if k.Config.StreamName == "" && !k.Config.UseEnhancedFanOut { return fmt.Errorf("stream_name is mandatory when use_enhanced_fanout is false") } @@ -139,10 +141,23 @@ func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error { if k.Config.MaxRetries <= 0 { k.Config.MaxRetries = 10 } + + return nil +} + +func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error { + k.logger = logger + + err := k.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + err = k.newClient() if err != nil { - return errors.Wrap(err, "Cannot create kinesis client") + return fmt.Errorf("cannot create kinesis client: %w", err) } + k.shardReaderTomb = &tomb.Tomb{} return nil } diff --git a/pkg/acquisition/modules/syslog/syslog.go b/pkg/acquisition/modules/syslog/syslog.go index 2cd0083b7..29f1dc574 100644 --- a/pkg/acquisition/modules/syslog/syslog.go +++ b/pkg/acquisition/modules/syslog/syslog.go @@ -6,18 +6,18 @@ import ( "strings" "time" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + "gopkg.in/yaml.v2" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc3164" "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/parser/rfc5424" syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal/server" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/types" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" - - "gopkg.in/tomb.v2" - "gopkg.in/yaml.v2" ) type SyslogConfiguration struct { @@ -89,31 +89,43 @@ func validateAddr(addr string) bool { return net.ParseIP(addr) != nil } -func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error { - s.logger = logger - s.logger.Infof("Starting syslog datasource configuration") - syslogConfig := SyslogConfiguration{} - syslogConfig.Mode = configuration.TAIL_MODE - err := yaml.UnmarshalStrict(yamlConfig, &syslogConfig) +func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error { + s.config = SyslogConfiguration{} + s.config.Mode = configuration.TAIL_MODE + + err := yaml.UnmarshalStrict(yamlConfig, &s.config) if err != nil { return errors.Wrap(err, "Cannot parse syslog configuration") } - if syslogConfig.Addr == "" { - syslogConfig.Addr = "127.0.0.1" //do we want a usable or secure default ? + + if s.config.Addr == "" { + s.config.Addr = "127.0.0.1" //do we want a usable or secure default ? } - if syslogConfig.Port == 0 { - syslogConfig.Port = 514 + if s.config.Port == 0 { + s.config.Port = 514 } - if syslogConfig.MaxMessageLen == 0 { - syslogConfig.MaxMessageLen = 2048 + if s.config.MaxMessageLen == 0 { + s.config.MaxMessageLen = 2048 } - if !validatePort(syslogConfig.Port) { - return fmt.Errorf("invalid port %d", syslogConfig.Port) + if !validatePort(s.config.Port) { + return fmt.Errorf("invalid port %d", s.config.Port) } - if !validateAddr(syslogConfig.Addr) { - return fmt.Errorf("invalid listen IP %s", syslogConfig.Addr) + if !validateAddr(s.config.Addr) { + return fmt.Errorf("invalid listen IP %s", s.config.Addr) } - s.config = syslogConfig + + return nil +} + +func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry) error { + s.logger = logger + s.logger.Infof("Starting syslog datasource configuration") + + err := s.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + return nil } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog.go b/pkg/acquisition/modules/wineventlog/wineventlog.go index 92bbd7be4..6febf7b8a 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog.go @@ -5,15 +5,20 @@ package wineventlogacquisition import ( "errors" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/types" ) type WinEventLogSource struct{} +func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error { + return nil +} + func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error { return nil } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go index 7e7bb5778..c4a87524c 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go @@ -9,9 +9,6 @@ import ( "syscall" "time" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" - leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" - "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/google/winops/winlog" "github.com/google/winops/winlog/wevtapi" "github.com/prometheus/client_golang/prometheus" @@ -19,6 +16,10 @@ import ( "golang.org/x/sys/windows" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" ) type WinEventLogConfiguration struct { @@ -228,29 +229,26 @@ func (w *WinEventLogSource) generateConfig(query string) (*winlog.SubscribeConfi return &config, nil } -func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error { - - config := WinEventLogConfiguration{} - w.logger = logger - err := yaml.UnmarshalStrict(yamlConfig, &config) +func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error { + w.config = WinEventLogConfiguration{} + err := yaml.UnmarshalStrict(yamlConfig, &w.config) if err != nil { return fmt.Errorf("unable to parse configuration: %v", err) } - if config.EventChannel != "" && config.XPathQuery != "" { + if w.config.EventChannel != "" && w.config.XPathQuery != "" { return fmt.Errorf("event_channel and xpath_query are mutually exclusive") } - if config.EventChannel == "" && config.XPathQuery == "" { + if w.config.EventChannel == "" && w.config.XPathQuery == "" { return fmt.Errorf("event_channel or xpath_query must be set") } - config.Mode = configuration.TAIL_MODE - w.config = config + w.config.Mode = configuration.TAIL_MODE - if config.XPathQuery != "" { - w.query = config.XPathQuery + if w.config.XPathQuery != "" { + w.query = w.config.XPathQuery } else { w.query, err = w.buildXpathQuery() if err != nil { @@ -258,15 +256,26 @@ func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) erro } } - w.evtConfig, err = w.generateConfig(w.query) + if w.config.PrettyName != "" { + w.name = w.config.PrettyName + } else { + w.name = w.query + } + + return nil +} + +func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) error { + w.logger = logger + + err := w.UnmarshalConfig(yamlConfig) if err != nil { return err } - if config.PrettyName != "" { - w.name = config.PrettyName - } else { - w.name = w.query + w.evtConfig, err = w.generateConfig(w.query) + if err != nil { + return err } return nil diff --git a/pkg/cstest/utils.go b/pkg/cstest/utils.go index 2c26be89e..fb8300094 100644 --- a/pkg/cstest/utils.go +++ b/pkg/cstest/utils.go @@ -1,7 +1,9 @@ package cstest import ( + "strings" "testing" + "text/template" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,3 +30,20 @@ func RequireErrorContains(t *testing.T, err error, expectedErr string) { require.NoError(t, err) } + +// Interpolate fills a string template with the given values, can be map or struct. +// example: Interpolate("{{.Name}}", map[string]string{"Name": "JohnDoe"}) +func Interpolate(s string, data interface{}) (string, error) { + tmpl, err := template.New("").Parse(s) + if err != nil { + return "", err + } + + var b strings.Builder + err = tmpl.Execute(&b, data) + if err != nil { + return "", err + } + + return b.String(), nil +}