allow to spawn more monitoring routines

This commit is contained in:
Sebastien Blot 2023-11-21 10:37:44 +01:00
parent 5b17644248
commit 12db21673c
No known key found for this signature in database
GPG key ID: DFC2902F40449F6A

View file

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/gofrs/uuid"
"github.com/nxadm/tail" "github.com/nxadm/tail"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -41,6 +42,7 @@ type FileConfiguration struct {
ForceInotify bool `yaml:"force_inotify"` ForceInotify bool `yaml:"force_inotify"`
MaxBufferSize int `yaml:"max_buffer_size"` MaxBufferSize int `yaml:"max_buffer_size"`
PollWithoutInotify bool `yaml:"poll_without_inotify"` PollWithoutInotify bool `yaml:"poll_without_inotify"`
MonitorRoutines int `yaml:"monitor_routines"`
configuration.DataSourceCommonCfg `yaml:",inline"` configuration.DataSourceCommonCfg `yaml:",inline"`
} }
@ -93,6 +95,10 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
f.exclude_regexps = append(f.exclude_regexps, re) f.exclude_regexps = append(f.exclude_regexps, re)
} }
if f.config.MonitorRoutines == 0 {
f.config.MonitorRoutines = 1
}
return nil return nil
} }
@ -292,9 +298,11 @@ func (f *FileSource) CanRun() error {
func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
f.logger.Debug("Starting live acquisition") f.logger.Debug("Starting live acquisition")
t.Go(func() error { for i := 0; i < f.config.MonitorRoutines; i++ {
return f.monitorNewFiles(out, t) t.Go(func() error {
}) return f.monitorNewFiles(out, t)
})
}
for _, file := range f.files { for _, file := range f.files {
//before opening the file, check if we need to specifically avoid it. (XXX) //before opening the file, check if we need to specifically avoid it. (XXX)
skip := false skip := false
@ -349,7 +357,7 @@ func (f *FileSource) Dump() interface{} {
} }
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
logger := f.logger.WithField("goroutine", "inotify") logger := f.logger.WithField("goroutine", "inotify").WithField("routine_id", uuid.Must(uuid.NewV4()).String())
for { for {
select { select {
case event, ok := <-f.watcher.Events: case event, ok := <-f.watcher.Events:
@ -424,6 +432,8 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
defer trace.CatchPanic("crowdsec/acquis/tailfile") defer trace.CatchPanic("crowdsec/acquis/tailfile")
return f.tailFile(out, t, tail) return f.tailFile(out, t, tail)
}) })
} else {
//logger.Infof("Event %s on %s", event.Op.String(), event.Name)
} }
case err, ok := <-f.watcher.Errors: case err, ok := <-f.watcher.Errors:
if !ok { if !ok {