From 437c2af8e292cd30936bc872c48198ca84c7af57 Mon Sep 17 00:00:00 2001 From: Mathieu Lecarme Date: Mon, 6 Jun 2022 16:33:21 +0200 Subject: [PATCH] Prometheus counter. Better URL handling. Streamin acquisition. --- pkg/acquisition/modules/loki/loki.go | 106 +++++++++++++++++++++------ 1 file changed, 82 insertions(+), 24 deletions(-) diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 463db3922..87d329fce 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -30,12 +30,21 @@ const ( readyTimeout time.Duration = 3 * time.Second readyLoop int = 3 readySleep time.Duration = 10 * time.Second + lokiLimit int = 100 ) +var linesRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cs_lokisource_hits_total", + Help: "Total lines that were read.", + }, + []string{"source"}) + type LokiConfiguration struct { configuration.DataSourceCommonCfg `yaml:",inline"` - URL string // websocket url + URL string // Loki url Query string // LogQL query + DelayFor time.Duration } type LokiSource struct { @@ -47,11 +56,11 @@ type LokiSource struct { } func (l *LokiSource) GetMetrics() []prometheus.Collector { - return nil + return []prometheus.Collector{linesRead} } func (l *LokiSource) GetAggregMetrics() []prometheus.Collector { - return nil + return []prometheus.Collector{linesRead} } func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { @@ -62,7 +71,7 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { return errors.Wrap(err, "Cannot parse LokiAcquisition configuration") } l.dialer = &websocket.Dialer{} - l.lokiWebsocket, l.lokiReady, err = websocketFromUrl(lokiConfig.URL) + err = l.buildUrl() if err != nil { return errors.Wrap(err, "Cannot parse Loki url") } @@ -70,11 +79,12 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { return nil } -func websocketFromUrl(lokiUrl string) (string, string, error) { - u, err := url.Parse(lokiUrl) +func (l *LokiSource) buildUrl() error { + u, err := url.Parse(l.config.URL) if err != nil { - return "", "", errors.Wrap(err, "Cannot parse Loki URL") + return errors.Wrap(err, "Cannot parse Loki URL") } + l.lokiReady = fmt.Sprintf("%s://%s/ready", u.Scheme, u.Host) buff := bytes.Buffer{} switch u.Scheme { @@ -83,7 +93,7 @@ func websocketFromUrl(lokiUrl string) (string, string, error) { case "https": buff.WriteString("wss") default: - return "", "", fmt.Errorf("unknown scheme : %s", u.Scheme) + return fmt.Errorf("unknown scheme : %s", u.Scheme) } buff.WriteString("://") buff.WriteString(u.Host) @@ -92,7 +102,18 @@ func websocketFromUrl(lokiUrl string) (string, string, error) { } else { buff.WriteString(u.Path) } - return buff.String(), fmt.Sprintf("%s://%s/ready", u.Scheme, u.Host), nil + buff.WriteByte('?') + params := url.Values{} + params.Add("query", l.config.Query) + params.Add("limit", fmt.Sprintf("%d", lokiLimit)) + if l.config.DelayFor != 0 { + params.Add("delay_for", fmt.Sprintf("%d", int64(l.config.DelayFor.Seconds()))) + } + start := time.Now() // FIXME config + params.Add("start", fmt.Sprintf("%d", start.UnixNano())) + buff.WriteString(params.Encode()) + l.lokiWebsocket = buff.String() + return nil } func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { @@ -104,6 +125,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger if !strings.HasPrefix(dsn, "loki://") { return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn) } + // FIXME DSN parsing return nil } @@ -134,24 +156,60 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro if err != nil { return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket") } - ll := types.Line{} - ll.Raw = resp.Streams[0].Entries[0].Line - ll.Time = resp.Streams[0].Entries[0].Timestamp - ll.Src = l.lokiReady - ll.Labels = resp.Streams[0].Stream - ll.Process = true - ll.Module = l.GetName() - - out <- types.Event{ - Line: ll, - Process: true, - Type: types.LOG, - ExpectMode: leaky.TIMEMACHINE, - } + l.readOneTail(resp, out) return nil } -func (l *LokiSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { +func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) { + for _, stream := range resp.Streams { + for _, entry := range stream.Entries { + + ll := types.Line{} + ll.Raw = entry.Line + ll.Time = entry.Timestamp + ll.Src = l.config.URL + ll.Labels = stream.Stream + ll.Process = true + ll.Module = l.GetName() + + linesRead.With(prometheus.Labels{"source": l.config.URL}).Inc() + out <- types.Event{ + Line: ll, + Process: true, + Type: types.LOG, + ExpectMode: leaky.TIMEMACHINE, + } + } + } +} + +func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { + err := l.ready() + if err != nil { + return errors.Wrap(err, "error while getting OneShotAcquisition") + } + t.Go(func() error { + for { + ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) + defer cancel() + header := &http.Header{} + c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header) + if err != nil { + buf, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf)) + } + defer c.Close() + var resp Tail + for { // draining the websocket + err = c.ReadJSON(&resp) + if err != nil { + return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket") + } + l.readOneTail(resp, out) + } + } + return nil + }) return nil }