diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index efc897152..ee3c844a5 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -3,6 +3,7 @@ package fileacquisition import ( "bufio" "compress/gzip" + "errors" "fmt" "io" "net/url" @@ -16,7 +17,6 @@ import ( "github.com/fsnotify/fsnotify" "github.com/nxadm/tail" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" @@ -63,6 +63,7 @@ func (f *FileSource) GetUuid() string { func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { f.config = FileConfiguration{} + err := yaml.UnmarshalStrict(yamlConfig, &f.config) if err != nil { return fmt.Errorf("cannot parse FileAcquisition configuration: %w", err) @@ -77,7 +78,7 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { } if len(f.config.Filenames) == 0 { - return fmt.Errorf("no filename or filenames configuration provided") + return errors.New("no filename or filenames configuration provided") } if f.config.Mode == "" { @@ -93,6 +94,7 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { if err != nil { return fmt.Errorf("could not compile regexp %s: %w", exclude, err) } + f.exclude_regexps = append(f.exclude_regexps, re) } @@ -123,56 +125,68 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLeve if f.config.ForceInotify { directory := filepath.Dir(pattern) f.logger.Infof("Force add watch on %s", directory) + if !f.watchedDirectories[directory] { err = f.watcher.Add(directory) if err != nil { f.logger.Errorf("Could not create watch on directory %s : %s", directory, err) continue } + f.watchedDirectories[directory] = true } } + files, err := filepath.Glob(pattern) if err != nil { return fmt.Errorf("glob failure: %w", err) } + if len(files) == 0 { f.logger.Warnf("No matching files for pattern %s", pattern) continue } - for _, file := range files { - //check if file is excluded + for _, file := range files { + // check if file is excluded excluded := false + for _, pattern := range f.exclude_regexps { if pattern.MatchString(file) { excluded = true + f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern) + break } } + if excluded { continue } - if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern + + if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { // we have a glob pattern directory := filepath.Dir(file) f.logger.Debugf("Will add watch to directory: %s", directory) - if !f.watchedDirectories[directory] { + if !f.watchedDirectories[directory] { err = f.watcher.Add(directory) if err != nil { f.logger.Errorf("Could not create watch on directory %s : %s", directory, err) continue } + f.watchedDirectories[directory] = true } else { f.logger.Debugf("Watch for directory %s already exists", directory) } } + f.logger.Infof("Adding file %s to datasources", file) f.files = append(f.files, file) } } + return nil } @@ -189,7 +203,7 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger args := strings.Split(dsn, "?") if len(args[0]) == 0 { - return fmt.Errorf("empty file:// DSN") + return errors.New("empty file:// DSN") } if len(args) == 2 && len(args[1]) != 0 { @@ -197,25 +211,30 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger if err != nil { return fmt.Errorf("could not parse file args: %w", err) } + for key, value := range params { switch key { case "log_level": if len(value) != 1 { return errors.New("expected zero or one value for 'log_level'") } + lvl, err := log.ParseLevel(value[0]) if err != nil { return fmt.Errorf("unknown level %s: %w", value[0], err) } + f.logger.Logger.SetLevel(lvl) case "max_buffer_size": if len(value) != 1 { return errors.New("expected zero or one value for 'max_buffer_size'") } + maxBufferSize, err := strconv.Atoi(value[0]) if err != nil { return fmt.Errorf("could not parse max_buffer_size %s: %w", value[0], err) } + f.config.MaxBufferSize = maxBufferSize default: return fmt.Errorf("unknown parameter %s", key) @@ -228,6 +247,7 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger f.config.UniqueId = uuid f.logger.Debugf("Will try pattern %s", args[0]) + files, err := filepath.Glob(args[0]) if err != nil { return fmt.Errorf("glob failure: %w", err) @@ -245,6 +265,7 @@ func (f *FileSource) ConfigureByDSN(dsn string, labels map[string]string, logger f.logger.Infof("Adding file %s to filelist", file) f.files = append(f.files, file) } + return nil } @@ -260,22 +281,26 @@ func (f *FileSource) SupportedModes() []string { // OneShotAcquisition reads a set of file and returns when done func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { f.logger.Debug("In oneshot") + for _, file := range f.files { fi, err := os.Stat(file) if err != nil { return fmt.Errorf("could not stat file %s : %w", file, err) } + if fi.IsDir() { f.logger.Warnf("%s is a directory, ignoring it.", file) continue } + f.logger.Infof("reading %s at once", file) + err = f.readFile(file, out, t) if err != nil { return err } - } + return nil } @@ -300,27 +325,33 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er t.Go(func() error { return f.monitorNewFiles(out, t) }) + for _, file := range f.files { - //before opening the file, check if we need to specifically avoid it. (XXX) + // before opening the file, check if we need to specifically avoid it. (XXX) skip := false + for _, pattern := range f.exclude_regexps { if pattern.MatchString(file) { f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String()) + skip = true + break } } + if skip { continue } - //cf. https://github.com/crowdsecurity/crowdsec/issues/1168 - //do not rely on stat, reclose file immediately as it's opened by Tail + // cf. https://github.com/crowdsecurity/crowdsec/issues/1168 + // do not rely on stat, reclose file immediately as it's opened by Tail fd, err := os.Open(file) if err != nil { f.logger.Errorf("unable to read %s : %s", file, err) continue } + if err := fd.Close(); err != nil { f.logger.Errorf("unable to close %s : %s", file, err) continue @@ -330,6 +361,7 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er if err != nil { return fmt.Errorf("could not stat file %s : %w", file, err) } + if fi.IsDir() { f.logger.Warnf("%s is a directory, ignoring it.", file) continue @@ -343,9 +375,12 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er if err != nil { f.logger.Warningf("Could not get fs type for %s : %s", file, err) } + f.logger.Debugf("fs for %s is network: %t (%s)", file, networkFS, fsType) + if networkFS { f.logger.Warnf("Disabling inotify poll on %s as it is on a network share. You can manually set poll_without_inotify to true to make this message disappear, or to false to enforce inotify poll", file) + inotifyPoll = false } } @@ -355,6 +390,7 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er f.logger.Errorf("Could not start tailing file %s : %s", file, err) continue } + f.tailMapMutex.Lock() f.tails[file] = true f.tailMapMutex.Unlock() @@ -363,6 +399,7 @@ func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er return f.tailFile(out, t, tail) }) } + return nil } @@ -372,6 +409,7 @@ func (f *FileSource) Dump() interface{} { func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { logger := f.logger.WithField("goroutine", "inotify") + for { select { case event, ok := <-f.watcher.Events: @@ -385,36 +423,47 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err) continue } + if fi.IsDir() { continue } + logger.Debugf("Detected new file %s", event.Name) + matched := false + for _, pattern := range f.config.Filenames { logger.Debugf("Matching %s with %s", pattern, event.Name) + matched, err = filepath.Match(pattern, event.Name) if err != nil { logger.Errorf("Could not match pattern : %s", err) continue } + if matched { logger.Debugf("Matched %s with %s", pattern, event.Name) break } } + if !matched { continue } - //before opening the file, check if we need to specifically avoid it. (XXX) + // before opening the file, check if we need to specifically avoid it. (XXX) skip := false + for _, pattern := range f.exclude_regexps { if pattern.MatchString(event.Name) { f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String()) + skip = true + break } } + if skip { continue } @@ -422,13 +471,14 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { f.tailMapMutex.RLock() if f.tails[event.Name] { f.tailMapMutex.RUnlock() - //we already have a tail on it, do not start a new one + // we already have a tail on it, do not start a new one logger.Debugf("Already tailing file %s, not creating a new tail", event.Name) + break } f.tailMapMutex.RUnlock() - //cf. https://github.com/crowdsecurity/crowdsec/issues/1168 - //do not rely on stat, reclose file immediately as it's opened by Tail + // cf. https://github.com/crowdsecurity/crowdsec/issues/1168 + // do not rely on stat, reclose file immediately as it's opened by Tail fd, err := os.Open(event.Name) if err != nil { f.logger.Errorf("unable to read %s : %s", event.Name, err) @@ -450,19 +500,22 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { if err != nil { f.logger.Warningf("Could not get fs type for %s : %s", event.Name, err) } + f.logger.Debugf("fs for %s is network: %t (%s)", event.Name, networkFS, fsType) + if networkFS { inotifyPoll = false } } } - //Slightly different parameters for Location, as we want to read the first lines of the newly created file + // Slightly different parameters for Location, as we want to read the first lines of the newly created file tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: inotifyPoll, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}}) if err != nil { logger.Errorf("Could not start tailing file %s : %s", event.Name, err) break } + f.tailMapMutex.Lock() f.tails[event.Name] = true f.tailMapMutex.Unlock() @@ -475,12 +528,14 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { if !ok { return nil } + logger.Errorf("Error while monitoring folder: %s", err) case <-t.Dying(): err := f.watcher.Close() if err != nil { return fmt.Errorf("could not remove all inotify watches: %w", err) } + return nil } } @@ -489,39 +544,47 @@ func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error { func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error { logger := f.logger.WithField("tail", tail.Filename) logger.Debugf("-> Starting tail of %s", tail.Filename) + for { select { case <-t.Dying(): logger.Infof("File datasource %s stopping", tail.Filename) + if err := tail.Stop(); err != nil { f.logger.Errorf("error in stop : %s", err) return err } + return nil - case <-tail.Dying(): //our tailer is dying + case <-tail.Dying(): // our tailer is dying err := tail.Err() errMsg := fmt.Sprintf("file reader of %s died", tail.Filename) if err != nil { errMsg = fmt.Sprintf(errMsg+" : %s", err) } + logger.Warningf(errMsg) - t.Kill(fmt.Errorf(errMsg)) - return fmt.Errorf(errMsg) + + return nil case line := <-tail.Lines: if line == nil { logger.Warningf("tail for %s is empty", tail.Filename) continue } + if line.Err != nil { logger.Warningf("fetch error : %v", line.Err) return line.Err } - if line.Text == "" { //skip empty lines + + if line.Text == "" { // skip empty lines continue } + if f.metricsLevel != configuration.METRICS_NONE { linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc() } + src := tail.Filename if f.metricsLevel == configuration.METRICS_AGGREGATE { src = filepath.Base(tail.Filename) @@ -535,7 +598,7 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai Process: true, Module: f.GetName(), } - //we're tailing, it must be real time logs + // we're tailing, it must be real time logs logger.Debugf("pushing %+v", l) expectMode := types.LIVE @@ -549,12 +612,14 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error { var scanner *bufio.Scanner + logger := f.logger.WithField("oneshot", filename) fd, err := os.Open(filename) if err != nil { return fmt.Errorf("failed opening %s: %w", filename, err) } + defer fd.Close() if strings.HasSuffix(filename, ".gz") { @@ -563,17 +628,20 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom logger.Errorf("Failed to read gz file: %s", err) return fmt.Errorf("failed to read gz %s: %w", filename, err) } + defer gz.Close() scanner = bufio.NewScanner(gz) - } else { scanner = bufio.NewScanner(fd) } + scanner.Split(bufio.ScanLines) + if f.config.MaxBufferSize > 0 { buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, f.config.MaxBufferSize) } + for scanner.Scan() { select { case <-t.Dying(): @@ -583,6 +651,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom if scanner.Text() == "" { continue } + l := types.Line{ Raw: scanner.Text(), Time: time.Now().UTC(), @@ -594,15 +663,19 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom logger.Debugf("line %s", l.Raw) linesRead.With(prometheus.Labels{"source": filename}).Inc() - //we're reading logs at once, it must be time-machine buckets + // we're reading logs at once, it must be time-machine buckets out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} } } + if err := scanner.Err(); err != nil { logger.Errorf("Error while reading file: %s", err) t.Kill(err) + return err } + t.Kill(nil) + return nil }