Paginate Loki query.

This commit is contained in:
Mathieu Lecarme 2022-06-16 20:04:30 +02:00 committed by lperdereau
parent a81d52a1ff
commit d7a1c6a938

View file

@ -228,50 +228,61 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro
if err != nil {
return errors.Wrap(err, "error while getting OneShotAcquisition")
}
// FIXME: Paginate the responses with a loop
// See /usr/local/etc/grafana/grafana.ini
// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query_range
params := &url.Values{}
params.Set("query", l.Config.Query)
params.Set("direction", "forward")
params.Set("since", time.Time(l.Config.Since).Format(time.RFC3339))
params.Set("limit", "1000") // FIXME
url := fmt.Sprintf("%s/loki/api/v1/query_range?%s",
l.Config.URL,
params.Encode())
logger := l.logger.WithField("url", url)
req, err := http.NewRequest("GET", url, nil)
req.Header = l.header
params.Set("direction", "forward") // FIXME
params.Set("limit", fmt.Sprintf("%d", lokiLimit))
params.Set("end", time.Now().Format(time.RFC3339))
start := time.Time(l.Config.Since)
resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.WithError(err).Error("http error")
return errors.Wrap(err, "Error while querying loki")
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
msg, _ := io.ReadAll(resp.Body)
logger.WithField("status", resp.StatusCode).Error(string(msg))
return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode)
}
var lq LokiQuery
/*
fmt.Println(url)
msg, _ := io.ReadAll(resp.Body)
fmt.Println(string(msg))
*/
decoder := json.NewDecoder(resp.Body)
err = decoder.Decode(&lq)
if err != nil {
return errors.Wrap(err, "can't parse JSON loki response")
}
for _, result := range lq.Data.Result {
logger.WithField("stream", result.Stream).Debug("Results", len(result.Values))
for _, entry := range result.Values {
l.readOneEntry(entry, result.Stream, out)
defer t.Kill(nil)
defer l.logger.Info("Loki queried")
for {
params.Set("start", start.Format(time.RFC3339))
url := fmt.Sprintf("%s/loki/api/v1/query_range?%s",
l.Config.URL,
params.Encode())
logger := l.logger.WithField("url", url)
req, err := http.NewRequest("GET", url, nil)
req.Header = l.header
resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.WithError(err).Error("http error")
return errors.Wrap(err, "Error while querying loki")
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
msg, _ := io.ReadAll(resp.Body)
logger.WithField("status", resp.StatusCode).Error(string(msg))
return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode)
}
decoder := json.NewDecoder(resp.Body)
err = decoder.Decode(&lq)
if err != nil {
return errors.Wrap(err, "can't parse JSON loki response")
}
if len(lq.Data.Result) == 0 {
return nil
}
for _, result := range lq.Data.Result {
if len(result.Values) == 0 {
return nil
}
start = result.Values[0].Timestamp
logger.WithField("stream", result.Stream).Debug("Results", len(result.Values))
for _, entry := range result.Values {
l.readOneEntry(entry, result.Stream, out)
}
if len(result.Values) <= lokiLimit {
return nil
}
}
}
logger.Info("Loki queried")
t.Kill(nil)
return nil
}