feat: Refactor Tail and add metrics
This commit is contained in:
parent
e750412a39
commit
a3eff54665
|
@ -37,7 +37,7 @@ import (
|
|||
|
||||
// DataSourceUnavailableError is an error type that pairs a name with an
// underlying error. Its Unwrap method returns Err, so callers can reach the
// wrapped error through errors.Is / errors.As.
type DataSourceUnavailableError struct {
	// Name identifies what is unavailable.
	// NOTE(review): presumably the datasource type name — confirm against callers.
	Name string
	// Err is the underlying cause, returned by Unwrap.
	Err error
}
|
||||
|
||||
func (e *DataSourceUnavailableError) Error() string {
|
||||
|
@ -48,7 +48,6 @@ func (e *DataSourceUnavailableError) Unwrap() error {
|
|||
return e.Err
|
||||
}
|
||||
|
||||
|
||||
// The interface each datasource must implement
|
||||
type DataSource interface {
|
||||
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
|
||||
|
@ -75,7 +74,7 @@ var AcquisitionSources = map[string]func() DataSource{
|
|||
"wineventlog": func() DataSource { return &wineventlogacquisition.WinEventLogSource{} },
|
||||
"kafka": func() DataSource { return &kafkaacquisition.KafkaSource{} },
|
||||
"k8s-audit": func() DataSource { return &k8sauditacquisition.KubernetesAuditSource{} },
|
||||
"loki": func() DataSource { return &lokiacquisition.LokiSource{} },
|
||||
"loki": func() DataSource { return &lokiacquisition.LokiSource{} },
|
||||
"s3": func() DataSource { return &s3acquisition.S3Source{} },
|
||||
}
|
||||
|
||||
|
|
|
@ -41,34 +41,6 @@ type Config struct {
|
|||
Limit int
|
||||
}
|
||||
|
||||
func (lc *LokiClient) tailLogs(conn *websocket.Conn, c chan *LokiResponse, ctx context.Context) error {
|
||||
tick := time.NewTicker(100 * time.Millisecond)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-lc.t.Dying():
|
||||
lc.Logger.Info("LokiClient tomb is dying, closing connection")
|
||||
tick.Stop()
|
||||
return conn.Close()
|
||||
case <-ctx.Done(): //this is technically useless, as the read from the websocket is blocking :(
|
||||
lc.Logger.Info("LokiClient context is done, closing connection")
|
||||
tick.Stop()
|
||||
return conn.Close()
|
||||
case <-tick.C:
|
||||
lc.Logger.Debug("Reading from WS")
|
||||
jsonResponse := &LokiResponse{}
|
||||
err := conn.ReadJSON(jsonResponse)
|
||||
if err != nil {
|
||||
close(c)
|
||||
return err
|
||||
}
|
||||
lc.Logger.Tracef("Read from WS: %v", jsonResponse)
|
||||
c <- jsonResponse
|
||||
lc.Logger.Debug("Sent response to channel")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse) error {
|
||||
for {
|
||||
select {
|
||||
|
@ -178,7 +150,7 @@ func (lc *LokiClient) Ready(ctx context.Context) error {
|
|||
|
||||
func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) {
|
||||
responseChan := make(chan *LokiResponse)
|
||||
dialer := &websocket.Dialer{} //TODO: TLS support
|
||||
dialer := &websocket.Dialer{}
|
||||
u := lc.getURLFor("loki/api/v1/tail", map[string]string{
|
||||
"limit": strconv.Itoa(lc.config.Limit),
|
||||
"start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())),
|
||||
|
@ -201,21 +173,25 @@ func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) {
|
|||
}
|
||||
requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr())
|
||||
lc.Logger.Infof("Connecting to %s", u)
|
||||
conn, resp, err := dialer.Dial(u, requestHeader)
|
||||
defer resp.Body.Close()
|
||||
conn, _, err := dialer.Dial(u, requestHeader)
|
||||
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
buf, err2 := io.ReadAll(resp.Body)
|
||||
if err2 != nil {
|
||||
return nil, fmt.Errorf("error reading response body while handling WS error: %s (%s)", err, err2)
|
||||
}
|
||||
return nil, fmt.Errorf("error dialing WS: %s: %s", err, string(buf))
|
||||
}
|
||||
return nil, err
|
||||
lc.Logger.Errorf("Error connecting to websocket, err: %s", err)
|
||||
return responseChan, fmt.Errorf("error connecting to websocket")
|
||||
}
|
||||
|
||||
lc.t.Go(func() error {
|
||||
return lc.tailLogs(conn, responseChan, ctx)
|
||||
for {
|
||||
jsonResponse := &LokiResponse{}
|
||||
err = conn.ReadJSON(jsonResponse)
|
||||
|
||||
if err != nil {
|
||||
lc.Logger.Errorf("Websocket write: %s, raw: %s", err, jsonResponse)
|
||||
return fmt.Errorf("Websocket write: %s, raw: %s", err, jsonResponse)
|
||||
}
|
||||
|
||||
responseChan <- jsonResponse
|
||||
}
|
||||
})
|
||||
|
||||
return responseChan, nil
|
||||
|
|
|
@ -30,8 +30,6 @@ const (
|
|||
lokiLimit int = 100
|
||||
)
|
||||
|
||||
// countDroppedEntries is a package-level running total that starts at zero.
var countDroppedEntries int
|
||||
|
||||
var linesRead = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "cs_lokisource_hits_total",
|
||||
|
@ -144,6 +142,7 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
|
|||
|
||||
l.Client = lokiclient.NewLokiClient(clientConfig)
|
||||
l.Client.Logger = logger.WithField("component", "lokiclient")
|
||||
l.Client.Logger.WithField("source", l.Config.URL)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -327,9 +326,8 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
|||
return err
|
||||
}
|
||||
if len(resp.DroppedEntries) > 0 {
|
||||
countDroppedEntries += len(resp.DroppedEntries)
|
||||
entriesDropped.With(prometheus.Labels{"source": l.Config.URL}).Add(float64(len(resp.DroppedEntries)))
|
||||
ll.WithField("query", l.Config.Query)
|
||||
ll.WithField("source", l.GetName())
|
||||
ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries))
|
||||
}
|
||||
for _, stream := range resp.Streams {
|
||||
|
|
Loading…
Reference in a new issue