diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 647da656b..e3f8a049b 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -337,6 +337,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er if err != nil { return errors.Wrap(err, "error while getting StreamingAcquisition") } + ll := l.logger.WithField("websocket url", l.lokiWebsocket) t.Go(func() error { for { ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) @@ -352,19 +353,22 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er return nil } if res == nil { // no body, it's a network error, not a HTTP error - return errors.Wrap(err, "loki StreamingAcquisition error before HTTP stack") + ll.WithError(err).Error("loki StreamingAcquisition error before HTTP stack") + break } buf, err2 := ioutil.ReadAll(res.Body) if err2 == nil { - return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf)) + ll.WithField("body", string(buf)).WithField("status", res.StatusCode).Error("loki http error") + break } - - return err2 + ll.WithError(err2).Error("can't read loki http body") + break } defer c.Close() _, reader, err := c.NextReader() if err != nil { - return errors.Wrap(err, "loki StreamingAcquisition error while reading JSON websocket") + ll.WithError(err).Error("loki StreamingAcquisition error while reading JSON websocket") + break } var resp Tail decoder := json.NewDecoder(reader) @@ -377,7 +381,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er if err == io.EOF { // the websocket is closed break } - return errors.Wrap(err, "loki StreamingAcquisition error while parsing JSON websocket") + ll.WithError(err).Error("loki StreamingAcquisition error while parsing JSON websocket") } l.logger.WithField("type", t).WithField("message", resp).Debug("Message receveid") l.readOneTail(resp, out)