From 4917aa23c944da9613fa30136a59ce6a6df4a456 Mon Sep 17 00:00:00 2001 From: AlteredCoder <64792091+AlteredCoder@users.noreply.github.com> Date: Thu, 2 Dec 2021 15:55:50 +0100 Subject: [PATCH] Docker datasource (#1064) * add docker datasource --- cmd/crowdsec-cli/explain.go | 2 +- go.mod | 5 +- go.sum | 2 + pkg/acquisition/acquisition.go | 24 +- pkg/acquisition/modules/docker/docker.go | 518 ++++++++++++++++++ pkg/acquisition/modules/docker/docker_test.go | 330 +++++++++++ pkg/csconfig/crowdsec_service.go | 6 + 7 files changed, 874 insertions(+), 13 deletions(-) create mode 100644 pkg/acquisition/modules/docker/docker.go create mode 100644 pkg/acquisition/modules/docker/docker_test.go diff --git a/cmd/crowdsec-cli/explain.go b/cmd/crowdsec-cli/explain.go index eb21980ff..4c8b190b3 100644 --- a/cmd/crowdsec-cli/explain.go +++ b/cmd/crowdsec-cli/explain.go @@ -28,7 +28,7 @@ Explain log pipeline Example: ` cscli explain --file ./myfile.log --type nginx cscli explain --log "Sep 19 18:33:22 scw-d95986 sshd[24347]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=1.2.3.4" --type syslog -cscli explain -dsn "file://myfile.log" --type nginx +cscli explain --dsn "file://myfile.log" --type nginx `, Args: cobra.ExactArgs(0), DisableAutoGenTag: true, diff --git a/go.mod b/go.mod index 005c409e0..03c4c5fc8 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible github.com/Microsoft/go-winio v0.4.16 // indirect + github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26 github.com/alexliesenfeld/health v0.5.1 github.com/antonmedv/expr v1.8.9 github.com/appleboy/gin-jwt/v2 v2.6.4 @@ -24,7 +25,7 @@ require ( github.com/docker/docker v20.10.2+incompatible github.com/docker/go-connections v0.4.0 github.com/enescakir/emoji v1.0.0 - github.com/fatih/color v1.13.0 // indirect + github.com/fatih/color v1.13.0 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.6.3 github.com/go-co-op/gocron v1.9.0 @@ -62,7 +63,7 @@ require ( github.com/prometheus/client_golang v1.10.0 github.com/prometheus/client_model v0.2.0 github.com/prometheus/prom2json v1.3.0 - github.com/r3labs/diff/v2 v2.14.1 // indirect + github.com/r3labs/diff/v2 v2.14.1 github.com/rivo/uniseg v0.2.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sirupsen/logrus v1.8.1 diff --git a/go.sum b/go.sum index 50e3a634f..1c0c276eb 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= +github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26 h1:3YVZUqkoev4mL+aCwVOSWV4M7pN+NURHL38Z2zq5JKA= +github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26/go.mod h1:ymXt5bw5uSNu4jveerFxE0vNYxF8ncqbptntMaFMg3k= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 94e953c08..8bd04b86f 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -8,10 +8,10 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" cloudwatchacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/cloudwatch" + dockeracquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker" fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file" journalctlacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/journalctl" syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog" - "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/pkg/errors" @@ -24,15 +24,15 @@ import ( // The interface each datasource must implement type DataSource interface { - GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module - GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality) - Configure([]byte, *log.Entry) error // Configure the datasource - ConfigureByDSN(string, map[string]string, *log.Entry) error // Configure the datasource - GetMode() string // Get the mode (TAIL, CAT or SERVER) - GetName() string // Get the name of the module - OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file) - StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file) - CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense) + GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module + GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality) + Configure([]byte, *log.Entry) error // Configure the datasource + ConfigureByDSN(string, map[string]string, *log.Entry) error // Configure the datasource + GetMode() string // Get the mode (TAIL, CAT or SERVER) + GetName() string // Get the name of the module + OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file) + StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file) + CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense) Dump() interface{} } @@ -56,6 +56,10 @@ var AcquisitionSources = []struct { name: "syslog", iface: func() DataSource { return &syslogacquisition.SyslogSource{} }, }, + { + name: "docker", + iface: func() DataSource { return &dockeracquisition.DockerSource{} }, + }, } func GetDataSourceIface(dataSourceType string) DataSource { diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go new file mode 100644 index 000000000..bf883c653 --- /dev/null +++ b/pkg/acquisition/modules/docker/docker.go @@ -0,0 +1,518 @@ +package dockeracquisition + +import ( + "bufio" + "context" + "fmt" + "net/url" + "regexp" + "strconv" + "strings" + "time" + + "github.com/ahmetb/dlog" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" + dockerTypes "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + "gopkg.in/yaml.v2" +) + +var linesRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cs_dockersource_hits_total", + Help: "Total lines that were read.", + }, + []string{"source"}) + +type DockerConfiguration struct { + CheckInterval string `yaml:"check_interval"` + FollowStdout bool `yaml:"follow_stdout"` + FollowStdErr bool `yaml:"follow_stderr"` + Until string `yaml:"until"` + Since string `yaml:"since"` + DockerHost string `yaml:"docker_host"` + ContainerName []string `yaml:"container_name"` + ContainerID []string `yaml:"container_id"` + ContainerNameRegexp []string `yaml:"container_name_regexp"` + ContainerIDRegexp []string `yaml:"container_id_regexp"` + ForceInotify bool `yaml:"force_inotify"` + configuration.DataSourceCommonCfg `yaml:",inline"` +} + +type DockerSource struct { + Config DockerConfiguration + runningContainerState map[string]*ContainerConfig + compiledContainerName []*regexp.Regexp + compiledContainerID []*regexp.Regexp + CheckIntervalDuration time.Duration + logger *log.Entry + Client client.CommonAPIClient + t *tomb.Tomb + containerLogsOptions *dockerTypes.ContainerLogsOptions +} + +type ContainerConfig struct { + Name string + ID string + t *tomb.Tomb + logger *log.Entry + Labels map[string]string +} + +func (d *DockerSource) Configure(Config []byte, logger *log.Entry) error { + var err error + + d.Config = DockerConfiguration{ + FollowStdout: true, // default + FollowStdErr: true, // default + CheckInterval: "1s", // default + } + d.logger = logger + + d.runningContainerState = make(map[string]*ContainerConfig) + + err = yaml.UnmarshalStrict(Config, &d.Config) + if err != nil { + return errors.Wrap(err, "Cannot parse DockerAcquisition configuration") + } + + d.logger.Tracef("DockerAcquisition configuration: %+v", d.Config) + if len(d.Config.ContainerName) == 0 && len(d.Config.ContainerID) == 0 && len(d.Config.ContainerIDRegexp) == 0 && len(d.Config.ContainerNameRegexp) == 0 { + return fmt.Errorf("no containers names or containers ID configuration provided") + } + + d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval) + if err != nil { + return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration) + } + + if d.Config.Mode == "" { + d.Config.Mode = configuration.TAIL_MODE + } + if d.Config.Mode != configuration.CAT_MODE && d.Config.Mode != configuration.TAIL_MODE { + return fmt.Errorf("unsupported mode %s for docker datasource", d.Config.Mode) + } + d.logger.Tracef("Actual DockerAcquisition configuration %+v", d.Config) + + for _, cont := range d.Config.ContainerNameRegexp { + d.compiledContainerName = append(d.compiledContainerName, regexp.MustCompile(cont)) + } + + for _, cont := range d.Config.ContainerIDRegexp { + d.compiledContainerID = append(d.compiledContainerID, regexp.MustCompile(cont)) + } + + dockerClient, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return err + } + + if d.Config.Since == "" { + d.Config.Since = time.Now().Format(time.RFC3339) + } + + d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{ + ShowStdout: d.Config.FollowStdout, + ShowStderr: d.Config.FollowStdErr, + Follow: true, + Since: d.Config.Since, + } + + if d.Config.Until != "" { + d.containerLogsOptions.Until = d.Config.Until + } + + if d.Config.DockerHost != "" { + if err := client.WithHost(d.Config.DockerHost)(dockerClient); err != nil { + return err + } + } + d.Client = dockerClient + + return nil +} + +func (d *DockerSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { + var err error + + if !strings.HasPrefix(dsn, d.GetName()+"://") { + return fmt.Errorf("invalid DSN %s for docker source, must start with %s://", dsn, d.GetName()) + } + + d.Config = DockerConfiguration{ + FollowStdout: true, + FollowStdErr: true, + CheckInterval: "1s", + } + d.Config.ContainerName = make([]string, 0) + d.Config.ContainerID = make([]string, 0) + d.runningContainerState = make(map[string]*ContainerConfig) + d.Config.Mode = configuration.CAT_MODE + d.logger = logger + d.Config.Labels = labels + + dockerClient, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return err + } + + d.containerLogsOptions = &dockerTypes.ContainerLogsOptions{ + ShowStdout: d.Config.FollowStdout, + ShowStderr: d.Config.FollowStdErr, + Follow: false, + } + dsn = strings.TrimPrefix(dsn, d.GetName()+"://") + args := strings.Split(dsn, "?") + + if len(args) == 0 { + return fmt.Errorf("invalid dsn: %s", dsn) + } + + if len(args) == 1 && args[0] == "" { + return fmt.Errorf("empty %s DSN", d.GetName()+"://") + } + d.Config.ContainerName = append(d.Config.ContainerName, args[0]) + // we add it as an ID also so user can provide docker name or docker ID + d.Config.ContainerID = append(d.Config.ContainerID, args[0]) + + // no parameters + if len(args) == 1 { + d.Client = dockerClient + return nil + } + + parameters, err := url.ParseQuery(args[1]) + if err != nil { + return errors.Wrapf(err, "while parsing parameters %s: %s", dsn, err) + } + + for k, v := range parameters { + switch k { + case "log_level": + if len(v) != 1 { + return fmt.Errorf("only one 'log_level' parameters is required, not many") + } + lvl, err := log.ParseLevel(v[0]) + if err != nil { + return errors.Wrapf(err, "unknown level %s", v[0]) + } + d.logger.Logger.SetLevel(lvl) + case "until": + if len(v) != 1 { + return fmt.Errorf("only one 'until' parameters is required, not many") + } + d.containerLogsOptions.Until = v[0] + case "since": + if len(v) != 1 { + return fmt.Errorf("only one 'since' parameters is required, not many") + } + d.containerLogsOptions.Since = v[0] + case "follow_stdout": + if len(v) != 1 { + return fmt.Errorf("only one 'follow_stdout' parameters is required, not many") + } + followStdout, err := strconv.ParseBool(v[0]) + if err != nil { + return fmt.Errorf("parsing 'follow_stdout' parameters: %s", err) + } + d.Config.FollowStdout = followStdout + d.containerLogsOptions.ShowStdout = followStdout + case "follow_stderr": + if len(v) != 1 { + return fmt.Errorf("only one 'follow_stderr' parameters is required, not many") + } + followStdErr, err := strconv.ParseBool(v[0]) + if err != nil { + return fmt.Errorf("parsing 'follow_stderr' parameters: %s", err) + } + d.Config.FollowStdErr = followStdErr + d.containerLogsOptions.ShowStderr = followStdErr + case "docker_host": + if len(v) != 1 { + return fmt.Errorf("only one 'docker_host' parameters is required, not many") + } + if err := client.WithHost(v[0])(dockerClient); err != nil { + return err + } + } + } + d.Client = dockerClient + return nil +} + +func (d *DockerSource) GetMode() string { + return d.Config.Mode +} + +//SupportedModes returns the supported modes by the acquisition module +func (d *DockerSource) SupportedModes() []string { + return []string{configuration.TAIL_MODE, configuration.CAT_MODE} +} + +//OneShotAcquisition reads a set of file and returns when done +func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { + d.logger.Debug("In oneshot") + runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{}) + if err != nil { + return err + } + foundOne := false + for _, container := range runningContainer { + if _, ok := d.runningContainerState[container.ID]; ok { + d.logger.Debugf("container with id %s is already being read from", container.ID) + continue + } + if containerConfig, ok := d.EvalContainer(container); ok { + d.logger.Infof("reading logs from container %s", containerConfig.Name) + d.logger.Debugf("logs options: %+v", *d.containerLogsOptions) + dockerReader, err := d.Client.ContainerLogs(context.Background(), containerConfig.ID, *d.containerLogsOptions) + if err != nil { + d.logger.Errorf("unable to read logs from container: %+v", err) + return err + } + // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/) + reader := dlog.NewReader(dockerReader) + foundOne = true + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + l := types.Line{} + l.Raw = line + l.Labels = d.Config.Labels + l.Time = time.Now() + l.Src = containerConfig.Name + l.Process = true + l.Module = d.GetName() + linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc() + evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE} + out <- evt + d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) + } + d.runningContainerState[container.ID] = containerConfig + } + } + + t.Kill(nil) + + if !foundOne { + return fmt.Errorf("no docker found, can't run one shot acquisition") + } + + return nil +} + +func (d *DockerSource) GetMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (d *DockerSource) GetAggregMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (d *DockerSource) GetName() string { + return "docker" +} + +func (d *DockerSource) CanRun() error { + return nil +} + +func (d *DockerSource) EvalContainer(container dockerTypes.Container) (*ContainerConfig, bool) { + for _, containerID := range d.Config.ContainerID { + if containerID == container.ID { + return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true + } + } + + for _, containerName := range d.Config.ContainerName { + for _, name := range container.Names { + if strings.HasPrefix(name, "/") && len(name) > 0 { + name = name[1:] + } + if name == containerName { + return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true + } + } + + } + + for _, cont := range d.compiledContainerID { + if matched := cont.Match([]byte(container.ID)); matched { + return &ContainerConfig{ID: container.ID, Name: container.Names[0], Labels: d.Config.Labels}, true + } + } + + for _, cont := range d.compiledContainerName { + for _, name := range container.Names { + if matched := cont.Match([]byte(name)); matched { + return &ContainerConfig{ID: container.ID, Name: name, Labels: d.Config.Labels}, true + } + } + + } + + return &ContainerConfig{}, false +} + +func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error { + ticker := time.NewTicker(d.CheckIntervalDuration) + d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String()) + for { + select { + case <-d.t.Dying(): + d.logger.Infof("stopping container watcher") + return nil + case <-ticker.C: + // to track for garbage collection + runningContainersID := make(map[string]bool) + runningContainer, err := d.Client.ContainerList(context.Background(), dockerTypes.ContainerListOptions{}) + if err != nil { + if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") { + for idx, container := range d.runningContainerState { + if d.runningContainerState[idx].t.Alive() { + d.logger.Infof("killing tail for container %s", container.Name) + d.runningContainerState[idx].t.Kill(nil) + if err := d.runningContainerState[idx].t.Wait(); err != nil { + d.logger.Infof("error while waiting for death of %s : %s", container.Name, err) + } + } + delete(d.runningContainerState, idx) + } + } else { + log.Debugf("container list err: %s", err.Error()) + } + continue + } + + for _, container := range runningContainer { + runningContainersID[container.ID] = true + + // don't need to re eval an already monitored container + if _, ok := d.runningContainerState[container.ID]; ok { + continue + } + if containerConfig, ok := d.EvalContainer(container); ok { + monitChan <- containerConfig + } + } + + for containerStateID, containerConfig := range d.runningContainerState { + if _, ok := runningContainersID[containerStateID]; !ok { + deleteChan <- containerConfig + } + } + d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState)) + + ticker.Reset(d.CheckIntervalDuration) + } + } +} + +func (d *DockerSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { + d.t = t + monitChan := make(chan *ContainerConfig) + deleteChan := make(chan *ContainerConfig) + d.logger.Infof("Starting docker acquisition") + t.Go(func() error { + return d.DockerManager(monitChan, deleteChan, out) + }) + + return d.WatchContainer(monitChan, deleteChan) +} + +func (d *DockerSource) Dump() interface{} { + return d +} + +func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) error { + for scanner.Scan() { + out <- scanner.Text() + } + return nil +} + +func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event) error { + container.logger.Infof("start tail for container %s", container.Name) + dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions) + if err != nil { + container.logger.Errorf("unable to read logs from container: %+v", err) + return err + } + // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/) + reader := dlog.NewReader(dockerReader) + scanner := bufio.NewScanner(reader) + readerChan := make(chan string) + readerTomb := &tomb.Tomb{} + readerTomb.Go(func() error { + return ReadTailScanner(scanner, readerChan, readerTomb) + }) + for { + select { + case <-container.t.Dying(): + readerTomb.Kill(nil) + container.logger.Infof("tail stopped for container %s", container.Name) + return nil + case line := <-readerChan: + if line == "" { + continue + } + l := types.Line{} + l.Raw = line + l.Labels = d.Config.Labels + l.Time = time.Now() + l.Src = container.Name + l.Process = true + l.Module = d.GetName() + evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE} + linesRead.With(prometheus.Labels{"source": container.Name}).Inc() + outChan <- evt + d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) + } + } +} + +func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan types.Event) error { + d.logger.Info("DockerSource Manager started") + for { + select { + case newContainer := <-in: + if _, ok := d.runningContainerState[newContainer.ID]; !ok { + newContainer.t = &tomb.Tomb{} + newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name}) + newContainer.t.Go(func() error { + return d.TailDocker(newContainer, outChan) + }) + d.runningContainerState[newContainer.ID] = newContainer + } + case containerToDelete := <-deleteChan: + if containerConfig, ok := d.runningContainerState[containerToDelete.ID]; ok { + log.Infof("container acquisition stopped for container '%s'", containerConfig.Name) + containerConfig.t.Kill(nil) + delete(d.runningContainerState, containerToDelete.ID) + } + case <-d.t.Dying(): + for idx, container := range d.runningContainerState { + if d.runningContainerState[idx].t.Alive() { + d.logger.Infof("killing tail for container %s", container.Name) + d.runningContainerState[idx].t.Kill(nil) + if err := d.runningContainerState[idx].t.Wait(); err != nil { + d.logger.Infof("error while waiting for death of %s : %s", container.Name, err) + } + } + } + d.runningContainerState = nil + d.logger.Debugf("routine cleanup done, return") + return nil + } + } +} diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go new file mode 100644 index 000000000..d6b53c4af --- /dev/null +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -0,0 +1,330 @@ +package dockeracquisition + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "testing" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/types" + dockerTypes "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + + "github.com/stretchr/testify/assert" +) + +const testContainerName = "docker_test" + +func TestConfigure(t *testing.T) { + log.Infof("Test 'TestConfigure'") + + tests := []struct { + config string + expectedErr string + }{ + { + config: `foobar: asd`, + expectedErr: "line 1: field foobar not found in type dockeracquisition.DockerConfiguration", + }, + { + config: ` +mode: tail +source: docker`, + expectedErr: "no containers names or containers ID configuration provided", + }, + { + config: ` +mode: cat +source: docker +container_name: + - toto`, + expectedErr: "", + }, + } + + subLogger := log.WithFields(log.Fields{ + "type": "docker", + }) + for _, test := range tests { + f := DockerSource{} + err := f.Configure([]byte(test.config), subLogger) + if test.expectedErr != "" && err == nil { + t.Fatalf("Expected err %s but got nil !", test.expectedErr) + } + if test.expectedErr != "" { + assert.Contains(t, err.Error(), test.expectedErr) + } + } +} + +func TestConfigureDSN(t *testing.T) { + log.Infof("Test 'TestConfigureDSN'") + + tests := []struct { + name string + dsn string + expectedErr string + }{ + { + name: "invalid DSN", + dsn: "asd://", + expectedErr: "invalid DSN asd:// for docker source, must start with docker://", + }, + { + name: "empty DSN", + dsn: "docker://", + expectedErr: "empty docker:// DSN", + }, + { + name: "DSN ok with log_level", + dsn: "docker://test_docker?log_level=warn", + expectedErr: "", + }, + { + name: "DSN invalid log_level", + dsn: "docker://test_docker?log_level=foobar", + expectedErr: "unknown level foobar: not a valid logrus Level:", + }, + { + name: "DSN ok with multiple parameters", + dsn: "docker://test_docker?since=42min&docker_host=unix:///var/run/podman/podman.sock", + expectedErr: "", + }, + } + subLogger := log.WithFields(log.Fields{ + "type": "docker", + }) + for _, test := range tests { + f := DockerSource{} + err := f.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger) + if test.expectedErr != "" { + assert.Contains(t, err.Error(), test.expectedErr) + } else { + assert.Equal(t, err, nil) + } + } +} + +type mockDockerCli struct { + client.Client +} + +func TestStreamingAcquisition(t *testing.T) { + log.SetOutput(os.Stdout) + log.SetLevel(log.InfoLevel) + log.Info("Test 'TestStreamingAcquisition'") + tests := []struct { + config string + expectedErr string + expectedOutput string + expectedLines int + logType string + logLevel log.Level + }{ + { + config: ` +source: docker +mode: cat +container_name: + - docker_test`, + expectedErr: "", + expectedOutput: "", + expectedLines: 3, + logType: "test", + logLevel: log.InfoLevel, + }, + { + config: ` +source: docker +mode: cat +container_name_regexp: + - docker_*`, + expectedErr: "", + expectedOutput: "", + expectedLines: 3, + logType: "test", + logLevel: log.InfoLevel, + }, + } + + for _, ts := range tests { + var logger *log.Logger + var subLogger *log.Entry + if ts.expectedOutput != "" { + logger.SetLevel(ts.logLevel) + subLogger = logger.WithFields(log.Fields{ + "type": "docker", + }) + } else { + subLogger = log.WithFields(log.Fields{ + "type": "docker", + }) + } + + dockerTomb := tomb.Tomb{} + out := make(chan types.Event) + dockerSource := DockerSource{} + err := dockerSource.Configure([]byte(ts.config), subLogger) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + dockerSource.Client = new(mockDockerCli) + actualLines := 0 + readerTomb := &tomb.Tomb{} + streamTomb := tomb.Tomb{} + streamTomb.Go(func() error { + return dockerSource.StreamingAcquisition(out, &dockerTomb) + }) + readerTomb.Go(func() error { + time.Sleep(1 * time.Second) + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-out: + actualLines++ + ticker.Reset(1 * time.Second) + case <-ticker.C: + log.Infof("no more line to read") + readerTomb.Kill(nil) + return nil + } + } + }) + time.Sleep(10 * time.Second) + if ts.expectedErr == "" && err != nil { + t.Fatalf("Unexpected error : %s", err) + } else if ts.expectedErr != "" && err != nil { + assert.Contains(t, err.Error(), ts.expectedErr) + continue + } else if ts.expectedErr != "" && err == nil { + t.Fatalf("Expected error %s, but got nothing !", ts.expectedErr) + } + if err := readerTomb.Wait(); err != nil { + t.Fatal(err) + } + //time.Sleep(4 * time.Second) + if ts.expectedLines != 0 { + assert.Equal(t, ts.expectedLines, actualLines) + } + dockerSource.t.Kill(nil) + err = streamTomb.Wait() + if err != nil { + t.Fatalf("docker acquisition error: %s", err) + } + } + +} + +func (cli *mockDockerCli) ContainerList(ctx context.Context, options dockerTypes.ContainerListOptions) ([]dockerTypes.Container, error) { + containers := make([]dockerTypes.Container, 0) + container := &dockerTypes.Container{ + ID: "12456", + Names: []string{testContainerName}, + } + containers = append(containers, *container) + + return containers, nil +} + +func (cli *mockDockerCli) ContainerLogs(ctx context.Context, container string, options dockerTypes.ContainerLogsOptions) (io.ReadCloser, error) { + startLineByte := "\x01\x00\x00\x00\x00\x00\x00\x1f" + data := []string{"docker", "test", "1234"} + ret := "" + for _, line := range data { + ret += fmt.Sprintf("%s%s\n", startLineByte, line) + } + r := io.NopCloser(strings.NewReader(ret)) // r type is io.ReadCloser + return r, nil +} + +func TestOneShot(t *testing.T) { + log.Infof("Test 'TestOneShot'") + + tests := []struct { + dsn string + expectedErr string + expectedOutput string + expectedLines int + logType string + logLevel log.Level + }{ + { + dsn: "docker://non_exist_docker", + expectedErr: "no docker found, can't run one shot acquisition", + expectedOutput: "", + expectedLines: 0, + logType: "test", + logLevel: log.InfoLevel, + }, + { + dsn: "docker://" + testContainerName, + expectedErr: "", + expectedOutput: "", + expectedLines: 3, + logType: "test", + logLevel: log.InfoLevel, + }, + } + + for _, ts := range tests { + var subLogger *log.Entry + var logger *log.Logger + if ts.expectedOutput != "" { + logger.SetLevel(ts.logLevel) + subLogger = logger.WithFields(log.Fields{ + "type": "docker", + }) + } else { + log.SetLevel(ts.logLevel) + subLogger = log.WithFields(log.Fields{ + "type": "docker", + }) + } + + dockerClient := &DockerSource{} + labels := make(map[string]string) + labels["type"] = ts.logType + + if err := dockerClient.ConfigureByDSN(ts.dsn, labels, subLogger); err != nil { + t.Fatalf("unable to configure dsn '%s': %s", ts.dsn, err) + } + dockerClient.Client = new(mockDockerCli) + out := make(chan types.Event) + actualLines := 0 + if ts.expectedLines != 0 { + go func() { + READLOOP: + for { + select { + case <-out: + actualLines++ + case <-time.After(1 * time.Second): + break READLOOP + } + } + }() + } + tomb := tomb.Tomb{} + err := dockerClient.OneShotAcquisition(out, &tomb) + + if ts.expectedErr == "" && err != nil { + t.Fatalf("Unexpected error : %s", err) + } else if ts.expectedErr != "" && err != nil { + assert.Contains(t, err.Error(), ts.expectedErr) + continue + } else if ts.expectedErr != "" && err == nil { + t.Fatalf("Expected error %s, but got nothing !", ts.expectedErr) + } + // else we do the check before actualLines is incremented ... + time.Sleep(1 * time.Second) + if ts.expectedLines != 0 { + assert.Equal(t, ts.expectedLines, actualLines) + } + } + +} diff --git a/pkg/csconfig/crowdsec_service.go b/pkg/csconfig/crowdsec_service.go index 574c41ffd..b7a7c7b8e 100644 --- a/pkg/csconfig/crowdsec_service.go +++ b/pkg/csconfig/crowdsec_service.go @@ -60,6 +60,12 @@ func (c *Config) LoadCrowdsec() error { return errors.Wrap(err, "while globing acquis_dir") } c.Crowdsec.AcquisitionFiles = append(c.Crowdsec.AcquisitionFiles, files...) + + files, err = filepath.Glob(c.Crowdsec.AcquisitionDirPath + "/*.yml") + if err != nil { + return errors.Wrap(err, "while globing acquis_dir") + } + c.Crowdsec.AcquisitionFiles = append(c.Crowdsec.AcquisitionFiles, files...) } if c.Crowdsec.AcquisitionDirPath == "" && c.Crowdsec.AcquisitionFilePath == "" { log.Warningf("no acquisition_path nor acquisition_dir")