Read loop.

This commit is contained in:
Mathieu Lecarme 2022-06-07 16:16:15 +02:00 committed by lperdereau
parent 92e3ea565b
commit 6c213828a4

View file

@ -96,6 +96,8 @@ url: http://127.0.0.1:3101
mode: tail mode: tail
source: loki source: loki
url: http://127.0.0.1:3100 url: http://127.0.0.1:3100
query: >
{server="demo"}
`, // No Loki server here `, // No Loki server here
expectedErr: "", expectedErr: "",
streamErr: "", streamErr: "",
@ -130,6 +132,15 @@ url: http://127.0.0.1:3100
return lokiSource.StreamingAcquisition(out, &lokiTomb) return lokiSource.StreamingAcquisition(out, &lokiTomb)
}) })
readTomb := tomb.Tomb{}
readTomb.Go(func() error {
for i := 0; i < 20; i++ {
evt := <-out
fmt.Println(evt)
}
return nil
})
writerTomb := tomb.Tomb{} writerTomb := tomb.Tomb{}
writerTomb.Go(func() error { writerTomb.Go(func() error {
streams := LogStreams{ streams := LogStreams{
@ -164,6 +175,7 @@ url: http://127.0.0.1:3100
log.Error(string(b)) log.Error(string(b))
return fmt.Errorf("Bad post status %d", resp.StatusCode) return fmt.Errorf("Bad post status %d", resp.StatusCode)
} }
subLogger.Info("20 Events sent")
return nil return nil
}) })
err = writerTomb.Wait() err = writerTomb.Wait()
@ -173,6 +185,13 @@ url: http://127.0.0.1:3100
err = streamTomb.Wait() err = streamTomb.Wait()
cstest.AssertErrorContains(t, err, ts.streamErr) cstest.AssertErrorContains(t, err, ts.streamErr)
if err == nil {
err = readTomb.Wait()
if err != nil {
t.Fatalf("Unexpected error : %s", err)
}
}
} }
} }