crowdsec/pkg/acquisition/file_reader.go
Thibault "bui" Koechlin dbb420f79e
local api (#482)
Co-authored-by: AlteredCoder
Co-authored-by: erenJag
2020-11-30 10:37:17 +01:00

228 lines
5.9 KiB
Go

package acquisition
import (
"bufio"
"compress/gzip"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/nxadm/tail"
log "github.com/sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
tomb "gopkg.in/tomb.v2"
)
type FileSource struct {
Config DataSourceCfg
tails []*tail.Tail
Files []string
}
func (f *FileSource) Configure(Config DataSourceCfg) error {
f.Config = Config
if len(Config.Filename) == 0 && len(Config.Filenames) == 0 {
return fmt.Errorf("no filename or filenames")
}
//let's deal with the array no matter what
if len(Config.Filename) != 0 {
Config.Filenames = append(Config.Filenames, Config.Filename)
}
for _, fexpr := range Config.Filenames {
files, err := filepath.Glob(fexpr)
if err != nil {
return errors.Wrapf(err, "while globbing %s", fexpr)
}
if len(files) == 0 {
log.Warningf("[file datasource] no results for %s", fexpr)
continue
}
for _, file := range files {
/*check that we can read said file*/
if err := unix.Access(file, unix.R_OK); err != nil {
return fmt.Errorf("unable to open %s : %s", file, err)
}
log.Infof("[file datasource] opening file '%s'", file)
if f.Config.Mode == TAIL_MODE {
tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}})
if err != nil {
log.Errorf("[file datasource] skipping %s : %v", file, err)
continue
}
f.Files = append(f.Files, file)
f.tails = append(f.tails, tail)
} else if f.Config.Mode == CAT_MODE {
//simply check that the file exists, it will be read differently
if _, err := os.Stat(file); err != nil {
return fmt.Errorf("can't open file %s : %s", file, err)
}
f.Files = append(f.Files, file)
} else {
return fmt.Errorf("unknown mode %s for file acquisition", f.Config.Mode)
}
}
}
if len(f.Files) == 0 {
return fmt.Errorf("no files to read for %+v", Config.Filenames)
}
return nil
}
func (f *FileSource) Mode() string {
return f.Config.Mode
}
func (f *FileSource) StartReading(out chan types.Event, t *tomb.Tomb) error {
if f.Config.Mode == CAT_MODE {
return f.StartCat(out, t)
} else if f.Config.Mode == TAIL_MODE {
return f.StartTail(out, t)
} else {
return fmt.Errorf("unknown mode '%s' for file acquisition", f.Config.Mode)
}
}
/*A tail-mode file reader (tail) */
func (f *FileSource) StartTail(output chan types.Event, AcquisTomb *tomb.Tomb) error {
log.Debugf("starting file tail with %d items", len(f.tails))
for i := 0; i < len(f.tails); i++ {
idx := i
log.Debugf("starting %d", idx)
AcquisTomb.Go(func() error {
defer types.CatchPanic("crowdsec/acquis/tailfile")
return f.TailOneFile(output, AcquisTomb, idx)
})
}
return nil
}
/*A one shot file reader (cat) */
func (f *FileSource) StartCat(output chan types.Event, AcquisTomb *tomb.Tomb) error {
for i := 0; i < len(f.Files); i++ {
idx := i
log.Debugf("starting %d", idx)
AcquisTomb.Go(func() error {
defer types.CatchPanic("crowdsec/acquis/catfile")
return f.CatOneFile(output, AcquisTomb, idx)
})
}
return nil
}
/*A tail-mode file reader (tail) */
func (f *FileSource) TailOneFile(output chan types.Event, AcquisTomb *tomb.Tomb, idx int) error {
file := f.Files[idx]
tail := f.tails[idx]
clog := log.WithFields(log.Fields{
"acquisition file": f.Files[idx],
})
clog.Debugf("starting")
timeout := time.Tick(1 * time.Second)
for {
l := types.Line{}
select {
case <-AcquisTomb.Dying(): //we are being killed by main
clog.Infof("file datasource %s stopping", file)
if err := tail.Stop(); err != nil {
clog.Errorf("error in stop : %s", err)
}
return nil
case <-tail.Tomb.Dying(): //our tailer is dying
clog.Warningf("File reader of %s died", file)
AcquisTomb.Kill(fmt.Errorf("dead reader for %s", file))
return fmt.Errorf("reader for %s is dead", file)
case line := <-tail.Lines:
if line == nil {
clog.Debugf("Nil line")
return fmt.Errorf("tail for %s is empty", file)
}
if line.Err != nil {
log.Warningf("fetch error : %v", line.Err)
return line.Err
}
if line.Text == "" { //skip empty lines
continue
}
ReaderHits.With(prometheus.Labels{"source": file}).Inc()
l.Raw = line.Text
l.Labels = f.Config.Labels
l.Time = line.Time
l.Src = file
l.Process = true
//we're tailing, it must be real time logs
log.Debugf("pushing %+v", l)
output <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}
case <-timeout:
//time out, shall we do stuff ?
clog.Debugf("timeout")
}
}
}
/*A one shot file reader (cat) */
func (f *FileSource) CatOneFile(output chan types.Event, AcquisTomb *tomb.Tomb, idx int) error {
var scanner *bufio.Scanner
log.Infof("reading %s at once", f.Files[idx])
file := f.Files[idx]
clog := log.WithFields(log.Fields{
"file": file,
})
fd, err := os.Open(file)
defer fd.Close()
if err != nil {
clog.Errorf("Failed opening file: %s", err)
return errors.Wrapf(err, "failed opening %s", f.Files[idx])
}
if strings.HasSuffix(file, ".gz") {
gz, err := gzip.NewReader(fd)
if err != nil {
clog.Errorf("Failed to read gz file: %s", err)
return errors.Wrapf(err, "failed to read gz %s", f.Files[idx])
}
defer gz.Close()
scanner = bufio.NewScanner(gz)
} else {
scanner = bufio.NewScanner(fd)
}
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
log.Tracef("line %s", scanner.Text())
l := types.Line{}
l.Raw = scanner.Text()
l.Time = time.Now()
l.Src = file
l.Labels = f.Config.Labels
l.Process = true
ReaderHits.With(prometheus.Labels{"source": file}).Inc()
//we're reading logs at once, it must be time-machine buckets
output <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.TIMEMACHINE}
}
AcquisTomb.Kill(nil)
return nil
}