From 12db21673c98daef7a708b9c02233ac18982ec0a Mon Sep 17 00:00:00 2001 From: Sebastien Blot Date: Tue, 21 Nov 2023 10:37:44 +0100 Subject: [PATCH] allow to spawn more monitoring routines --- pkg/acquisition/modules/file/file.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index 53bc86e7e..cfcaf5c23 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -14,6 +14,7 @@ import ( "time" "github.com/fsnotify/fsnotify" + "github.com/gofrs/uuid" "github.com/nxadm/tail" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -41,6 +42,7 @@ type FileConfiguration struct { ForceInotify bool `yaml:"force_inotify"` MaxBufferSize int `yaml:"max_buffer_size"` PollWithoutInotify bool `yaml:"poll_without_inotify"` + MonitorRoutines int `yaml:"monitor_routines"` configuration.DataSourceCommonCfg `yaml:",inline"` } @@ -93,6 +95,10 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { f.exclude_regexps = append(f.exclude_regexps, re) } + if f.config.MonitorRoutines == 0 { + f.config.MonitorRoutines = 1 + } + return nil } @@ -292,9 +298,11 @@ func (f *FileSource) CanRun() error { func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { f.logger.Debug("Starting live acquisition") - t.Go(func() error { - return f.monitorNewFiles(out, t) - }) + for i := 0; i < f.config.MonitorRoutines; i++ { + t.Go(func() error { + return f.monitorNewFiles(out, t) + }) + } for _, file := range f.files { //before opening the file, check if we need to specifically avoid it. (XXX) skip := false @@ -349,7 +357,7 @@ func (f *FileSource) Dump() interface{} { } func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { - logger := f.logger.WithField("goroutine", "inotify") + logger := f.logger.WithField("goroutine", "inotify").WithField("routine_id", uuid.Must(uuid.NewV4()).String()) for { select { case event, ok := <-f.watcher.Events: @@ -424,6 +432,8 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { defer trace.CatchPanic("crowdsec/acquis/tailfile") return f.tailFile(out, t, tail) }) + } else { + //logger.Infof("Event %s on %s", event.Op.String(), event.Name) } case err, ok := <-f.watcher.Errors: if !ok {