improve bubble up errors and fix a few logging messages
This commit is contained in:
parent
1a8c127529
commit
755ca0238b
|
@ -82,11 +82,11 @@ func (l *LokiSource) GetAggregMetrics() []prometheus.Collector {
|
||||||
func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
|
func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||||
err := yaml.UnmarshalStrict(yamlConfig, &l.Config)
|
err := yaml.UnmarshalStrict(yamlConfig, &l.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
|
return fmt.Errorf("cannot parse loki acquisition configuration: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.Config.Query == "" {
|
if l.Config.Query == "" {
|
||||||
return errors.New("Loki query is mandatory")
|
return errors.New("loki query is mandatory")
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.Config.WaitForReady == 0 {
|
if l.Config.WaitForReady == 0 {
|
||||||
|
@ -141,8 +141,7 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Client = lokiclient.NewLokiClient(clientConfig)
|
l.Client = lokiclient.NewLokiClient(clientConfig)
|
||||||
l.Client.Logger = logger.WithField("component", "lokiclient")
|
l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
|
||||||
l.Client.Logger.WithField("source", l.Config.URL)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,13 +154,13 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
|
||||||
|
|
||||||
u, err := url.Parse(dsn)
|
u, err := url.Parse(dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "can't parse dsn configuration : "+dsn)
|
return fmt.Errorf("while parsing dsn '%s': %w", dsn, err)
|
||||||
}
|
}
|
||||||
if u.Scheme != "loki" {
|
if u.Scheme != "loki" {
|
||||||
return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
|
return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
|
||||||
}
|
}
|
||||||
if u.Host == "" {
|
if u.Host == "" {
|
||||||
return errors.New("Empty loki host")
|
return errors.New("empty loki host")
|
||||||
}
|
}
|
||||||
scheme := "http"
|
scheme := "http"
|
||||||
|
|
||||||
|
@ -184,7 +183,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
|
||||||
if d := params.Get("delay_for"); d != "" {
|
if d := params.Get("delay_for"); d != "" {
|
||||||
l.Config.DelayFor, err = time.ParseDuration(d)
|
l.Config.DelayFor, err = time.ParseDuration(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "can't parse since in DSN configuration")
|
return fmt.Errorf("invalid duration: %w", err)
|
||||||
}
|
}
|
||||||
if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
|
if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
|
||||||
return errors.New("delay_for should be a value between 1s and 5s")
|
return errors.New("delay_for should be a value between 1s and 5s")
|
||||||
|
@ -196,14 +195,14 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
|
||||||
if s := params.Get("since"); s != "" {
|
if s := params.Get("since"); s != "" {
|
||||||
l.Config.Since, err = time.ParseDuration(s)
|
l.Config.Since, err = time.ParseDuration(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "can't parse since in DSN configuration")
|
return fmt.Errorf("invalid since in dsn: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if limit := params.Get("limit"); limit != "" {
|
if limit := params.Get("limit"); limit != "" {
|
||||||
limit, err := strconv.Atoi(limit)
|
limit, err := strconv.Atoi(limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "can't parse limit in DSN configuration")
|
return fmt.Errorf("invalid limit in dsn: %w", err)
|
||||||
}
|
}
|
||||||
l.Config.Limit = limit
|
l.Config.Limit = limit
|
||||||
} else {
|
} else {
|
||||||
|
@ -213,7 +212,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
|
||||||
if logLevel := params.Get("log_level"); logLevel != "" {
|
if logLevel := params.Get("log_level"); logLevel != "" {
|
||||||
level, err := log.ParseLevel(logLevel)
|
level, err := log.ParseLevel(logLevel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "can't parse log_level in DSN configuration")
|
return fmt.Errorf("invalid log_level in dsn: %w", err)
|
||||||
}
|
}
|
||||||
l.Config.LogLevel = &level
|
l.Config.LogLevel = &level
|
||||||
l.logger.Logger.SetLevel(level)
|
l.logger.Logger.SetLevel(level)
|
||||||
|
@ -237,7 +236,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Client = lokiclient.NewLokiClient(clientConfig)
|
l.Client = lokiclient.NewLokiClient(clientConfig)
|
||||||
l.Client.Logger = logger.WithField("component", "lokiclient")
|
l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -257,7 +256,7 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro
|
||||||
defer cancel()
|
defer cancel()
|
||||||
err := l.Client.Ready(readyCtx)
|
err := l.Client.Ready(readyCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "loki is not ready")
|
return fmt.Errorf("loki is not ready: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
@ -307,16 +306,16 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
||||||
defer cancel()
|
defer cancel()
|
||||||
err := l.Client.Ready(readyCtx)
|
err := l.Client.Ready(readyCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "loki is not ready")
|
return fmt.Errorf("loki is not ready: %w", err)
|
||||||
}
|
}
|
||||||
ll := l.logger.WithField("websocket url", l.lokiWebsocket)
|
ll := l.logger.WithField("websocket_url", l.lokiWebsocket)
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
respChan, err := l.Client.Tail(ctx)
|
respChan, err := l.Client.Tail(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ll.Errorf("could not start loki tail: %s", err)
|
ll.Errorf("could not start loki tail: %s", err)
|
||||||
return errors.Wrap(err, "could not start loki tail")
|
return fmt.Errorf("while starting loki tail: %w", err)
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -327,8 +326,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
||||||
}
|
}
|
||||||
if len(resp.DroppedEntries) > 0 {
|
if len(resp.DroppedEntries) > 0 {
|
||||||
entriesDropped.With(prometheus.Labels{"source": l.Config.URL}).Add(float64(len(resp.DroppedEntries)))
|
entriesDropped.With(prometheus.Labels{"source": l.Config.URL}).Add(float64(len(resp.DroppedEntries)))
|
||||||
ll.WithField("query", l.Config.Query)
|
ll.WithField("query", l.Config.Query).Warnf("%d entries dropped from loki response", len(resp.DroppedEntries))
|
||||||
ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries))
|
|
||||||
}
|
}
|
||||||
for _, stream := range resp.Streams {
|
for _, stream := range resp.Streams {
|
||||||
for _, entry := range stream.Entries {
|
for _, entry := range stream.Entries {
|
||||||
|
|
Loading…
Reference in a new issue