From 61bea2648629f88a98f17d82dc48d70cfe869d2e Mon Sep 17 00:00:00 2001 From: blotus Date: Wed, 29 Mar 2023 16:04:17 +0200 Subject: [PATCH] Add `transform` configuration option for acquisition (#2144) --- cmd/crowdsec/main.go | 8 +- pkg/acquisition/acquisition.go | 119 +++++++++++++++--- pkg/acquisition/acquisition_test.go | 14 ++- .../configuration/configuration.go | 2 + .../modules/cloudwatch/cloudwatch.go | 8 +- .../modules/cloudwatch/cloudwatch_test.go | 4 +- pkg/acquisition/modules/docker/docker.go | 7 +- pkg/acquisition/modules/docker/docker_test.go | 4 +- pkg/acquisition/modules/file/file.go | 51 ++++++-- pkg/acquisition/modules/file/file_test.go | 2 +- .../modules/journalctl/journalctl.go | 7 +- .../modules/journalctl/journalctl_test.go | 2 +- pkg/acquisition/modules/kafka/kafka.go | 6 +- pkg/acquisition/modules/kinesis/kinesis.go | 6 +- .../modules/kubernetesaudit/k8s_audit.go | 6 +- pkg/acquisition/modules/s3/s3.go | 81 +++++++++--- pkg/acquisition/modules/s3/s3_test.go | 5 +- pkg/acquisition/modules/syslog/syslog.go | 6 +- .../modules/wineventlog/wineventlog.go | 6 +- .../wineventlog/wineventlog_windows.go | 6 +- 20 files changed, 281 insertions(+), 69 deletions(-) diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index d7894012b..6a2d1097d 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -69,6 +69,7 @@ type Flags struct { DisableAPI bool WinSvc string DisableCAPI bool + Transform string } type labelsMap map[string]string @@ -107,7 +108,7 @@ func LoadAcquisition(cConfig *csconfig.Config) error { flags.Labels = labels flags.Labels["type"] = flags.SingleFileType - dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels) + dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform) if err != nil { return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN) } @@ -149,6 +150,7 @@ func (f *Flags) Parse() { flag.BoolVar(&f.ErrorLevel, "error", false, "print error-level on stderr") flag.BoolVar(&f.PrintVersion, "version", false, "display version") flag.StringVar(&f.OneShotDSN, "dsn", "", "Process a single data source in time-machine") + flag.StringVar(&f.Transform, "transform", "", "expr to apply on the event after acquisition") flag.StringVar(&f.SingleFileType, "type", "", "Labels.type for file in time-machine") flag.Var(&labels, "label", "Additional Labels for file in time-machine") flag.BoolVar(&f.TestMode, "t", false, "only test configs") @@ -257,6 +259,10 @@ func LoadConfig(cConfig *csconfig.Config) error { return errors.New("-dsn requires a -type argument") } + if flags.Transform != "" && flags.OneShotDSN == "" { + return errors.New("-transform requires a -dsn argument") + } + if flags.SingleFileType != "" && flags.OneShotDSN == "" { return errors.New("-type requires a -dsn argument") } diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 5ffa2d441..315541154 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -7,6 +7,9 @@ import ( "os" "strings" + "github.com/antonmedv/expr" + "github.com/antonmedv/expr/vm" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" tomb "gopkg.in/tomb.v2" @@ -23,6 +26,7 @@ import ( s3acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/s3" syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog" wineventlogacquisition 
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/wineventlog" + "github.com/crowdsecurity/crowdsec/pkg/exprhelpers" "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/types" @@ -30,16 +34,17 @@ import ( // The interface each datasource must implement type DataSource interface { - GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module - GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality) - UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime - Configure([]byte, *log.Entry) error // Complete the YAML datasource configuration and perform runtime checks. - ConfigureByDSN(string, map[string]string, *log.Entry) error // Configure the datasource - GetMode() string // Get the mode (TAIL, CAT or SERVER) - GetName() string // Get the name of the module - OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file) - StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file) - CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense) + GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module + GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality) + UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime + Configure([]byte, *log.Entry) error // Complete the YAML datasource configuration and perform runtime checks. + ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource + GetMode() string // Get the mode (TAIL, CAT or SERVER) + GetName() string // Get the name of the module + OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file) + StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file) + CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense) + GetUuid() string // Get the unique identifier of the datasource Dump() interface{} } @@ -56,6 +61,8 @@ var AcquisitionSources = map[string]func() DataSource{ "s3": func() DataSource { return &s3acquisition.S3Source{} }, } +var transformRuntimes = map[string]*vm.Program{} + func GetDataSourceIface(dataSourceType string) DataSource { source := AcquisitionSources[dataSourceType] if source == nil { @@ -115,7 +122,7 @@ func detectBackwardCompatAcquis(sub configuration.DataSourceCommonCfg) string { return "" } -func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource, error) { +func LoadAcquisitionFromDSN(dsn string, labels map[string]string, transformExpr string) ([]DataSource, error) { var sources []DataSource frags := strings.Split(dsn, ":") @@ -134,7 +141,15 @@ func LoadAcquisitionFromDSN(dsn string, labels map[string]string) ([]DataSource, subLogger := clog.WithFields(log.Fields{ "type": dsn, }) - err := dataSrc.ConfigureByDSN(dsn, labels, subLogger) + uniqueId := uuid.NewString() + if transformExpr != "" { + vm, err := expr.Compile(transformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...) 
+ if err != nil { + return nil, fmt.Errorf("while compiling transform expression '%s': %w", transformExpr, err) + } + transformRuntimes[uniqueId] = vm + } + err := dataSrc.ConfigureByDSN(dsn, labels, subLogger, uniqueId) if err != nil { return nil, fmt.Errorf("while configuration datasource for %s: %w", dsn, err) } @@ -185,10 +200,19 @@ func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg) ([]DataSource, if GetDataSourceIface(sub.Source) == nil { return nil, fmt.Errorf("unknown data source %s in %s (position: %d)", sub.Source, acquisFile, idx) } + uniqueId := uuid.NewString() + sub.UniqueId = uniqueId src, err := DataSourceConfigure(sub) if err != nil { return nil, fmt.Errorf("while configuring datasource of type %s from %s (position: %d): %w", sub.Source, acquisFile, idx, err) } + if sub.TransformExpr != "" { + vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...) + if err != nil { + return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position: %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err) + } + transformRuntimes[uniqueId] = vm + } sources = append(sources, *src) idx += 1 } @@ -212,11 +236,60 @@ func GetMetrics(sources []DataSource, aggregated bool) error { // ignore the error } } - } return nil } +func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) { + defer types.CatchPanic("crowdsec/acquis") + logger.Infof("transformer started") + for { + select { + case <-AcquisTomb.Dying(): + logger.Debugf("transformer is dying") + return + case evt := <-transformChan: + logger.Tracef("Received event %s", evt.Line.Raw) + out, err := expr.Run(transformRuntime, map[string]interface{}{"evt": &evt}) + if err != nil { + logger.Errorf("while running transform expression: %s, sending event as-is", err) + output <- evt + } + if out == nil { + logger.Errorf("transform expression returned nil, sending event as-is") + output <- evt + } + switch v := out.(type) { + case string: + logger.Tracef("transform expression returned %s", v) + evt.Line.Raw = v + output <- evt + case []interface{}: + logger.Tracef("transform expression returned %v", v) //nolint:asasalint // We actually want to log the slice content + for _, line := range v { + l, ok := line.(string) + if !ok { + logger.Errorf("transform expression returned []interface{}, but cannot assert an element to string") + output <- evt + continue + } + evt.Line.Raw = l + output <- evt + } + case []string: + logger.Tracef("transform expression returned %v", v) + for _, line := range v { + evt.Line.Raw = line + output <- evt + } + default: + logger.Errorf("transform expression returned an invalid type %T, sending event as-is", out) + output <- evt + } + } + } +} + func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error { for i := 0; i < len(sources); i++ { subsrc := sources[i] //ensure its a copy @@ -225,10 +298,26 @@ func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb AcquisTomb.Go(func() error { defer types.CatchPanic("crowdsec/acquis") var err error + + outChan := output + log.Debugf("datasource %s UUID: %s", subsrc.GetName(), subsrc.GetUuid()) + if transformRuntime, ok := transformRuntimes[subsrc.GetUuid()]; ok { + log.Infof("transform expression found for datasource %s", subsrc.GetName()) + transformChan := make(chan types.Event) + outChan = transformChan + 
transformLogger := log.WithFields(log.Fields{ + "component": "transform", + "datasource": subsrc.GetName(), + }) + AcquisTomb.Go(func() error { + transform(outChan, output, AcquisTomb, transformRuntime, transformLogger) + return nil + }) + } if subsrc.GetMode() == configuration.TAIL_MODE { - err = subsrc.StreamingAcquisition(output, AcquisTomb) + err = subsrc.StreamingAcquisition(outChan, AcquisTomb) } else { - err = subsrc.OneShotAcquisition(output, AcquisTomb) + err = subsrc.OneShotAcquisition(outChan, AcquisTomb) } if err != nil { //if one of the acqusition returns an error, we kill the others to properly shutdown diff --git a/pkg/acquisition/acquisition_test.go b/pkg/acquisition/acquisition_test.go index 8556d358a..fc057fc6d 100644 --- a/pkg/acquisition/acquisition_test.go +++ b/pkg/acquisition/acquisition_test.go @@ -58,9 +58,10 @@ func (f *MockSource) GetMetrics() []prometheus.Collector { func (f *MockSource) GetAggregMetrics() []prometheus.Collector { return nil } func (f *MockSource) Dump() interface{} { return f } func (f *MockSource) GetName() string { return "mock" } -func (f *MockSource) ConfigureByDSN(string, map[string]string, *log.Entry) error { +func (f *MockSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error { return fmt.Errorf("not supported") } +func (f *MockSource) GetUuid() string { return "" } // copy the mocksource, but this one can't run type MockSourceCantRun struct { @@ -326,9 +327,10 @@ func (f *MockCat) CanRun() error { return nil } func (f *MockCat) GetMetrics() []prometheus.Collector { return nil } func (f *MockCat) GetAggregMetrics() []prometheus.Collector { return nil } func (f *MockCat) Dump() interface{} { return f } -func (f *MockCat) ConfigureByDSN(string, map[string]string, *log.Entry) error { +func (f *MockCat) ConfigureByDSN(string, map[string]string, *log.Entry, string) error { return fmt.Errorf("not supported") } +func (f *MockCat) GetUuid() string { return "" } //---- @@ -367,9 +369,10 @@ func (f *MockTail) CanRun() error { return nil } func (f *MockTail) GetMetrics() []prometheus.Collector { return nil } func (f *MockTail) GetAggregMetrics() []prometheus.Collector { return nil } func (f *MockTail) Dump() interface{} { return f } -func (f *MockTail) ConfigureByDSN(string, map[string]string, *log.Entry) error { +func (f *MockTail) ConfigureByDSN(string, map[string]string, *log.Entry, string) error { return fmt.Errorf("not supported") } +func (f *MockTail) GetUuid() string { return "" } //func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error { @@ -490,13 +493,14 @@ func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector { return nil } func (f *MockSourceByDSN) Dump() interface{} { return f } func (f *MockSourceByDSN) GetName() string { return "mockdsn" } -func (f *MockSourceByDSN) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (f *MockSourceByDSN) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { dsn = strings.TrimPrefix(dsn, "mockdsn://") if dsn != "test_expect" { return fmt.Errorf("unexpected value") } return nil } +func (f *MockSourceByDSN) GetUuid() string { return "" } func TestConfigureByDSN(t *testing.T) { tests := []struct { @@ -529,7 +533,7 @@ func TestConfigureByDSN(t *testing.T) { for _, tc := range tests { tc := tc t.Run(tc.dsn, func(t *testing.T) { - srcs, err := LoadAcquisitionFromDSN(tc.dsn, 
map[string]string{"type": "test_label"}) + srcs, err := LoadAcquisitionFromDSN(tc.dsn, map[string]string{"type": "test_label"}, "") cstest.RequireErrorContains(t, err, tc.ExpectedError) assert.Len(t, srcs, tc.ExpectedResLen) diff --git a/pkg/acquisition/configuration/configuration.go b/pkg/acquisition/configuration/configuration.go index 41d31ef25..5ec1a4ac4 100644 --- a/pkg/acquisition/configuration/configuration.go +++ b/pkg/acquisition/configuration/configuration.go @@ -11,6 +11,8 @@ type DataSourceCommonCfg struct { Source string `yaml:"source,omitempty"` Name string `yaml:"name,omitempty"` UseTimeMachine bool `yaml:"use_time_machine,omitempty"` + UniqueId string `yaml:"unique_id,omitempty"` + TransformExpr string `yaml:"transform,omitempty"` Config map[string]interface{} `yaml:",inline"` //to keep the datasource-specific configuration directives } diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch.go b/pkg/acquisition/modules/cloudwatch/cloudwatch.go index 0c5798615..1abf04c5b 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch.go @@ -100,6 +100,10 @@ var ( def_AwsConfigDir = "" ) +func (cw *CloudwatchSource) GetUuid() string { + return cw.Config.UniqueId +} + func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error { cw.Config = CloudwatchSourceConfiguration{} if err := yaml.UnmarshalStrict(yamlConfig, &cw.Config); err != nil { @@ -509,7 +513,7 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan } } -func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { cw.logger = logger dsn = strings.TrimPrefix(dsn, cw.GetName()+"://") @@ -524,6 +528,8 @@ func (cw *CloudwatchSource) ConfigureByDSN(dsn string, labels map[string]string, cw.Config.GroupName = frags[0] cw.Config.StreamName = &frags[1] cw.Config.Labels = labels + cw.Config.UniqueId = uuid + u, err := url.ParseQuery(args[1]) if err != nil { return errors.Wrapf(err, "while parsing %s", dsn) diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go index d922909a4..b9a739462 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go @@ -623,7 +623,7 @@ func TestConfigureByDSN(t *testing.T) { dbgLogger := log.New().WithField("test", tc.name) dbgLogger.Logger.SetLevel(log.DebugLevel) cw := CloudwatchSource{} - err := cw.ConfigureByDSN(tc.dsn, tc.labels, dbgLogger) + err := cw.ConfigureByDSN(tc.dsn, tc.labels, dbgLogger, "") cstest.RequireErrorContains(t, err, tc.expectedCfgErr) }) } @@ -746,7 +746,7 @@ func TestOneShotAcquisition(t *testing.T) { dbgLogger.Logger.SetLevel(log.DebugLevel) dbgLogger.Infof("starting test") cw := CloudwatchSource{} - err := cw.ConfigureByDSN(tc.dsn, map[string]string{"type": "test"}, dbgLogger) + err := cw.ConfigureByDSN(tc.dsn, map[string]string{"type": "test"}, dbgLogger, "") cstest.RequireErrorContains(t, err, tc.expectedCfgErr) if tc.expectedCfgErr != "" { return diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 276d70f4c..61b450093 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -67,6 +67,10 @@ type ContainerConfig struct { Tty bool } +func (d *DockerSource) GetUuid() string { + return 
d.Config.UniqueId +} + func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { d.Config = DockerConfiguration{ FollowStdout: true, // default @@ -158,7 +162,7 @@ func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry) error { return nil } -func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { var err error if !strings.HasPrefix(dsn, d.GetName()+"://") { @@ -170,6 +174,7 @@ func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logg FollowStdErr: true, CheckInterval: "1s", } + d.Config.UniqueId = uuid d.Config.ContainerName = make([]string, 0) d.Config.ContainerID = make([]string, 0) d.runningContainerState = make(map[string]*ContainerConfig) diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index d019da314..f0a16b801 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -107,7 +107,7 @@ func TestConfigureDSN(t *testing.T) { }) for _, test := range tests { f := DockerSource{} - err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger) + err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "") cstest.AssertErrorContains(t, err, test.expectedErr) } } @@ -303,7 +303,7 @@ func TestOneShot(t *testing.T) { labels := make(map[string]string) labels["type"] = ts.logType - if err := dockerClient.ConfigureByDSN(ts.dsn, labels, subLogger); err != nil { + if err := dockerClient.ConfigureByDSN(ts.dsn, labels, subLogger, ""); err != nil { t.Fatalf("unable to configure dsn '%s': %s", ts.dsn, err) } dockerClient.Client = new(mockDockerCli) diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index d3e631fe2..56c51f3cb 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -10,6 +10,7 @@ import ( "path" "path/filepath" "regexp" + "strconv" "strings" "time" @@ -37,6 +38,7 @@ type FileConfiguration struct { ExcludeRegexps []string `yaml:"exclude_regexps"` Filename string ForceInotify bool `yaml:"force_inotify"` + MaxBufferSize int `yaml:"max_buffer_size"` configuration.DataSourceCommonCfg `yaml:",inline"` } @@ -50,6 +52,10 @@ type FileSource struct { exclude_regexps []*regexp.Regexp } +func (f *FileSource) GetUuid() string { + return f.config.UniqueId +} + func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { f.config = FileConfiguration{} err := yaml.UnmarshalStrict(yamlConfig, &f.config) @@ -163,12 +169,13 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error { return nil } -func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { if !strings.HasPrefix(dsn, "file://") { return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn) } f.logger = logger + f.config = FileConfiguration{} dsn = strings.TrimPrefix(dsn, "file://") @@ -184,23 +191,34 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger return errors.Wrap(err, "could not parse file args") } for key, value := range params { - if key != "log_level" { - return fmt.Errorf("unsupported key %s in file DSN", key) + switch key { + case "log_level": + if len(value) 
!= 1 { + return errors.New("expected zero or one value for 'log_level'") + } + lvl, err := log.ParseLevel(value[0]) + if err != nil { + return errors.Wrapf(err, "unknown level %s", value[0]) + } + f.logger.Logger.SetLevel(lvl) + case "max_buffer_size": + if len(value) != 1 { + return errors.New("expected zero or one value for 'max_buffer_size'") + } + maxBufferSize, err := strconv.Atoi(value[0]) + if err != nil { + return errors.Wrapf(err, "could not parse max_buffer_size %s", value[0]) + } + f.config.MaxBufferSize = maxBufferSize + default: + return fmt.Errorf("unknown parameter %s", key) } - if len(value) != 1 { - return errors.New("expected zero or one value for 'log_level'") - } - lvl, err := log.ParseLevel(value[0]) - if err != nil { - return errors.Wrapf(err, "unknown level %s", value[0]) - } - f.logger.Logger.SetLevel(lvl) } } - f.config = FileConfiguration{} f.config.Labels = labels f.config.Mode = configuration.CAT_MODE + f.config.UniqueId = uuid f.logger.Debugf("Will try pattern %s", args[0]) files, err := filepath.Glob(args[0]) @@ -491,6 +509,10 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom scanner = bufio.NewScanner(fd) } scanner.Split(bufio.ScanLines) + if f.config.MaxBufferSize > 0 { + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, f.config.MaxBufferSize) + } for scanner.Scan() { if scanner.Text() == "" { continue @@ -509,6 +531,11 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom //we're reading logs at once, it must be time-machine buckets out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} } + if err := scanner.Err(); err != nil { + logger.Errorf("Error while reading file: %s", err) + t.Kill(err) + return err + } t.Kill(nil) return nil } diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index aa055d471..d9ce82312 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -97,7 +97,7 @@ func TestConfigureDSN(t *testing.T) { tc := tc t.Run(tc.dsn, func(t *testing.T) { f := fileacquisition.FileSource{} - err := f.ConfigureByDSN(tc.dsn, map[string]string{"type": "testtype"}, subLogger) + err := f.ConfigureByDSN(tc.dsn, map[string]string{"type": "testtype"}, subLogger, "") cstest.RequireErrorContains(t, err, tc.expectedErr) }) } diff --git a/pkg/acquisition/modules/journalctl/journalctl.go b/pkg/acquisition/modules/journalctl/journalctl.go index 7bdc3e7bb..9858381e6 100644 --- a/pkg/acquisition/modules/journalctl/journalctl.go +++ b/pkg/acquisition/modules/journalctl/journalctl.go @@ -154,6 +154,10 @@ func (j *JournalCtlSource) runJournalCtl(out chan types.Event, t *tomb.Tomb) err } } +func (j *JournalCtlSource) GetUuid() string { + return j.config.UniqueId +} + func (j *JournalCtlSource) GetMetrics() []prometheus.Collector { return []prometheus.Collector{linesRead} } @@ -200,11 +204,12 @@ func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry) error return nil } -func (j *JournalCtlSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (j *JournalCtlSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { j.logger = logger j.config = JournalCtlConfiguration{} j.config.Mode = configuration.CAT_MODE j.config.Labels = labels + j.config.UniqueId = uuid //format for the DSN is : journalctl://filters=FILTER1&filters=FILTER2 if !strings.HasPrefix(dsn, "journalctl://") { 
diff --git a/pkg/acquisition/modules/journalctl/journalctl_test.go b/pkg/acquisition/modules/journalctl/journalctl_test.go index 04280943a..e853f4b50 100644 --- a/pkg/acquisition/modules/journalctl/journalctl_test.go +++ b/pkg/acquisition/modules/journalctl/journalctl_test.go @@ -96,7 +96,7 @@ func TestConfigureDSN(t *testing.T) { }) for _, test := range tests { f := JournalCtlSource{} - err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger) + err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "") cstest.AssertErrorContains(t, err, test.expectedErr) } } diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index 7372c9e96..af9251163 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -54,6 +54,10 @@ type KafkaSource struct { Reader *kafka.Reader } +func (k *KafkaSource) GetUuid() string { + return k.Config.UniqueId +} + func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error { k.Config = KafkaConfiguration{} @@ -102,7 +106,7 @@ func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error { return nil } -func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry) error { +func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error { return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName) } diff --git a/pkg/acquisition/modules/kinesis/kinesis.go b/pkg/acquisition/modules/kinesis/kinesis.go index f62cb5c20..7700975b9 100644 --- a/pkg/acquisition/modules/kinesis/kinesis.go +++ b/pkg/acquisition/modules/kinesis/kinesis.go @@ -74,6 +74,10 @@ var linesReadShards = prometheus.NewCounterVec( []string{"stream", "shard"}, ) +func (k *KinesisSource) GetUuid() string { + return k.Config.UniqueId +} + func (k *KinesisSource) newClient() error { var sess *session.Session @@ -161,7 +165,7 @@ func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry) error { return nil } -func (k *KinesisSource) ConfigureByDSN(string, map[string]string, *log.Entry) error { +func (k *KinesisSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error { return fmt.Errorf("kinesis datasource does not support command-line acquisition") } diff --git a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go index 5281aefd1..e189c413a 100644 --- a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go +++ b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go @@ -48,6 +48,10 @@ var requestCount = prometheus.NewCounterVec( }, []string{"source"}) +func (ka *KubernetesAuditSource) GetUuid() string { + return ka.config.UniqueId +} + func (ka *KubernetesAuditSource) GetMetrics() []prometheus.Collector { return []prometheus.Collector{eventCount, requestCount} } @@ -105,7 +109,7 @@ func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry) err return nil } -func (ka *KubernetesAuditSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (ka *KubernetesAuditSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { return fmt.Errorf("k8s-audit datasource does not support command-line acquisition") } diff --git a/pkg/acquisition/modules/s3/s3.go b/pkg/acquisition/modules/s3/s3.go index d30bc84b6..1428138f7 100644 --- a/pkg/acquisition/modules/s3/s3.go +++ b/pkg/acquisition/modules/s3/s3.go @@ -2,12 +2,15 @@ package 
s3acquisition import ( "bufio" + "bytes" "compress/gzip" "context" "encoding/json" "fmt" + "io" "net/url" "sort" + "strconv" "strings" "time" @@ -22,7 +25,6 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" @@ -40,6 +42,7 @@ type S3Configuration struct { PollingInterval int `yaml:"polling_interval"` SQSName string `yaml:"sqs_name"` SQSFormat string `yaml:"sqs_format"` + MaxBufferSize int `yaml:"max_buffer_size"` } type S3Source struct { @@ -138,10 +141,12 @@ func (s *S3Source) newS3Client() error { if s.Config.AwsEndpoint != "" { config = config.WithEndpoint(s.Config.AwsEndpoint) } + s.s3Client = s3.New(sess, config) if s.s3Client == nil { return fmt.Errorf("failed to create S3 client") } + return nil } @@ -372,7 +377,7 @@ func (s *S3Source) readFile(bucket string, key string) error { //TODO: Handle SSE-C var scanner *bufio.Scanner - logger := s.logger.WithFields(logrus.Fields{ + logger := s.logger.WithFields(log.Fields{ "method": "readFile", "bucket": bucket, "key": key, @@ -382,20 +387,36 @@ func (s *S3Source) readFile(bucket string, key string) error { Bucket: aws.String(bucket), Key: aws.String(key), }) + if err != nil { return fmt.Errorf("failed to get object %s/%s: %w", bucket, key, err) } defer output.Body.Close() + if strings.HasSuffix(key, ".gz") { - gzReader, err := gzip.NewReader(output.Body) + //This *might* be a gzipped file, but sometimes the SDK will decompress the data for us (it's not clear when it happens, only had the issue with cloudtrail logs) + header := make([]byte, 2) + _, err := output.Body.Read(header) if err != nil { - return fmt.Errorf("failed to read gzip object %s/%s: %w", bucket, key, err) + return fmt.Errorf("failed to read header of object %s/%s: %w", bucket, key, err) + } + if header[0] == 0x1f && header[1] == 0x8b { + gz, err := gzip.NewReader(io.MultiReader(bytes.NewReader(header), output.Body)) + if err != nil { + return fmt.Errorf("failed to create gzip reader for object %s/%s: %w", bucket, key, err) + } + scanner = bufio.NewScanner(gz) + } else { + scanner = bufio.NewScanner(io.MultiReader(bytes.NewReader(header), output.Body)) } - defer gzReader.Close() - scanner = bufio.NewScanner(gzReader) } else { scanner = bufio.NewScanner(output.Body) } + if s.Config.MaxBufferSize > 0 { + s.logger.Infof("Setting max buffer size to %d", s.Config.MaxBufferSize) + buf := make([]byte, 0, bufio.MaxScanTokenSize) + scanner.Buffer(buf, s.Config.MaxBufferSize) + } for scanner.Scan() { text := scanner.Text() logger.Tracef("Read line %s", text) @@ -422,6 +443,10 @@ func (s *S3Source) readFile(bucket string, key string) error { return nil } +func (s *S3Source) GetUuid() string { + return s.Config.UniqueId +} + func (s *S3Source) GetMetrics() []prometheus.Collector { return []prometheus.Collector{linesRead, objectsRead, sqsMessagesReceived} } @@ -446,6 +471,10 @@ func (s *S3Source) UnmarshalConfig(yamlConfig []byte) error { s.Config.PollingInterval = 60 } + if s.Config.MaxBufferSize == 0 { + s.Config.MaxBufferSize = bufio.MaxScanTokenSize + } + if s.Config.PollingMethod != PollMethodList && s.Config.PollingMethod != PollMethodSQS { return fmt.Errorf("invalid polling method %s", s.Config.PollingMethod) } @@ -509,16 +538,16 @@ func (s *S3Source) Configure(yamlConfig []byte, logger *log.Entry) error { return nil } -func (s *S3Source) ConfigureByDSN(dsn string, labels map[string]string, logger 
*log.Entry) error { +func (s *S3Source) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { if !strings.HasPrefix(dsn, "s3://") { return fmt.Errorf("invalid DSN %s for S3 source, must start with s3://", dsn) } + s.Config = S3Configuration{} s.logger = logger.WithFields(log.Fields{ "bucket": s.Config.BucketName, "prefix": s.Config.Prefix, }) - dsn = strings.TrimPrefix(dsn, "s3://") args := strings.Split(dsn, "?") if len(args[0]) == 0 { @@ -531,23 +560,35 @@ func (s *S3Source) ConfigureByDSN(dsn string, labels map[string]string, logger * return errors.Wrap(err, "could not parse s3 args") } for key, value := range params { - if key != "log_level" { - return fmt.Errorf("unsupported key %s in s3 DSN", key) + switch key { + case "log_level": + if len(value) != 1 { + return errors.New("expected zero or one value for 'log_level'") + } + lvl, err := log.ParseLevel(value[0]) + if err != nil { + return errors.Wrapf(err, "unknown level %s", value[0]) + } + s.logger.Logger.SetLevel(lvl) + case "max_buffer_size": + if len(value) != 1 { + return errors.New("expected zero or one value for 'max_buffer_size'") + } + maxBufferSize, err := strconv.Atoi(value[0]) + if err != nil { + return errors.Wrapf(err, "invalid value for 'max_buffer_size'") + } + s.logger.Debugf("Setting max buffer size to %d", maxBufferSize) + s.Config.MaxBufferSize = maxBufferSize + default: + return fmt.Errorf("unknown parameter %s", key) } - if len(value) != 1 { - return errors.New("expected zero or one value for 'log_level'") - } - lvl, err := log.ParseLevel(value[0]) - if err != nil { - return errors.Wrapf(err, "unknown level %s", value[0]) - } - s.logger.Logger.SetLevel(lvl) } } - s.Config = S3Configuration{} s.Config.Labels = labels s.Config.Mode = configuration.CAT_MODE + s.Config.UniqueId = uuid pathParts := strings.Split(args[0], "/") s.logger.Debugf("pathParts: %v", pathParts) @@ -587,6 +628,7 @@ func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error s.logger.Infof("starting acquisition of %s/%s/%s", s.Config.BucketName, s.Config.Prefix, s.Config.Key) s.out = out s.ctx, s.cancel = context.WithCancel(context.Background()) + s.Config.UseTimeMachine = true if s.Config.Key != "" { err := s.readFile(s.Config.BucketName, s.Config.Key) if err != nil { @@ -605,6 +647,7 @@ func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error } } } + t.Kill(nil) return nil } diff --git a/pkg/acquisition/modules/s3/s3_test.go b/pkg/acquisition/modules/s3/s3_test.go index 5f7deeda1..02423b139 100644 --- a/pkg/acquisition/modules/s3/s3_test.go +++ b/pkg/acquisition/modules/s3/s3_test.go @@ -234,7 +234,7 @@ func TestDSNAcquis(t *testing.T) { linesRead := 0 f := S3Source{} logger := log.NewEntry(log.New()) - err := f.ConfigureByDSN(test.dsn, map[string]string{"foo": "bar"}, logger) + err := f.ConfigureByDSN(test.dsn, map[string]string{"foo": "bar"}, logger, "") if err != nil { t.Fatalf("unexpected error: %s", err.Error()) } @@ -257,7 +257,8 @@ func TestDSNAcquis(t *testing.T) { }() f.s3Client = mockS3Client{} - err = f.OneShotAcquisition(out, nil) + tmb := tomb.Tomb{} + err = f.OneShotAcquisition(out, &tmb) if err != nil { t.Fatalf("unexpected error: %s", err.Error()) } diff --git a/pkg/acquisition/modules/syslog/syslog.go b/pkg/acquisition/modules/syslog/syslog.go index ff18a9b8d..cc93c3e2b 100644 --- a/pkg/acquisition/modules/syslog/syslog.go +++ b/pkg/acquisition/modules/syslog/syslog.go @@ -48,6 +48,10 @@ var linesParsed = prometheus.NewCounterVec( }, 
[]string{"source", "type"}) +func (s *SyslogSource) GetUuid() string { + return s.config.UniqueId +} + func (s *SyslogSource) GetName() string { return "syslog" } @@ -72,7 +76,7 @@ func (s *SyslogSource) GetAggregMetrics() []prometheus.Collector { return []prometheus.Collector{linesReceived, linesParsed} } -func (s *SyslogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (s *SyslogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { return fmt.Errorf("syslog datasource does not support one shot acquisition") } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog.go b/pkg/acquisition/modules/wineventlog/wineventlog.go index 6febf7b8a..f0eca5d13 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog.go @@ -15,6 +15,10 @@ import ( type WinEventLogSource struct{} +func (w *WinEventLogSource) GetUuid() string { + return "" +} + func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error { return nil } @@ -23,7 +27,7 @@ func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) erro return nil } -func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { return nil } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go index f9da08e4b..ba5829d8a 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go @@ -228,6 +228,10 @@ func (w *WinEventLogSource) generateConfig(query string) (*winlog.SubscribeConfi return &config, nil } +func (w *WinEventLogSource) GetUuid() string { + return w.config.UniqueId +} + func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error { w.config = WinEventLogConfiguration{} @@ -280,7 +284,7 @@ func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry) erro return nil } -func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { +func (w *WinEventLogSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { return nil }