From bc6f3279984c007d8b22c9beb16165b8dd093a54 Mon Sep 17 00:00:00 2001 From: Mathieu Lecarme Date: Mon, 6 Jun 2022 15:32:24 +0200 Subject: [PATCH] Ready and Read once. --- pkg/acquisition/modules/loki/entry.go | 23 +++++++ pkg/acquisition/modules/loki/loki.go | 94 +++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 7 deletions(-) create mode 100644 pkg/acquisition/modules/loki/entry.go diff --git a/pkg/acquisition/modules/loki/entry.go b/pkg/acquisition/modules/loki/entry.go new file mode 100644 index 000000000..0ac89bb4c --- /dev/null +++ b/pkg/acquisition/modules/loki/entry.go @@ -0,0 +1,23 @@ +package loki + +import "time" + +type Entry struct { + Timestamp time.Time + Line string +} + +type Stream struct { + Stream map[string]string `json:"stream"` + Entries []Entry `json:"values"` +} + +type DroppedEntry struct { + Labels map[string]string `json:"labels"` + Timestamp time.Time `json:"timestamp"` +} + +type Tail struct { + Streams []Stream `json:"streams"` + DroppedEntries []DroppedEntry `json:"dropped_entries"` +} diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index f7235a752..463db3922 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -1,10 +1,20 @@ package loki +/* +https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail +*/ + import ( "bytes" + "context" "fmt" + "io/ioutil" + "net/http" "net/url" "strings" + "time" + + leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" @@ -16,15 +26,23 @@ import ( "gopkg.in/yaml.v2" ) +const ( + readyTimeout time.Duration = 3 * time.Second + readyLoop int = 3 + readySleep time.Duration = 10 * time.Second +) + type LokiConfiguration struct { configuration.DataSourceCommonCfg `yaml:",inline"` URL string // websocket url + Query string // LogQL query } type LokiSource struct { config LokiConfiguration logger *log.Entry lokiWebsocket string + lokiReady string dialer *websocket.Dialer } @@ -44,7 +62,7 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { return errors.Wrap(err, "Cannot parse LokiAcquisition configuration") } l.dialer = &websocket.Dialer{} - l.lokiWebsocket, err = websocketFromUrl(lokiConfig.URL) + l.lokiWebsocket, l.lokiReady, err = websocketFromUrl(lokiConfig.URL) if err != nil { return errors.Wrap(err, "Cannot parse Loki url") } @@ -52,11 +70,12 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { return nil } -func websocketFromUrl(lokiUrl string) (string, error) { +func websocketFromUrl(lokiUrl string) (string, string, error) { u, err := url.Parse(lokiUrl) if err != nil { - return "", errors.Wrap(err, "Cannot parse Loki URL") + return "", "", errors.Wrap(err, "Cannot parse Loki URL") } + buff := bytes.Buffer{} switch u.Scheme { case "http": @@ -64,7 +83,7 @@ func websocketFromUrl(lokiUrl string) (string, error) { case "https": buff.WriteString("wss") default: - return "", fmt.Errorf("unknown scheme : %s", u.Scheme) + return "", "", fmt.Errorf("unknown scheme : %s", u.Scheme) } buff.WriteString("://") buff.WriteString(u.Host) @@ -73,7 +92,7 @@ func websocketFromUrl(lokiUrl string) (string, error) { } else { buff.WriteString(u.Path) } - return buff.String(), nil + return buff.String(), fmt.Sprintf("%s://%s/ready", u.Scheme, u.Host), nil } func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { @@ -96,7 +115,39 @@ func (l *LokiSource) GetName() string { return "loki" } -func (l *LokiSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { +func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { + err := l.ready() + if err != nil { + return errors.Wrap(err, "error while getting OneShotAcquisition") + } + ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) + defer cancel() + header := &http.Header{} + c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header) + if err != nil { + buf, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf)) + } + defer c.Close() + var resp Tail + err = c.ReadJSON(&resp) + if err != nil { + return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket") + } + ll := types.Line{} + ll.Raw = resp.Streams[0].Entries[0].Line + ll.Time = resp.Streams[0].Entries[0].Timestamp + ll.Src = l.lokiReady + ll.Labels = resp.Streams[0].Stream + ll.Process = true + ll.Module = l.GetName() + + out <- types.Event{ + Line: ll, + Process: true, + Type: types.LOG, + ExpectMode: leaky.TIMEMACHINE, + } return nil } @@ -105,9 +156,38 @@ func (l *LokiSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { } func (l *LokiSource) CanRun() error { - return nil + return nil // it's ok, even BSD can use Loki } func (l *LokiSource) Dump() interface{} { return l } + +func (l *LokiSource) ready() error { + client := &http.Client{ + Timeout: readyTimeout, + } + + for i := 0; i < readyLoop; i++ { + resp, err := client.Get(l.lokiReady) + if err != nil { + return errors.Wrap(err, "Test Loki services for readiness") + } + if resp.StatusCode == 200 { + return nil + } else { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrap(err, "can't read body while testing Loki readiness") + } + err = resp.Body.Close() + if err != nil { + return err + } + l.logger.Println("Loki is not ready :", string(body)) + time.Sleep(10 * time.Second) + } + } + + return fmt.Errorf("Loki service %s is not ready", l.lokiReady) +}