OneShot has its test

This commit is contained in:
Mathieu Lecarme 2022-06-15 18:49:42 +02:00 committed by lperdereau
parent ea860b1087
commit 6bf80ee083
3 changed files with 54 additions and 26 deletions

View file

@ -49,7 +49,12 @@ type LokiQuery struct {
} }
type Data struct { type Data struct {
ResultType string `json:"resultType"` ResultType string `json:"resultType"`
Result []Entry `json:"result"` // Warning, just stream value is handled Result []StreamResult `json:"result"` // Warning, just stream value is handled
Stats interface{} `json:"stats"` // Stats is boring, just ignore it Stats interface{} `json:"stats"` // Stats is boring, just ignore it
}
type StreamResult struct {
Stream map[string]string `json:"stream"`
Values []Entry `json:"values"`
} }

View file

@ -229,12 +229,13 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro
return errors.Wrap(err, "error while getting OneShotAcquisition") return errors.Wrap(err, "error while getting OneShotAcquisition")
} }
// FIXME: Paginate the responses with a loop // FIXME: Paginate the responses with a loop
// See /usr/local/etc/grafana/grafana.ini
params := &url.Values{} params := &url.Values{}
params.Set("query", l.Config.Query) params.Set("query", l.Config.Query)
params.Set("direction", "forward") params.Set("direction", "forward")
params.Set("time", time.Time(l.Config.Since).Format(time.RFC3339)) params.Set("since", time.Time(l.Config.Since).Format(time.RFC3339))
params.Set("limit", "1000") // FIXME params.Set("limit", "1000") // FIXME
url := fmt.Sprintf("%s/loki/api/v1/query?%s", url := fmt.Sprintf("%s/loki/api/v1/query_range?%s",
l.Config.URL, l.Config.URL,
params.Encode()) params.Encode())
logger := l.logger.WithField("url", url) logger := l.logger.WithField("url", url)
@ -253,38 +254,53 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro
return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode) return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode)
} }
var lq LokiQuery var lq LokiQuery
/*
fmt.Println(url)
msg, _ := io.ReadAll(resp.Body)
fmt.Println(string(msg))
*/
decoder := json.NewDecoder(resp.Body) decoder := json.NewDecoder(resp.Body)
err = decoder.Decode(&lq) err = decoder.Decode(&lq)
if err != nil { if err != nil {
return errors.Wrap(err, "can't parse JSON loki response") return errors.Wrap(err, "can't parse JSON loki response")
} }
for _, result := range lq.Data.Result {
logger.WithField("stream", result.Stream).Debug("Results", len(result.Values))
for _, entry := range result.Values {
l.readOneEntry(entry, result.Stream, out)
}
}
logger.Info("Loki queried")
t.Kill(nil)
return nil return nil
} }
func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) { func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) {
for _, stream := range resp.Streams { for _, stream := range resp.Streams {
for _, entry := range stream.Entries { for _, entry := range stream.Entries {
l.readOneEntry(entry, stream.Stream, out)
ll := types.Line{}
ll.Raw = entry.Line
ll.Time = entry.Timestamp
ll.Src = l.Config.URL
ll.Labels = stream.Stream
ll.Process = true
ll.Module = l.GetName()
linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
out <- types.Event{
Line: ll,
Process: true,
Type: types.LOG,
ExpectMode: leaky.TIMEMACHINE,
}
} }
} }
} }
func (l *LokiSource) readOneEntry(entry Entry, labels map[string]string, out chan types.Event) {
ll := types.Line{}
ll.Raw = entry.Line
ll.Time = entry.Timestamp
ll.Src = l.Config.URL
ll.Labels = labels
ll.Process = true
ll.Module = l.GetName()
linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
out <- types.Event{
Line: ll,
Process: true,
Type: types.LOG,
ExpectMode: leaky.TIMEMACHINE,
}
}
func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
err := l.ready() err := l.ready()
if err != nil { if err != nil {

View file

@ -168,18 +168,19 @@ func TestOneShotAcquisition(t *testing.T) {
log.SetOutput(os.Stdout) log.SetOutput(os.Stdout)
log.SetLevel(log.InfoLevel) log.SetLevel(log.InfoLevel)
log.Info("Test 'TestStreamingAcquisition'") log.Info("Test 'TestStreamingAcquisition'")
title := time.Now().String() title := time.Now().String() // Loki will be messy, with a lot of stuff, lets use a unique key
tests := []struct { tests := []struct {
config string config string
}{ }{
{ {
config: ` config: fmt.Sprintf(`
mode: cat mode: cat
source: loki source: loki
url: http://127.0.0.1:3100 url: http://127.0.0.1:3100
query: > query: >
{server="demo"} {server="demo",key="%s"}
`, since: 1h
`, title),
}, },
} }
@ -201,6 +202,7 @@ query: >
Stream: map[string]string{ Stream: map[string]string{
"server": "demo", "server": "demo",
"domain": "cw.example.com", "domain": "cw.example.com",
"key": title,
}, },
Values: make([]LogValue, 20), Values: make([]LogValue, 20),
}, },
@ -228,6 +230,11 @@ query: >
subLogger.Info("20 Events sent") subLogger.Info("20 Events sent")
out := make(chan types.Event) out := make(chan types.Event)
go func() {
for i := 0; i < 20; i++ {
<-out
}
}()
lokiTomb := tomb.Tomb{} lokiTomb := tomb.Tomb{}
err = lokiSource.OneShotAcquisition(out, &lokiTomb) err = lokiSource.OneShotAcquisition(out, &lokiTomb)
if err != nil { if err != nil {