From 44ec3b9e01ca992b7806c2359f9a9d50636698ea Mon Sep 17 00:00:00 2001 From: blotus Date: Fri, 8 Mar 2024 13:56:59 +0100 Subject: [PATCH] file acquis: add mutex to protect access to the internal tail map (#2878) --- pkg/acquisition/modules/file/file.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index 9ab418a84..a0c226574 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -11,6 +11,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/fsnotify/fsnotify" @@ -52,6 +53,7 @@ type FileSource struct { logger *log.Entry files []string exclude_regexps []*regexp.Regexp + tailMapMutex *sync.RWMutex } func (f *FileSource) GetUuid() string { @@ -105,6 +107,7 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error { } f.watchedDirectories = make(map[string]bool) + f.tailMapMutex = &sync.RWMutex{} f.tails = make(map[string]bool) f.watcher, err = fsnotify.NewWatcher() @@ -350,7 +353,9 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er f.logger.Errorf("Could not start tailing file %s : %s", file, err) continue } + f.tailMapMutex.Lock() f.tails[file] = true + f.tailMapMutex.Unlock() t.Go(func() error { defer trace.CatchPanic("crowdsec/acquis/file/live/fsnotify") return f.tailFile(out, t, tail) @@ -412,11 +417,14 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { continue } + f.tailMapMutex.RLock() if f.tails[event.Name] { + f.tailMapMutex.RUnlock() //we already have a tail on it, do not start a new one logger.Debugf("Already tailing file %s, not creating a new tail", event.Name) break } + f.tailMapMutex.RUnlock() //cf. https://github.com/crowdsecurity/crowdsec/issues/1168 //do not rely on stat, reclose file immediately as it's opened by Tail fd, err := os.Open(event.Name) @@ -453,7 +461,9 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { logger.Errorf("Could not start tailing file %s : %s", event.Name, err) break } + f.tailMapMutex.Lock() f.tails[event.Name] = true + f.tailMapMutex.Unlock() t.Go(func() error { defer trace.CatchPanic("crowdsec/acquis/tailfile") return f.tailFile(out, t, tail)