diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go index d5cc54e3d..270a5d346 100644 --- a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go +++ b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go @@ -21,8 +21,10 @@ import ( type LokiClient struct { Logger *log.Entry - config Config - t *tomb.Tomb + config Config + t *tomb.Tomb + fail_start time.Time + currentTickerInterval time.Duration } type Config struct { @@ -37,6 +39,8 @@ type Config struct { Since time.Duration Until time.Duration + FailMaxDuration time.Duration + DelayFor int Limit int } @@ -59,8 +63,51 @@ func updateURI(uri string, lq LokiQueryRangeResponse, infinite bool) string { return u.String() } +func (lc *LokiClient) SetTomb(t *tomb.Tomb) { + lc.t = t +} + +func (lc *LokiClient) resetFailStart() { + if !lc.fail_start.IsZero() { + log.Infof("loki is back after %s", time.Since(lc.fail_start)) + } + lc.fail_start = time.Time{} +} +func (lc *LokiClient) shouldRetry() bool { + if lc.fail_start.IsZero() { + lc.Logger.Warningf("loki is not available, will retry for %s", lc.config.FailMaxDuration) + lc.fail_start = time.Now() + return true + } + if time.Since(lc.fail_start) > lc.config.FailMaxDuration { + lc.Logger.Errorf("loki didn't manage to recover after %s, giving up", lc.config.FailMaxDuration) + return false + } + return true +} + +func (lc *LokiClient) increaseTicker(ticker *time.Ticker) { + maxTicker := 10 * time.Second + if lc.currentTickerInterval < maxTicker { + lc.currentTickerInterval *= 2 + if lc.currentTickerInterval > maxTicker { + lc.currentTickerInterval = maxTicker + } + ticker.Reset(lc.currentTickerInterval) + } +} + +func (lc *LokiClient) decreaseTicker(ticker *time.Ticker) { + minTicker := 100 * time.Millisecond + if lc.currentTickerInterval != minTicker { + lc.currentTickerInterval = minTicker + ticker.Reset(lc.currentTickerInterval) + } +} + func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse, infinite bool) error { - ticker := time.NewTicker(100 * time.Millisecond) + lc.currentTickerInterval = 100 * time.Millisecond + ticker := time.NewTicker(lc.currentTickerInterval) defer ticker.Stop() for { select { @@ -71,32 +118,54 @@ func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQu case <-ticker.C: lc.Logger.Debugf("Querying Loki: %s", uri) resp, err := http.Get(uri) - if err != nil { - return errors.Wrapf(err, "error querying range") + if ok := lc.shouldRetry(); !ok { + return errors.Wrapf(err, "error querying range") + } else { + lc.increaseTicker(ticker) + continue + } } if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) resp.Body.Close() - return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body)) + if ok := lc.shouldRetry(); !ok { + return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body)) + } else { + lc.increaseTicker(ticker) + continue + } } var lq LokiQueryRangeResponse if err := json.NewDecoder(resp.Body).Decode(&lq); err != nil { resp.Body.Close() - return errors.Wrapf(err, "error decoding Loki response") + if ok := lc.shouldRetry(); !ok { + return errors.Wrapf(err, "error decoding Loki response") + } else { + lc.increaseTicker(ticker) + continue + } } resp.Body.Close() lc.Logger.Tracef("Got response: %+v", lq) c <- &lq - + lc.resetFailStart() if !infinite && (len(lq.Data.Result) == 0 || len(lq.Data.Result[0].Entries) < lc.config.Limit) { lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, len(lq.Data.Result)) close(c) return nil } + if infinite { + if len(lq.Data.Result) > 0 { //as long as we get results, we keep decreasing the ticker + lc.decreaseTicker(ticker) + } else { + lc.increaseTicker(ticker) + } + } + uri = updateURI(uri, lq, infinite) } } @@ -239,5 +308,5 @@ func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQ } func NewLokiClient(config Config) *LokiClient { - return &LokiClient{t: &tomb.Tomb{}, Logger: log.WithField("component", "lokiclient"), config: config} + return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config} } diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 3b801c7dc..555deefe2 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -52,6 +52,7 @@ type LokiConfiguration struct { Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds Auth LokiAuthConfiguration `yaml:"auth"` + MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source configuration.DataSourceCommonCfg `yaml:",inline"` } @@ -110,6 +111,10 @@ func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error { l.Config.Since = 0 } + if l.Config.MaxFailureDuration == 0 { + l.Config.MaxFailureDuration = 30 * time.Second + } + return nil } @@ -124,13 +129,14 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { l.logger.Infof("Since value: %s", l.Config.Since.String()) clientConfig := lokiclient.Config{ - LokiURL: l.Config.URL, - Headers: l.Config.Headers, - Limit: l.Config.Limit, - Query: l.Config.Query, - Since: l.Config.Since, - Username: l.Config.Auth.Username, - Password: l.Config.Auth.Password, + LokiURL: l.Config.URL, + Headers: l.Config.Headers, + Limit: l.Config.Limit, + Query: l.Config.Query, + Since: l.Config.Since, + Username: l.Config.Auth.Username, + Password: l.Config.Auth.Password, + FailMaxDuration: l.Config.MaxFailureDuration, } l.Client = lokiclient.NewLokiClient(clientConfig) @@ -192,6 +198,16 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger } } + if max_failure_duration := params.Get("max_failure_duration"); max_failure_duration != "" { + duration, err := time.ParseDuration(max_failure_duration) + if err != nil { + return fmt.Errorf("invalid max_failure_duration in dsn: %w", err) + } + l.Config.MaxFailureDuration = duration + } else { + l.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration + } + if limit := params.Get("limit"); limit != "" { limit, err := strconv.Atoi(limit) if err != nil { @@ -245,6 +261,7 @@ func (l *LokiSource) GetName() string { // OneShotAcquisition reads a set of file and returns when done func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { l.logger.Debug("Loki one shot acquisition") + l.Client.SetTomb(t) readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady) defer cancel() err := l.Client.Ready(readyCtx) @@ -299,6 +316,7 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri } func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { + l.Client.SetTomb(t) readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady) defer cancel() err := l.Client.Ready(readyCtx)