add support for modular ticker, and ensure we use the same tomb so acquisition failure can bubble up

This commit is contained in:
bui 2023-10-17 16:24:20 +02:00
parent 1857ef4e74
commit 1bcd57ce29
2 changed files with 103 additions and 16 deletions

View file

@ -21,8 +21,10 @@ import (
type LokiClient struct {
Logger *log.Entry
config Config
t *tomb.Tomb
config Config
t *tomb.Tomb
fail_start time.Time
currentTickerInterval time.Duration
}
type Config struct {
@ -37,6 +39,8 @@ type Config struct {
Since time.Duration
Until time.Duration
FailMaxDuration time.Duration
DelayFor int
Limit int
}
@ -59,8 +63,51 @@ func updateURI(uri string, lq LokiQueryRangeResponse, infinite bool) string {
return u.String()
}
func (lc *LokiClient) SetTomb(t *tomb.Tomb) {
lc.t = t
}
func (lc *LokiClient) resetFailStart() {
if !lc.fail_start.IsZero() {
log.Infof("loki is back after %s", time.Since(lc.fail_start))
}
lc.fail_start = time.Time{}
}
func (lc *LokiClient) shouldRetry() bool {
if lc.fail_start.IsZero() {
lc.Logger.Warningf("loki is not available, will retry for %s", lc.config.FailMaxDuration)
lc.fail_start = time.Now()
return true
}
if time.Since(lc.fail_start) > lc.config.FailMaxDuration {
lc.Logger.Errorf("loki didn't manage to recover after %s, giving up", lc.config.FailMaxDuration)
return false
}
return true
}
func (lc *LokiClient) increaseTicker(ticker *time.Ticker) {
maxTicker := 10 * time.Second
if lc.currentTickerInterval < maxTicker {
lc.currentTickerInterval *= 2
if lc.currentTickerInterval > maxTicker {
lc.currentTickerInterval = maxTicker
}
ticker.Reset(lc.currentTickerInterval)
}
}
func (lc *LokiClient) decreaseTicker(ticker *time.Ticker) {
minTicker := 100 * time.Millisecond
if lc.currentTickerInterval != minTicker {
lc.currentTickerInterval = minTicker
ticker.Reset(lc.currentTickerInterval)
}
}
func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse, infinite bool) error {
ticker := time.NewTicker(100 * time.Millisecond)
lc.currentTickerInterval = 100 * time.Millisecond
ticker := time.NewTicker(lc.currentTickerInterval)
defer ticker.Stop()
for {
select {
@ -71,32 +118,54 @@ func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQu
case <-ticker.C:
lc.Logger.Debugf("Querying Loki: %s", uri)
resp, err := http.Get(uri)
if err != nil {
return errors.Wrapf(err, "error querying range")
if ok := lc.shouldRetry(); !ok {
return errors.Wrapf(err, "error querying range")
} else {
lc.increaseTicker(ticker)
continue
}
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body))
if ok := lc.shouldRetry(); !ok {
return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body))
} else {
lc.increaseTicker(ticker)
continue
}
}
var lq LokiQueryRangeResponse
if err := json.NewDecoder(resp.Body).Decode(&lq); err != nil {
resp.Body.Close()
return errors.Wrapf(err, "error decoding Loki response")
if ok := lc.shouldRetry(); !ok {
return errors.Wrapf(err, "error decoding Loki response")
} else {
lc.increaseTicker(ticker)
continue
}
}
resp.Body.Close()
lc.Logger.Tracef("Got response: %+v", lq)
c <- &lq
lc.resetFailStart()
if !infinite && (len(lq.Data.Result) == 0 || len(lq.Data.Result[0].Entries) < lc.config.Limit) {
lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, len(lq.Data.Result))
close(c)
return nil
}
if infinite {
if len(lq.Data.Result) > 0 { //as long as we get results, we keep decreasing the ticker
lc.decreaseTicker(ticker)
} else {
lc.increaseTicker(ticker)
}
}
uri = updateURI(uri, lq, infinite)
}
}
@ -239,5 +308,5 @@ func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQ
}
func NewLokiClient(config Config) *LokiClient {
return &LokiClient{t: &tomb.Tomb{}, Logger: log.WithField("component", "lokiclient"), config: config}
return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config}
}

View file

@ -52,6 +52,7 @@ type LokiConfiguration struct {
Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki
WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
Auth LokiAuthConfiguration `yaml:"auth"`
MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source
configuration.DataSourceCommonCfg `yaml:",inline"`
}
@ -110,6 +111,10 @@ func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
l.Config.Since = 0
}
if l.Config.MaxFailureDuration == 0 {
l.Config.MaxFailureDuration = 30 * time.Second
}
return nil
}
@ -124,13 +129,14 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
l.logger.Infof("Since value: %s", l.Config.Since.String())
clientConfig := lokiclient.Config{
LokiURL: l.Config.URL,
Headers: l.Config.Headers,
Limit: l.Config.Limit,
Query: l.Config.Query,
Since: l.Config.Since,
Username: l.Config.Auth.Username,
Password: l.Config.Auth.Password,
LokiURL: l.Config.URL,
Headers: l.Config.Headers,
Limit: l.Config.Limit,
Query: l.Config.Query,
Since: l.Config.Since,
Username: l.Config.Auth.Username,
Password: l.Config.Auth.Password,
FailMaxDuration: l.Config.MaxFailureDuration,
}
l.Client = lokiclient.NewLokiClient(clientConfig)
@ -192,6 +198,16 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
}
}
if max_failure_duration := params.Get("max_failure_duration"); max_failure_duration != "" {
duration, err := time.ParseDuration(max_failure_duration)
if err != nil {
return fmt.Errorf("invalid max_failure_duration in dsn: %w", err)
}
l.Config.MaxFailureDuration = duration
} else {
l.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration
}
if limit := params.Get("limit"); limit != "" {
limit, err := strconv.Atoi(limit)
if err != nil {
@ -245,6 +261,7 @@ func (l *LokiSource) GetName() string {
// OneShotAcquisition reads a set of file and returns when done
func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
l.logger.Debug("Loki one shot acquisition")
l.Client.SetTomb(t)
readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
defer cancel()
err := l.Client.Ready(readyCtx)
@ -299,6 +316,7 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri
}
func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
l.Client.SetTomb(t)
readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
defer cancel()
err := l.Client.Ready(readyCtx)