diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 61b450093..b1808e446 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -311,21 +311,26 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er scanner = bufio.NewScanner(reader) } for scanner.Scan() { - line := scanner.Text() - if line == "" { - continue + select { + case <-t.Dying(): + d.logger.Infof("Shutting down reader for container %s", containerConfig.Name) + default: + line := scanner.Text() + if line == "" { + continue + } + l := types.Line{} + l.Raw = line + l.Labels = d.Config.Labels + l.Time = time.Now().UTC() + l.Src = containerConfig.Name + l.Process = true + l.Module = d.GetName() + linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc() + evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + out <- evt + d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) } - l := types.Line{} - l.Raw = line - l.Labels = d.Config.Labels - l.Time = time.Now().UTC() - l.Src = containerConfig.Name - l.Process = true - l.Module = d.GetName() - linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc() - evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} - out <- evt - d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) } err = scanner.Err() if err != nil { diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index 56c51f3cb..efcf7f3f5 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -514,22 +514,28 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom scanner.Buffer(buf, f.config.MaxBufferSize) } for scanner.Scan() { - if scanner.Text() == "" { - continue - } - l := types.Line{ - Raw: scanner.Text(), - Time: time.Now().UTC(), - Src: filename, - Labels: f.config.Labels, - Process: true, - Module: f.GetName(), - } - logger.Debugf("line %s", l.Raw) - linesRead.With(prometheus.Labels{"source": filename}).Inc() + select { + case <-t.Dying(): + logger.Infof("File datasource %s stopping", filename) + return nil + default: + if scanner.Text() == "" { + continue + } + l := types.Line{ + Raw: scanner.Text(), + Time: time.Now().UTC(), + Src: filename, + Labels: f.config.Labels, + Process: true, + Module: f.GetName(), + } + logger.Debugf("line %s", l.Raw) + linesRead.With(prometheus.Labels{"source": filename}).Inc() - //we're reading logs at once, it must be time-machine buckets - out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + //we're reading logs at once, it must be time-machine buckets + out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + } } if err := scanner.Err(); err != nil { logger.Errorf("Error while reading file: %s", err) diff --git a/pkg/acquisition/modules/s3/s3.go b/pkg/acquisition/modules/s3/s3.go index 1428138f7..4ba31f43b 100644 --- a/pkg/acquisition/modules/s3/s3.go +++ b/pkg/acquisition/modules/s3/s3.go @@ -418,23 +418,29 @@ func (s *S3Source) readFile(bucket string, key string) error { scanner.Buffer(buf, s.Config.MaxBufferSize) } for scanner.Scan() { - text := scanner.Text() - logger.Tracef("Read line %s", text) - linesRead.WithLabelValues(bucket).Inc() - l := types.Line{} - l.Raw = text - l.Labels = s.Config.Labels - l.Time = time.Now().UTC() - l.Process = true - l.Module = s.GetName() - l.Src = bucket - var evt types.Event - if !s.Config.UseTimeMachine { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} - } else { - evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + select { + case <-s.t.Dying(): + s.logger.Infof("Shutting down reader for %s/%s", bucket, key) + return nil + default: + text := scanner.Text() + logger.Tracef("Read line %s", text) + linesRead.WithLabelValues(bucket).Inc() + l := types.Line{} + l.Raw = text + l.Labels = s.Config.Labels + l.Time = time.Now().UTC() + l.Process = true + l.Module = s.GetName() + l.Src = bucket + var evt types.Event + if !s.Config.UseTimeMachine { + evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE} + } else { + evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE} + } + s.out <- evt } - s.out <- evt } if err := scanner.Err(); err != nil { return fmt.Errorf("failed to read object %s/%s: %s", bucket, key, err) @@ -629,6 +635,7 @@ func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error s.out = out s.ctx, s.cancel = context.WithCancel(context.Background()) s.Config.UseTimeMachine = true + s.t = t if s.Config.Key != "" { err := s.readFile(s.Config.BucketName, s.Config.Key) if err != nil {