diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 9683205d6..d5a3a80c2 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -82,11 +82,11 @@ func (l *LokiSource) GetAggregMetrics() []prometheus.Collector { func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error { err := yaml.UnmarshalStrict(yamlConfig, &l.Config) if err != nil { - return errors.Wrap(err, "Cannot parse LokiAcquisition configuration") + return fmt.Errorf("cannot parse loki acquisition configuration: %w", err) } if l.Config.Query == "" { - return errors.New("Loki query is mandatory") + return errors.New("loki query is mandatory") } if l.Config.WaitForReady == 0 { @@ -141,8 +141,7 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { } l.Client = lokiclient.NewLokiClient(clientConfig) - l.Client.Logger = logger.WithField("component", "lokiclient") - l.Client.Logger.WithField("source", l.Config.URL) + l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL}) return nil } @@ -155,13 +154,13 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger u, err := url.Parse(dsn) if err != nil { - return errors.Wrap(err, "can't parse dsn configuration : "+dsn) + return fmt.Errorf("while parsing dsn '%s': %w", dsn, err) } if u.Scheme != "loki" { return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn) } if u.Host == "" { - return errors.New("Empty loki host") + return errors.New("empty loki host") } scheme := "http" @@ -184,7 +183,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger if d := params.Get("delay_for"); d != "" { l.Config.DelayFor, err = time.ParseDuration(d) if err != nil { - return errors.Wrap(err, "can't parse since in DSN configuration") + return fmt.Errorf("invalid duration: %w", err) } if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second { return errors.New("delay_for should be a value between 1s and 5s") @@ -196,14 +195,14 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger if s := params.Get("since"); s != "" { l.Config.Since, err = time.ParseDuration(s) if err != nil { - return errors.Wrap(err, "can't parse since in DSN configuration") + return fmt.Errorf("invalid since in dsn: %w", err) } } if limit := params.Get("limit"); limit != "" { limit, err := strconv.Atoi(limit) if err != nil { - return errors.Wrap(err, "can't parse limit in DSN configuration") + return fmt.Errorf("invalid limit in dsn: %w", err) } l.Config.Limit = limit } else { @@ -213,7 +212,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger if logLevel := params.Get("log_level"); logLevel != "" { level, err := log.ParseLevel(logLevel) if err != nil { - return errors.Wrap(err, "can't parse log_level in DSN configuration") + return fmt.Errorf("invalid log_level in dsn: %w", err) } l.Config.LogLevel = &level l.logger.Logger.SetLevel(level) @@ -237,7 +236,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger } l.Client = lokiclient.NewLokiClient(clientConfig) - l.Client.Logger = logger.WithField("component", "lokiclient") + l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL}) return nil } @@ -257,7 +256,7 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro defer cancel() err := l.Client.Ready(readyCtx) if err != nil { - return errors.Wrap(err, "loki is not ready") + return fmt.Errorf("loki is not ready: %w", err) } ctx, cancel := context.WithCancel(context.Background()) @@ -307,16 +306,16 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er defer cancel() err := l.Client.Ready(readyCtx) if err != nil { - return errors.Wrap(err, "loki is not ready") + return fmt.Errorf("loki is not ready: %w", err) } - ll := l.logger.WithField("websocket url", l.lokiWebsocket) + ll := l.logger.WithField("websocket_url", l.lokiWebsocket) t.Go(func() error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() respChan, err := l.Client.Tail(ctx) if err != nil { ll.Errorf("could not start loki tail: %s", err) - return errors.Wrap(err, "could not start loki tail") + return fmt.Errorf("while starting loki tail: %w", err) } for { select { @@ -327,8 +326,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er } if len(resp.DroppedEntries) > 0 { entriesDropped.With(prometheus.Labels{"source": l.Config.URL}).Add(float64(len(resp.DroppedEntries))) - ll.WithField("query", l.Config.Query) - ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries)) + ll.WithField("query", l.Config.Query).Warnf("%d entries dropped from loki response", len(resp.DroppedEntries)) } for _, stream := range resp.Streams { for _, entry := range stream.Entries {