diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 0e1140520..1ddf063a3 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -383,6 +383,47 @@ query: > } } +func TestStopStreaming(t *testing.T) { + config := ` +mode: tail +source: loki +url: http://127.0.0.1:3100 +query: > + {server="demo"} +` + logger := log.New() + subLogger := logger.WithFields(log.Fields{ + "type": "loki", + }) + title := time.Now().String() + lokiSource := LokiSource{} + err := lokiSource.Configure([]byte(config), subLogger) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + out := make(chan types.Event) + drainTomb := tomb.Tomb{} + drainTomb.Go(func() error { + <-out + return nil + }) + lokiTomb := &tomb.Tomb{} + err = lokiSource.StreamingAcquisition(out, lokiTomb) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + feedLoki(subLogger, 1, title) + err = drainTomb.Wait() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + lokiTomb.Kill(nil) + err = lokiTomb.Wait() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } +} + type LogStreams struct { Streams []LogStream `json:"streams"` }