file acquis: add mutex to protect access to the internal tail map (#2878)
This commit is contained in:
parent
6c5e8afde9
commit
44ec3b9e01
|
@ -11,6 +11,7 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
|
@ -52,6 +53,7 @@ type FileSource struct {
|
||||||
logger *log.Entry
|
logger *log.Entry
|
||||||
files []string
|
files []string
|
||||||
exclude_regexps []*regexp.Regexp
|
exclude_regexps []*regexp.Regexp
|
||||||
|
tailMapMutex *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSource) GetUuid() string {
|
func (f *FileSource) GetUuid() string {
|
||||||
|
@ -105,6 +107,7 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
f.watchedDirectories = make(map[string]bool)
|
f.watchedDirectories = make(map[string]bool)
|
||||||
|
f.tailMapMutex = &sync.RWMutex{}
|
||||||
f.tails = make(map[string]bool)
|
f.tails = make(map[string]bool)
|
||||||
|
|
||||||
f.watcher, err = fsnotify.NewWatcher()
|
f.watcher, err = fsnotify.NewWatcher()
|
||||||
|
@ -350,7 +353,9 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
||||||
f.logger.Errorf("Could not start tailing file %s : %s", file, err)
|
f.logger.Errorf("Could not start tailing file %s : %s", file, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
f.tailMapMutex.Lock()
|
||||||
f.tails[file] = true
|
f.tails[file] = true
|
||||||
|
f.tailMapMutex.Unlock()
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
defer trace.CatchPanic("crowdsec/acquis/file/live/fsnotify")
|
defer trace.CatchPanic("crowdsec/acquis/file/live/fsnotify")
|
||||||
return f.tailFile(out, t, tail)
|
return f.tailFile(out, t, tail)
|
||||||
|
@ -412,11 +417,14 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
f.tailMapMutex.RLock()
|
||||||
if f.tails[event.Name] {
|
if f.tails[event.Name] {
|
||||||
|
f.tailMapMutex.RUnlock()
|
||||||
//we already have a tail on it, do not start a new one
|
//we already have a tail on it, do not start a new one
|
||||||
logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
|
logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
f.tailMapMutex.RUnlock()
|
||||||
//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
|
//cf. https://github.com/crowdsecurity/crowdsec/issues/1168
|
||||||
//do not rely on stat, reclose file immediately as it's opened by Tail
|
//do not rely on stat, reclose file immediately as it's opened by Tail
|
||||||
fd, err := os.Open(event.Name)
|
fd, err := os.Open(event.Name)
|
||||||
|
@ -453,7 +461,9 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
|
||||||
logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
|
logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
f.tailMapMutex.Lock()
|
||||||
f.tails[event.Name] = true
|
f.tails[event.Name] = true
|
||||||
|
f.tailMapMutex.Unlock()
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
defer trace.CatchPanic("crowdsec/acquis/tailfile")
|
defer trace.CatchPanic("crowdsec/acquis/tailfile")
|
||||||
return f.tailFile(out, t, tail)
|
return f.tailFile(out, t, tail)
|
||||||
|
|
Loading…
Reference in a new issue