diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 5c8609e95..2f3307b0e 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -228,50 +228,61 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro if err != nil { return errors.Wrap(err, "error while getting OneShotAcquisition") } - // FIXME: Paginate the responses with a loop - // See /usr/local/etc/grafana/grafana.ini + + // See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query_range params := &url.Values{} params.Set("query", l.Config.Query) - params.Set("direction", "forward") - params.Set("since", time.Time(l.Config.Since).Format(time.RFC3339)) - params.Set("limit", "1000") // FIXME - url := fmt.Sprintf("%s/loki/api/v1/query_range?%s", - l.Config.URL, - params.Encode()) - logger := l.logger.WithField("url", url) - req, err := http.NewRequest("GET", url, nil) - req.Header = l.header + params.Set("direction", "forward") // FIXME + params.Set("limit", fmt.Sprintf("%d", lokiLimit)) + params.Set("end", time.Now().Format(time.RFC3339)) + start := time.Time(l.Config.Since) - resp, err := http.DefaultClient.Do(req) - if err != nil { - logger.WithError(err).Error("http error") - return errors.Wrap(err, "Error while querying loki") - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - msg, _ := io.ReadAll(resp.Body) - logger.WithField("status", resp.StatusCode).Error(string(msg)) - return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode) - } var lq LokiQuery - /* - fmt.Println(url) - msg, _ := io.ReadAll(resp.Body) - fmt.Println(string(msg)) - */ - decoder := json.NewDecoder(resp.Body) - err = decoder.Decode(&lq) - if err != nil { - return errors.Wrap(err, "can't parse JSON loki response") - } - for _, result := range lq.Data.Result { - logger.WithField("stream", result.Stream).Debug("Results", len(result.Values)) - for _, entry := range result.Values { - l.readOneEntry(entry, result.Stream, out) + defer t.Kill(nil) + defer l.logger.Info("Loki queried") + + for { + params.Set("start", start.Format(time.RFC3339)) + url := fmt.Sprintf("%s/loki/api/v1/query_range?%s", + l.Config.URL, + params.Encode()) + logger := l.logger.WithField("url", url) + req, err := http.NewRequest("GET", url, nil) + req.Header = l.header + + resp, err := http.DefaultClient.Do(req) + if err != nil { + logger.WithError(err).Error("http error") + return errors.Wrap(err, "Error while querying loki") + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + msg, _ := io.ReadAll(resp.Body) + logger.WithField("status", resp.StatusCode).Error(string(msg)) + return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode) + } + decoder := json.NewDecoder(resp.Body) + err = decoder.Decode(&lq) + if err != nil { + return errors.Wrap(err, "can't parse JSON loki response") + } + if len(lq.Data.Result) == 0 { + return nil + } + for _, result := range lq.Data.Result { + if len(result.Values) == 0 { + return nil + } + start = result.Values[0].Timestamp + logger.WithField("stream", result.Stream).Debug("Results", len(result.Values)) + for _, entry := range result.Values { + l.readOneEntry(entry, result.Stream, out) + } + if len(result.Values) <= lokiLimit { + return nil + } } } - logger.Info("Loki queried") - t.Kill(nil) return nil }