Thibault "bui" Koechlin ce6a61df1c
Refactor Acquisition Interface (#773)
* Add new acquisition interface + new modules (cloudwatch, syslog)

Co-authored-by: Sebastien Blot <>
2021-06-11 09:53:53 +02:00

418 lines
11 KiB

package fileacquisition
import (
leaky ""
log ""
var linesRead = prometheus.NewCounterVec(
Name: "cs_filesource_hits_total",
Help: "Total lines that were read.",
type FileConfiguration struct {
Filenames []string
Filename string
ForceInotify bool `yaml:"force_inotify"`
configuration.DataSourceCommonCfg `yaml:",inline"`
type FileSource struct {
config FileConfiguration
watcher *fsnotify.Watcher
watchedDirectories map[string]bool
tails map[string]bool
logger *log.Entry
files []string
func (f *FileSource) Configure(Config []byte, logger *log.Entry) error {
fileConfig := FileConfiguration{}
f.logger = logger
f.watchedDirectories = make(map[string]bool)
f.tails = make(map[string]bool)
err := yaml.UnmarshalStrict(Config, &fileConfig)
if err != nil {
return errors.Wrap(err, "Cannot parse FileAcquisition configuration")
f.logger.Tracef("FileAcquisition configuration: %+v", fileConfig)
if len(fileConfig.Filename) != 0 {
fileConfig.Filenames = append(fileConfig.Filenames, fileConfig.Filename)
if len(fileConfig.Filenames) == 0 {
return fmt.Errorf("no filename or filenames configuration provided")
f.config = fileConfig
if f.config.Mode == "" {
f.config.Mode = configuration.TAIL_MODE
if f.config.Mode != configuration.CAT_MODE && f.config.Mode != configuration.TAIL_MODE {
return fmt.Errorf("unsupported mode %s for file source", f.config.Mode)
f.watcher, err = fsnotify.NewWatcher()
if err != nil {
return errors.Wrapf(err, "Could not create fsnotify watcher")
f.logger.Tracef("Actual FileAcquisition Configuration %+v", f.config)
for _, pattern := range f.config.Filenames {
if f.config.ForceInotify {
directory := path.Dir(pattern)
f.logger.Infof("Force add watch on %s", directory)
if !f.watchedDirectories[directory] {
err = f.watcher.Add(directory)
if err != nil {
f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
f.watchedDirectories[directory] = true
files, err := filepath.Glob(pattern)
if err != nil {
return errors.Wrap(err, "Glob failure")
if len(files) == 0 {
f.logger.Warnf("No matching files for pattern %s", pattern)
for _, file := range files {
if files[0] != pattern && f.config.Mode == configuration.TAIL_MODE { //we have a glob pattern
directory := path.Dir(file)
if !f.watchedDirectories[directory] {
err = f.watcher.Add(directory)
if err != nil {
f.logger.Errorf("Could not create watch on directory %s : %s", directory, err)
f.watchedDirectories[directory] = true
f.logger.Infof("Adding file %s to datasources", file)
f.files = append(f.files, file)
return nil
func (f *FileSource) ConfigureByDSN(dsn string, labelType string, logger *log.Entry) error {
if !strings.HasPrefix(dsn, "file://") {
return fmt.Errorf("invalid DSN %s for file source, must start with file://", dsn)
f.logger = logger
dsn = strings.TrimPrefix(dsn, "file://")
args := strings.Split(dsn, "?")
if len(args[0]) == 0 {
return fmt.Errorf("empty file:// DSN")
if len(args) == 2 && len(args[1]) != 0 {
params, err := url.ParseQuery(args[1])
if err != nil {
return fmt.Errorf("could not parse file args : %s", err)
for key, value := range params {
if key != "log_level" {
return fmt.Errorf("unsupported key %s in file DSN", key)
if len(value) != 1 {
return fmt.Errorf("expected zero or one value for 'log_level'")
lvl, err := log.ParseLevel(value[0])
if err != nil {
return errors.Wrapf(err, "unknown level %s", value[0])
f.config = FileConfiguration{}
f.config.Labels = map[string]string{"type": labelType}
f.config.Mode = configuration.CAT_MODE
f.logger.Debugf("Will try pattern %s", args[0])
files, err := filepath.Glob(args[0])
if err != nil {
return errors.Wrap(err, "Glob failure")
if len(files) == 0 {
return fmt.Errorf("no matching files for pattern %s", args[0])
if len(files) > 1 {
f.logger.Infof("Will read %d files", len(files))
for _, file := range files {
f.logger.Infof("Adding file %s to filelist", file)
f.files = append(f.files, file)
return nil
func (f *FileSource) GetMode() string {
return f.config.Mode
//SupportedModes returns the supported modes by the acquisition module
func (f *FileSource) SupportedModes() []string {
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
//OneShotAcquisition reads a set of file and returns when done
func (f *FileSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
f.logger.Debug("In oneshot")
for _, file := range f.files {
fi, err := os.Stat(file)
if err != nil {
return fmt.Errorf("could not stat file %s : %w", file, err)
if fi.IsDir() {
f.logger.Warnf("%s is a directory, ignoring it.", file)
f.logger.Infof("reading %s at once", file)
err = f.readFile(file, out, t)
if err != nil {
return err
return nil
func (f *FileSource) GetMetrics() []prometheus.Collector {
return []prometheus.Collector{linesRead}
func (f *FileSource) GetAggregMetrics() []prometheus.Collector {
return []prometheus.Collector{linesRead}
func (f *FileSource) GetName() string {
return "file"
func (f *FileSource) CanRun() error {
return nil
func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
f.logger.Debug("Starting live acquisition")
t.Go(func() error {
return f.monitorNewFiles(out, t)
for _, file := range f.files {
err := unix.Access(file, unix.R_OK)
if err != nil {
f.logger.Errorf("unable to read %s : %s", file, err)
fi, err := os.Stat(file)
if err != nil {
return fmt.Errorf("could not stat file %s : %w", file, err)
if fi.IsDir() {
f.logger.Warnf("%s is a directory, ignoring it.", file)
tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}})
if err != nil {
f.logger.Errorf("Could not start tailing file %s : %s", file, err)
f.tails[file] = true
t.Go(func() error {
defer types.CatchPanic("crowdsec/acquis/file/live/fsnotify")
return f.tailFile(out, t, tail)
return nil
func (f *FileSource) Dump() interface{} {
return f
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
logger := f.logger.WithField("goroutine", "inotify")
for {
select {
case event, ok := <-f.watcher.Events:
if !ok {
return nil
if event.Op&fsnotify.Create == fsnotify.Create {
fi, err := os.Stat(event.Name)
if err != nil {
logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
if fi.IsDir() {
logger.Debugf("Detected new file %s", event.Name)
matched := false
for _, pattern := range f.config.Filenames {
logger.Debugf("Matching %s with %s", pattern, event.Name)
matched, err = path.Match(pattern, event.Name)
if err != nil {
logger.Errorf("Could not match pattern : %s", err)
if matched {
if !matched {
if f.tails[event.Name] {
//we already have a tail on it, do not start a new one
logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
err = unix.Access(event.Name, unix.R_OK)
if err != nil {
logger.Errorf("unable to read %s : %s", event.Name, err)
//Slightly different parameters for Location, as we want to read the first lines of the newly created file
tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
if err != nil {
logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
f.tails[event.Name] = true
t.Go(func() error {
defer types.CatchPanic("crowdsec/acquis/tailfile")
return f.tailFile(out, t, tail)
case err, ok := <-f.watcher.Errors:
if !ok {
return nil
logger.Errorf("Error while monitoring folder: %s", err)
case <-t.Dying():
err := f.watcher.Close()
if err != nil {
return errors.Wrapf(err, "could not remove all inotify watches")
return nil
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
logger := f.logger.WithField("tail", tail.Filename)
logger.Debugf("-> Starting tail of %s", tail.Filename)
for {
l := types.Line{}
select {
case <-t.Dying():
logger.Infof("File datasource %s stopping", tail.Filename)
if err := tail.Stop(); err != nil {
f.logger.Errorf("error in stop : %s", err)
return err
return nil
case <-tail.Tomb.Dying(): //our tailer is dying
logger.Warningf("File reader of %s died", tail.Filename)
t.Kill(fmt.Errorf("dead reader for %s", tail.Filename))
return fmt.Errorf("reader for %s is dead", tail.Filename)
case line := <-tail.Lines:
if line == nil {
logger.Debugf("Nil line")
return fmt.Errorf("tail for %s is empty", tail.Filename)
if line.Err != nil {
logger.Warningf("fetch error : %v", line.Err)
return line.Err
if line.Text == "" { //skip empty lines
linesRead.With(prometheus.Labels{"source": tail.Filename}).Inc()
l.Raw = line.Text
l.Labels = f.config.Labels
l.Time = line.Time
l.Src = tail.Filename
l.Process = true
l.Module = f.GetName()
//we're tailing, it must be real time logs
logger.Debugf("pushing %+v", l)
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tomb) error {
var scanner *bufio.Scanner
logger := f.logger.WithField("oneshot", filename)
fd, err := os.Open(filename)
if err != nil {
return errors.Wrapf(err, "failed opening %s", filename)
defer fd.Close()
if strings.HasSuffix(filename, ".gz") {
gz, err := gzip.NewReader(fd)
if err != nil {
logger.Errorf("Failed to read gz file: %s", err)
return errors.Wrapf(err, "failed to read gz %s", filename)
defer gz.Close()
scanner = bufio.NewScanner(gz)
} else {
scanner = bufio.NewScanner(fd)
for scanner.Scan() {
logger.Debugf("line %s", scanner.Text())
l := types.Line{}
l.Raw = scanner.Text()
l.Time = time.Now()
l.Src = filename
l.Labels = f.config.Labels
l.Process = true
l.Module = f.GetName()
linesRead.With(prometheus.Labels{"source": filename}).Inc()
//we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
return nil