diff --git a/pkg/acquisition/modules/loki/entry.go b/pkg/acquisition/modules/loki/entry.go index 42f4c73c6..c0ff857ea 100644 --- a/pkg/acquisition/modules/loki/entry.go +++ b/pkg/acquisition/modules/loki/entry.go @@ -49,7 +49,12 @@ type LokiQuery struct { } type Data struct { - ResultType string `json:"resultType"` - Result []Entry `json:"result"` // Warning, just stream value is handled - Stats interface{} `json:"stats"` // Stats is boring, just ignore it + ResultType string `json:"resultType"` + Result []StreamResult `json:"result"` // Warning, just stream value is handled + Stats interface{} `json:"stats"` // Stats is boring, just ignore it +} + +type StreamResult struct { + Stream map[string]string `json:"stream"` + Values []Entry `json:"values"` } diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 534afaaf2..5c8609e95 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -229,12 +229,13 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro return errors.Wrap(err, "error while getting OneShotAcquisition") } // FIXME: Paginate the responses with a loop + // See /usr/local/etc/grafana/grafana.ini params := &url.Values{} params.Set("query", l.Config.Query) params.Set("direction", "forward") - params.Set("time", time.Time(l.Config.Since).Format(time.RFC3339)) + params.Set("since", time.Time(l.Config.Since).Format(time.RFC3339)) params.Set("limit", "1000") // FIXME - url := fmt.Sprintf("%s/loki/api/v1/query?%s", + url := fmt.Sprintf("%s/loki/api/v1/query_range?%s", l.Config.URL, params.Encode()) logger := l.logger.WithField("url", url) @@ -253,38 +254,53 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode) } var lq LokiQuery + /* + fmt.Println(url) + msg, _ := io.ReadAll(resp.Body) + fmt.Println(string(msg)) + */ decoder := json.NewDecoder(resp.Body) err = decoder.Decode(&lq) if err != nil { return errors.Wrap(err, "can't parse JSON loki response") } - + for _, result := range lq.Data.Result { + logger.WithField("stream", result.Stream).Debug("Results", len(result.Values)) + for _, entry := range result.Values { + l.readOneEntry(entry, result.Stream, out) + } + } + logger.Info("Loki queried") + t.Kill(nil) return nil } func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) { for _, stream := range resp.Streams { for _, entry := range stream.Entries { - - ll := types.Line{} - ll.Raw = entry.Line - ll.Time = entry.Timestamp - ll.Src = l.Config.URL - ll.Labels = stream.Stream - ll.Process = true - ll.Module = l.GetName() - - linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc() - out <- types.Event{ - Line: ll, - Process: true, - Type: types.LOG, - ExpectMode: leaky.TIMEMACHINE, - } + l.readOneEntry(entry, stream.Stream, out) } } } +func (l *LokiSource) readOneEntry(entry Entry, labels map[string]string, out chan types.Event) { + ll := types.Line{} + ll.Raw = entry.Line + ll.Time = entry.Timestamp + ll.Src = l.Config.URL + ll.Labels = labels + ll.Process = true + ll.Module = l.GetName() + + linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc() + out <- types.Event{ + Line: ll, + Process: true, + Type: types.LOG, + ExpectMode: leaky.TIMEMACHINE, + } +} + func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { err := l.ready() if err != nil { diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 5b505c505..d735594ce 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -168,18 +168,19 @@ func TestOneShotAcquisition(t *testing.T) { log.SetOutput(os.Stdout) log.SetLevel(log.InfoLevel) log.Info("Test 'TestStreamingAcquisition'") - title := time.Now().String() + title := time.Now().String() // Loki will be messy, with a lot of stuff, lets use a unique key tests := []struct { config string }{ { - config: ` + config: fmt.Sprintf(` mode: cat source: loki url: http://127.0.0.1:3100 query: > - {server="demo"} -`, + {server="demo",key="%s"} +since: 1h +`, title), }, } @@ -201,6 +202,7 @@ query: > Stream: map[string]string{ "server": "demo", "domain": "cw.example.com", + "key": title, }, Values: make([]LogValue, 20), }, @@ -228,6 +230,11 @@ query: > subLogger.Info("20 Events sent") out := make(chan types.Event) + go func() { + for i := 0; i < 20; i++ { + <-out + } + }() lokiTomb := tomb.Tomb{} err = lokiSource.OneShotAcquisition(out, &lokiTomb) if err != nil {