diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 7a18c8e17..87db459c8 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -96,6 +96,8 @@ url: http://127.0.0.1:3101 mode: tail source: loki url: http://127.0.0.1:3100 +query: > + {server="demo"} `, // No Loki server here expectedErr: "", streamErr: "", @@ -130,6 +132,15 @@ url: http://127.0.0.1:3100 return lokiSource.StreamingAcquisition(out, &lokiTomb) }) + readTomb := tomb.Tomb{} + readTomb.Go(func() error { + for i := 0; i < 20; i++ { + evt := <-out + fmt.Println(evt) + } + return nil + }) + writerTomb := tomb.Tomb{} writerTomb.Go(func() error { streams := LogStreams{ @@ -164,6 +175,7 @@ url: http://127.0.0.1:3100 log.Error(string(b)) return fmt.Errorf("Bad post status %d", resp.StatusCode) } + subLogger.Info("20 Events sent") return nil }) err = writerTomb.Wait() @@ -173,6 +185,13 @@ url: http://127.0.0.1:3100 err = streamTomb.Wait() cstest.AssertErrorContains(t, err, ts.streamErr) + + if err == nil { + err = readTomb.Wait() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + } } }