From a3eff5466517f0728d63a7db7a3b930c03a55717 Mon Sep 17 00:00:00 2001 From: Louis PERDEREAU Date: Mon, 9 Oct 2023 19:10:20 +0200 Subject: [PATCH] feat: Refactor Tail and add metrics --- pkg/acquisition/acquisition.go | 5 +- .../loki/internal/lokiclient/loki_client.go | 56 ++++++------------- pkg/acquisition/modules/loki/loki.go | 6 +- 3 files changed, 20 insertions(+), 47 deletions(-) diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 176d6d519..65164b9c4 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -37,7 +37,7 @@ import ( type DataSourceUnavailableError struct { Name string - Err error + Err error } func (e *DataSourceUnavailableError) Error() string { @@ -48,7 +48,6 @@ func (e *DataSourceUnavailableError) Unwrap() error { return e.Err } - // The interface each datasource must implement type DataSource interface { GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module @@ -75,7 +74,7 @@ var AcquisitionSources = map[string]func() DataSource{ "wineventlog": func() DataSource { return &wineventlogacquisition.WinEventLogSource{} }, "kafka": func() DataSource { return &kafkaacquisition.KafkaSource{} }, "k8s-audit": func() DataSource { return &k8sauditacquisition.KubernetesAuditSource{} }, - "loki": func() DataSource { return &lokiacquisition.LokiSource{} }, + "loki": func() DataSource { return &lokiacquisition.LokiSource{} }, "s3": func() DataSource { return &s3acquisition.S3Source{} }, } diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go index 264611f42..1c22e4727 100644 --- a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go +++ b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go @@ -41,34 +41,6 @@ type Config struct { Limit int } -func (lc *LokiClient) tailLogs(conn *websocket.Conn, c chan *LokiResponse, ctx context.Context) error { - tick := time.NewTicker(100 * time.Millisecond) - - for { - select { - case <-lc.t.Dying(): - lc.Logger.Info("LokiClient tomb is dying, closing connection") - tick.Stop() - return conn.Close() - case <-ctx.Done(): //this is technically useless, as the read from the websocket is blocking :( - lc.Logger.Info("LokiClient context is done, closing connection") - tick.Stop() - return conn.Close() - case <-tick.C: - lc.Logger.Debug("Reading from WS") - jsonResponse := &LokiResponse{} - err := conn.ReadJSON(jsonResponse) - if err != nil { - close(c) - return err - } - lc.Logger.Tracef("Read from WS: %v", jsonResponse) - c <- jsonResponse - lc.Logger.Debug("Sent response to channel") - } - } -} - func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse) error { for { select { @@ -178,7 +150,7 @@ func (lc *LokiClient) Ready(ctx context.Context) error { func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { responseChan := make(chan *LokiResponse) - dialer := &websocket.Dialer{} //TODO: TLS support + dialer := &websocket.Dialer{} u := lc.getURLFor("loki/api/v1/tail", map[string]string{ "limit": strconv.Itoa(lc.config.Limit), "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), @@ -201,21 +173,25 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { } requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr()) lc.Logger.Infof("Connecting to %s", u) - conn, resp, err := dialer.Dial(u, requestHeader) - defer resp.Body.Close() + conn, _, err := dialer.Dial(u, requestHeader) + if err != nil { - if resp != nil { - buf, err2 := io.ReadAll(resp.Body) - if err2 != nil { - return nil, fmt.Errorf("error reading response body while handling WS error: %s (%s)", err, err2) - } - return nil, fmt.Errorf("error dialing WS: %s: %s", err, string(buf)) - } - return nil, err + lc.Logger.Errorf("Error connecting to websocket, err: %s", err) + return responseChan, fmt.Errorf("error connecting to websocket") } lc.t.Go(func() error { - return lc.tailLogs(conn, responseChan, ctx) + for { + jsonResponse := &LokiResponse{} + err = conn.ReadJSON(jsonResponse) + + if err != nil { + lc.Logger.Errorf("Websocket write: %s, raw: %s", err, jsonResponse) + return fmt.Errorf("Websocket write: %s, raw: %s", err, jsonResponse) + } + + responseChan <- jsonResponse + } }) return responseChan, nil diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 2655d72d5..9683205d6 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -30,8 +30,6 @@ const ( lokiLimit int = 100 ) -var countDroppedEntries int = 0 - var linesRead = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_lokisource_hits_total", @@ -144,6 +142,7 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { l.Client = lokiclient.NewLokiClient(clientConfig) l.Client.Logger = logger.WithField("component", "lokiclient") + l.Client.Logger.WithField("source", l.Config.URL) return nil } @@ -327,9 +326,8 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er return err } if len(resp.DroppedEntries) > 0 { - countDroppedEntries += len(resp.DroppedEntries) + entriesDropped.With(prometheus.Labels{"source": l.Config.URL}).Add(float64(len(resp.DroppedEntries))) ll.WithField("query", l.Config.Query) - ll.WithField("source", l.GetName()) ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries)) } for _, stream := range resp.Streams {