From 92f923cfa81bbcd1972199421b206da2a86fa740 Mon Sep 17 00:00:00 2001 From: lperdereau <45263601+lperdereau@users.noreply.github.com> Date: Wed, 22 Nov 2023 13:31:39 +0100 Subject: [PATCH] Loki integration #2 (#2306) * Add support for loki datasource --------- Co-authored-by: Mathieu Lecarme Co-authored-by: Sebastien Blot Co-authored-by: Thibault "bui" Koechlin --- .github/workflows/go-tests.yml | 12 + go.mod | 4 +- pkg/acquisition/acquisition.go | 5 +- pkg/acquisition/modules/loki/entry.go | 60 ++ .../loki/internal/lokiclient/loki_client.go | 315 +++++++++++ .../modules/loki/internal/lokiclient/types.go | 55 ++ pkg/acquisition/modules/loki/loki.go | 370 +++++++++++++ pkg/acquisition/modules/loki/loki_test.go | 512 ++++++++++++++++++ pkg/acquisition/modules/loki/timestamp.go | 29 + .../modules/loki/timestamp_test.go | 47 ++ pkg/setup/detect_test.go | 10 + test/localstack/docker-compose.yml | 5 + 12 files changed, 1420 insertions(+), 4 deletions(-) create mode 100644 pkg/acquisition/modules/loki/entry.go create mode 100644 pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go create mode 100644 pkg/acquisition/modules/loki/internal/lokiclient/types.go create mode 100644 pkg/acquisition/modules/loki/loki.go create mode 100644 pkg/acquisition/modules/loki/loki_test.go create mode 100644 pkg/acquisition/modules/loki/timestamp.go create mode 100644 pkg/acquisition/modules/loki/timestamp_test.go diff --git a/.github/workflows/go-tests.yml b/.github/workflows/go-tests.yml index 4cd09ba63..599285a63 100644 --- a/.github/workflows/go-tests.yml +++ b/.github/workflows/go-tests.yml @@ -108,6 +108,18 @@ jobs: --health-timeout 10s --health-retries 5 + loki: + image: grafana/loki:2.8.0 + ports: + - "3100:3100" + options: >- + --name=loki1 + --health-cmd "wget -q -O - http://localhost:3100/ready | grep 'ready'" + --health-interval 30s + --health-timeout 10s + --health-retries 5 + --health-start-period 30s + steps: - name: Check out CrowdSec repository diff --git a/go.mod b/go.mod index 9cb89a14c..db223c9a1 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/google/uuid v1.3.0 github.com/google/winops v0.0.0-20230712152054-af9b550d0601 github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e + github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-hclog v1.5.0 github.com/hashicorp/go-plugin v1.4.10 github.com/hashicorp/go-version v1.2.1 @@ -87,6 +88,7 @@ require ( gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 + gotest.tools/v3 v3.5.0 k8s.io/apiserver v0.27.3 ) @@ -126,7 +128,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/hcl/v2 v2.13.0 // indirect github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect github.com/huandu/xstrings v1.3.2 // indirect @@ -201,7 +202,6 @@ require ( google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect - gotest.tools/v3 v3.5.0 // indirect k8s.io/api v0.27.3 // indirect k8s.io/apimachinery v0.27.3 // indirect k8s.io/klog/v2 v2.90.1 // indirect diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index e2996ea4f..65164b9c4 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -25,6 +25,7 @@ import ( kafkaacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kafka" kinesisacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kinesis" k8sauditacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kubernetesaudit" + lokiacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki" s3acquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/s3" syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog" wineventlogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/wineventlog" @@ -36,7 +37,7 @@ import ( type DataSourceUnavailableError struct { Name string - Err error + Err error } func (e *DataSourceUnavailableError) Error() string { @@ -47,7 +48,6 @@ func (e *DataSourceUnavailableError) Unwrap() error { return e.Err } - // The interface each datasource must implement type DataSource interface { GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module @@ -74,6 +74,7 @@ var AcquisitionSources = map[string]func() DataSource{ "wineventlog": func() DataSource { return &wineventlogacquisition.WinEventLogSource{} }, "kafka": func() DataSource { return &kafkaacquisition.KafkaSource{} }, "k8s-audit": func() DataSource { return &k8sauditacquisition.KubernetesAuditSource{} }, + "loki": func() DataSource { return &lokiacquisition.LokiSource{} }, "s3": func() DataSource { return &s3acquisition.S3Source{} }, } diff --git a/pkg/acquisition/modules/loki/entry.go b/pkg/acquisition/modules/loki/entry.go new file mode 100644 index 000000000..c0ff857ea --- /dev/null +++ b/pkg/acquisition/modules/loki/entry.go @@ -0,0 +1,60 @@ +package loki + +import ( + "encoding/json" + "strconv" + "time" +) + +type Entry struct { + Timestamp time.Time + Line string +} + +func (e *Entry) UnmarshalJSON(b []byte) error { + var values []string + err := json.Unmarshal(b, &values) + if err != nil { + return err + } + t, err := strconv.Atoi(values[0]) + if err != nil { + return err + } + e.Timestamp = time.Unix(int64(t), 0) + e.Line = values[1] + return nil +} + +type Stream struct { + Stream map[string]string `json:"stream"` + Entries []Entry `json:"values"` +} + +type DroppedEntry struct { + Labels map[string]string `json:"labels"` + Timestamp time.Time `json:"timestamp"` +} + +type Tail struct { + Streams []Stream `json:"streams"` + DroppedEntries []DroppedEntry `json:"dropped_entries"` +} + +// LokiQuery GET response. +// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query +type LokiQuery struct { + Status string `json:"status"` + Data Data `json:"data"` +} + +type Data struct { + ResultType string `json:"resultType"` + Result []StreamResult `json:"result"` // Warning, just stream value is handled + Stats interface{} `json:"stats"` // Stats is boring, just ignore it +} + +type StreamResult struct { + Stream map[string]string `json:"stream"` + Values []Entry `json:"values"` +} diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go new file mode 100644 index 000000000..8451a86fc --- /dev/null +++ b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go @@ -0,0 +1,315 @@ +package lokiclient + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/cwversion" + "github.com/gorilla/websocket" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" +) + +type LokiClient struct { + Logger *log.Entry + + config Config + t *tomb.Tomb + fail_start time.Time + currentTickerInterval time.Duration +} + +type Config struct { + LokiURL string + LokiPrefix string + Query string + Headers map[string]string + + Username string + Password string + + Since time.Duration + Until time.Duration + + FailMaxDuration time.Duration + + DelayFor int + Limit int +} + +func updateURI(uri string, lq LokiQueryRangeResponse, infinite bool) string { + u, _ := url.Parse(uri) + queryParams := u.Query() + + if len(lq.Data.Result) > 0 { + lastTs := lq.Data.Result[0].Entries[len(lq.Data.Result[0].Entries)-1].Timestamp + // +1 the last timestamp to avoid getting the same result again. + queryParams.Set("start", strconv.Itoa(int(lastTs.UnixNano()+1))) + } + + if infinite { + queryParams.Set("end", strconv.Itoa(int(time.Now().UnixNano()))) + } + + u.RawQuery = queryParams.Encode() + return u.String() +} + +func (lc *LokiClient) SetTomb(t *tomb.Tomb) { + lc.t = t +} + +func (lc *LokiClient) resetFailStart() { + if !lc.fail_start.IsZero() { + log.Infof("loki is back after %s", time.Since(lc.fail_start)) + } + lc.fail_start = time.Time{} +} +func (lc *LokiClient) shouldRetry() bool { + if lc.fail_start.IsZero() { + lc.Logger.Warningf("loki is not available, will retry for %s", lc.config.FailMaxDuration) + lc.fail_start = time.Now() + return true + } + if time.Since(lc.fail_start) > lc.config.FailMaxDuration { + lc.Logger.Errorf("loki didn't manage to recover after %s, giving up", lc.config.FailMaxDuration) + return false + } + return true +} + +func (lc *LokiClient) increaseTicker(ticker *time.Ticker) { + maxTicker := 10 * time.Second + if lc.currentTickerInterval < maxTicker { + lc.currentTickerInterval *= 2 + if lc.currentTickerInterval > maxTicker { + lc.currentTickerInterval = maxTicker + } + ticker.Reset(lc.currentTickerInterval) + } +} + +func (lc *LokiClient) decreaseTicker(ticker *time.Ticker) { + minTicker := 100 * time.Millisecond + if lc.currentTickerInterval != minTicker { + lc.currentTickerInterval = minTicker + ticker.Reset(lc.currentTickerInterval) + } +} + +func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse, infinite bool) error { + lc.currentTickerInterval = 100 * time.Millisecond + ticker := time.NewTicker(lc.currentTickerInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-lc.t.Dying(): + return lc.t.Err() + case <-ticker.C: + resp, err := http.Get(uri) + if err != nil { + if ok := lc.shouldRetry(); !ok { + return errors.Wrapf(err, "error querying range") + } else { + lc.increaseTicker(ticker) + continue + } + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if ok := lc.shouldRetry(); !ok { + return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body)) + } else { + lc.increaseTicker(ticker) + continue + } + } + + var lq LokiQueryRangeResponse + if err := json.NewDecoder(resp.Body).Decode(&lq); err != nil { + resp.Body.Close() + if ok := lc.shouldRetry(); !ok { + return errors.Wrapf(err, "error decoding Loki response") + } else { + lc.increaseTicker(ticker) + continue + } + } + resp.Body.Close() + lc.Logger.Tracef("Got response: %+v", lq) + c <- &lq + lc.resetFailStart() + if !infinite && (len(lq.Data.Result) == 0 || len(lq.Data.Result[0].Entries) < lc.config.Limit) { + lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, len(lq.Data.Result)) + close(c) + return nil + } + if len(lq.Data.Result) > 0 { + lc.Logger.Debugf("(timer:%v) %d results / %d entries result[0] (uri:%s)", lc.currentTickerInterval, len(lq.Data.Result), len(lq.Data.Result[0].Entries), uri) + } else { + lc.Logger.Debugf("(timer:%v) no results (uri:%s)", lc.currentTickerInterval, uri) + } + if infinite { + if len(lq.Data.Result) > 0 { //as long as we get results, we keep lowest ticker + lc.decreaseTicker(ticker) + } else { + lc.increaseTicker(ticker) + } + } + + uri = updateURI(uri, lq, infinite) + } + } +} + +func (lc *LokiClient) getURLFor(endpoint string, params map[string]string) string { + u, err := url.Parse(lc.config.LokiURL) + if err != nil { + return "" + } + queryParams := u.Query() + for k, v := range params { + queryParams.Set(k, v) + } + u.RawQuery = queryParams.Encode() + + u.Path, err = url.JoinPath(lc.config.LokiPrefix, u.Path, endpoint) + + if err != nil { + return "" + } + + if endpoint == "loki/api/v1/tail" { + if u.Scheme == "http" { + u.Scheme = "ws" + } else { + u.Scheme = "wss" + } + } + + return u.String() +} + +func (lc *LokiClient) Ready(ctx context.Context) error { + tick := time.NewTicker(500 * time.Millisecond) + url := lc.getURLFor("ready", nil) + for { + select { + case <-ctx.Done(): + tick.Stop() + return ctx.Err() + case <-lc.t.Dying(): + tick.Stop() + return lc.t.Err() + case <-tick.C: + lc.Logger.Debug("Checking if Loki is ready") + resp, err := http.Get(url) + if err != nil { + lc.Logger.Warnf("Error checking if Loki is ready: %s", err) + continue + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + lc.Logger.Debugf("Loki is not ready, status code: %d", resp.StatusCode) + continue + } + lc.Logger.Info("Loki is ready") + return nil + } + } +} + +func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { + responseChan := make(chan *LokiResponse) + dialer := &websocket.Dialer{} + u := lc.getURLFor("loki/api/v1/tail", map[string]string{ + "limit": strconv.Itoa(lc.config.Limit), + "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), + "query": lc.config.Query, + "delay_for": strconv.Itoa(lc.config.DelayFor), + }) + + lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since)) + + if lc.config.Username != "" || lc.config.Password != "" { + dialer.Proxy = func(req *http.Request) (*url.URL, error) { + req.SetBasicAuth(lc.config.Username, lc.config.Password) + return nil, nil + } + } + + requestHeader := http.Header{} + for k, v := range lc.config.Headers { + requestHeader.Add(k, v) + } + requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr()) + lc.Logger.Infof("Connecting to %s", u) + conn, _, err := dialer.Dial(u, requestHeader) + + if err != nil { + lc.Logger.Errorf("Error connecting to websocket, err: %s", err) + return responseChan, fmt.Errorf("error connecting to websocket") + } + + lc.t.Go(func() error { + for { + jsonResponse := &LokiResponse{} + err = conn.ReadJSON(jsonResponse) + + if err != nil { + lc.Logger.Errorf("Error reading from websocket: %s", err) + return fmt.Errorf("websocket error: %w", err) + } + + responseChan <- jsonResponse + } + }) + + return responseChan, nil +} + +func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQueryRangeResponse { + url := lc.getURLFor("loki/api/v1/query_range", map[string]string{ + "query": lc.config.Query, + "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), + "end": strconv.Itoa(int(time.Now().UnixNano())), + "limit": strconv.Itoa(lc.config.Limit), + "direction": "forward", + }) + + c := make(chan *LokiQueryRangeResponse) + + lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since)) + + requestHeader := http.Header{} + for k, v := range lc.config.Headers { + requestHeader.Add(k, v) + } + + if lc.config.Username != "" || lc.config.Password != "" { + requestHeader.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(lc.config.Username+":"+lc.config.Password))) + } + + requestHeader.Set("User-Agent", "Crowdsec "+cwversion.VersionStr()) + lc.Logger.Infof("Connecting to %s", url) + lc.t.Go(func() error { + return lc.queryRange(url, ctx, c, infinite) + }) + return c +} + +func NewLokiClient(config Config) *LokiClient { + return &LokiClient{Logger: log.WithField("component", "lokiclient"), config: config} +} diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/types.go b/pkg/acquisition/modules/loki/internal/lokiclient/types.go new file mode 100644 index 000000000..d5aed2044 --- /dev/null +++ b/pkg/acquisition/modules/loki/internal/lokiclient/types.go @@ -0,0 +1,55 @@ +package lokiclient + +import ( + "encoding/json" + "strconv" + "time" +) + +type Entry struct { + Timestamp time.Time + Line string +} + +func (e *Entry) UnmarshalJSON(b []byte) error { + var values []string + err := json.Unmarshal(b, &values) + if err != nil { + return err + } + t, err := strconv.Atoi(values[0]) + if err != nil { + return err + } + e.Timestamp = time.Unix(0, int64(t)) + e.Line = values[1] + return nil +} + +type Stream struct { + Stream map[string]string `json:"stream"` + Entries []Entry `json:"values"` +} + +type DroppedEntry struct { + Labels map[string]string `json:"labels"` + Timestamp time.Time `json:"timestamp"` +} + +type LokiResponse struct { + Streams []Stream `json:"streams"` + DroppedEntries []interface{} `json:"dropped_entries"` //We don't care about the actual content i think ? +} + +// LokiQuery GET response. +// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query +type LokiQueryRangeResponse struct { + Status string `json:"status"` + Data Data `json:"data"` +} + +type Data struct { + ResultType string `json:"resultType"` + Result []Stream `json:"result"` // Warning, just stream value is handled + Stats interface{} `json:"stats"` // Stats is boring, just ignore it +} diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go new file mode 100644 index 000000000..555deefe2 --- /dev/null +++ b/pkg/acquisition/modules/loki/loki.go @@ -0,0 +1,370 @@ +package loki + +/* +https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail +*/ + +import ( + "context" + "fmt" + "net/url" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + tomb "gopkg.in/tomb.v2" + yaml "gopkg.in/yaml.v2" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient" + "github.com/crowdsecurity/crowdsec/pkg/types" +) + +const ( + readyTimeout time.Duration = 3 * time.Second + readyLoop int = 3 + readySleep time.Duration = 10 * time.Second + lokiLimit int = 100 +) + +var linesRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cs_lokisource_hits_total", + Help: "Total lines that were read.", + }, + []string{"source"}) + +type LokiAuthConfiguration struct { + Username string `yaml:"username"` + Password string `yaml:"password"` +} + +type LokiConfiguration struct { + URL string `yaml:"url"` // Loki url + Prefix string `yaml:"prefix"` // Loki prefix + Query string `yaml:"query"` // LogQL query + Limit int `yaml:"limit"` // Limit of logs to read + DelayFor time.Duration `yaml:"delay_for"` + Since time.Duration `yaml:"since"` + Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki + WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds + Auth LokiAuthConfiguration `yaml:"auth"` + MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source + configuration.DataSourceCommonCfg `yaml:",inline"` +} + +type LokiSource struct { + Config LokiConfiguration + + Client *lokiclient.LokiClient + + logger *log.Entry + lokiWebsocket string +} + +func (l *LokiSource) GetMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (l *LokiSource) GetAggregMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error { + err := yaml.UnmarshalStrict(yamlConfig, &l.Config) + if err != nil { + return fmt.Errorf("cannot parse loki acquisition configuration: %w", err) + } + + if l.Config.Query == "" { + return errors.New("loki query is mandatory") + } + + if l.Config.WaitForReady == 0 { + l.Config.WaitForReady = 10 * time.Second + } + + if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second { + return errors.New("delay_for should be a value between 1s and 5s") + } + + if l.Config.Mode == "" { + l.Config.Mode = configuration.TAIL_MODE + } + if l.Config.Prefix == "" { + l.Config.Prefix = "/" + } + + if !strings.HasSuffix(l.Config.Prefix, "/") { + l.Config.Prefix += "/" + } + + if l.Config.Limit == 0 { + l.Config.Limit = lokiLimit + } + + if l.Config.Mode == configuration.TAIL_MODE { + l.logger.Infof("Resetting since") + l.Config.Since = 0 + } + + if l.Config.MaxFailureDuration == 0 { + l.Config.MaxFailureDuration = 30 * time.Second + } + + return nil +} + +func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { + l.Config = LokiConfiguration{} + l.logger = logger + err := l.UnmarshalConfig(config) + if err != nil { + return err + } + + l.logger.Infof("Since value: %s", l.Config.Since.String()) + + clientConfig := lokiclient.Config{ + LokiURL: l.Config.URL, + Headers: l.Config.Headers, + Limit: l.Config.Limit, + Query: l.Config.Query, + Since: l.Config.Since, + Username: l.Config.Auth.Username, + Password: l.Config.Auth.Password, + FailMaxDuration: l.Config.MaxFailureDuration, + } + + l.Client = lokiclient.NewLokiClient(clientConfig) + l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL}) + return nil +} + +func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error { + l.logger = logger + l.Config = LokiConfiguration{} + l.Config.Mode = configuration.CAT_MODE + l.Config.Labels = labels + l.Config.UniqueId = uuid + + u, err := url.Parse(dsn) + if err != nil { + return fmt.Errorf("while parsing dsn '%s': %w", dsn, err) + } + if u.Scheme != "loki" { + return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn) + } + if u.Host == "" { + return errors.New("empty loki host") + } + scheme := "http" + + params := u.Query() + if q := params.Get("ssl"); q != "" { + scheme = "https" + } + if q := params.Get("query"); q != "" { + l.Config.Query = q + } + if w := params.Get("wait_for_ready"); w != "" { + l.Config.WaitForReady, err = time.ParseDuration(w) + if err != nil { + return err + } + } else { + l.Config.WaitForReady = 10 * time.Second + } + + if d := params.Get("delay_for"); d != "" { + l.Config.DelayFor, err = time.ParseDuration(d) + if err != nil { + return fmt.Errorf("invalid duration: %w", err) + } + if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second { + return errors.New("delay_for should be a value between 1s and 5s") + } + } else { + l.Config.DelayFor = 0 * time.Second + } + + if s := params.Get("since"); s != "" { + l.Config.Since, err = time.ParseDuration(s) + if err != nil { + return fmt.Errorf("invalid since in dsn: %w", err) + } + } + + if max_failure_duration := params.Get("max_failure_duration"); max_failure_duration != "" { + duration, err := time.ParseDuration(max_failure_duration) + if err != nil { + return fmt.Errorf("invalid max_failure_duration in dsn: %w", err) + } + l.Config.MaxFailureDuration = duration + } else { + l.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration + } + + if limit := params.Get("limit"); limit != "" { + limit, err := strconv.Atoi(limit) + if err != nil { + return fmt.Errorf("invalid limit in dsn: %w", err) + } + l.Config.Limit = limit + } else { + l.Config.Limit = 5000 // max limit allowed by loki + } + + if logLevel := params.Get("log_level"); logLevel != "" { + level, err := log.ParseLevel(logLevel) + if err != nil { + return fmt.Errorf("invalid log_level in dsn: %w", err) + } + l.Config.LogLevel = &level + l.logger.Logger.SetLevel(level) + } + + l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host) + if u.User != nil { + l.Config.Auth.Username = u.User.Username() + l.Config.Auth.Password, _ = u.User.Password() + } + + clientConfig := lokiclient.Config{ + LokiURL: l.Config.URL, + Headers: l.Config.Headers, + Limit: l.Config.Limit, + Query: l.Config.Query, + Since: l.Config.Since, + Username: l.Config.Auth.Username, + Password: l.Config.Auth.Password, + DelayFor: int(l.Config.DelayFor / time.Second), + } + + l.Client = lokiclient.NewLokiClient(clientConfig) + l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL}) + + return nil +} + +func (l *LokiSource) GetMode() string { + return l.Config.Mode +} + +func (l *LokiSource) GetName() string { + return "loki" +} + +// OneShotAcquisition reads a set of file and returns when done +func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { + l.logger.Debug("Loki one shot acquisition") + l.Client.SetTomb(t) + readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady) + defer cancel() + err := l.Client.Ready(readyCtx) + if err != nil { + return fmt.Errorf("loki is not ready: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + c := l.Client.QueryRange(ctx, false) + + for { + select { + case <-t.Dying(): + l.logger.Debug("Loki one shot acquisition stopped") + cancel() + return nil + case resp, ok := <-c: + if !ok { + l.logger.Info("Loki acquisition done, chan closed") + cancel() + return nil + } + for _, stream := range resp.Data.Result { + for _, entry := range stream.Entries { + l.readOneEntry(entry, l.Config.Labels, out) + } + } + } + } +} + +func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]string, out chan types.Event) { + ll := types.Line{} + ll.Raw = entry.Line + ll.Time = entry.Timestamp + ll.Src = l.Config.URL + ll.Labels = labels + ll.Process = true + ll.Module = l.GetName() + + linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc() + expectMode := types.LIVE + if l.Config.UseTimeMachine { + expectMode = types.TIMEMACHINE + } + out <- types.Event{ + Line: ll, + Process: true, + Type: types.LOG, + ExpectMode: expectMode, + } +} + +func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { + l.Client.SetTomb(t) + readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady) + defer cancel() + err := l.Client.Ready(readyCtx) + if err != nil { + return fmt.Errorf("loki is not ready: %w", err) + } + ll := l.logger.WithField("websocket_url", l.lokiWebsocket) + t.Go(func() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + respChan := l.Client.QueryRange(ctx, true) + if err != nil { + ll.Errorf("could not start loki tail: %s", err) + return fmt.Errorf("while starting loki tail: %w", err) + } + for { + select { + case resp, ok := <-respChan: + if !ok { + ll.Warnf("loki channel closed") + return err + } + for _, stream := range resp.Data.Result { + for _, entry := range stream.Entries { + l.readOneEntry(entry, l.Config.Labels, out) + } + } + case <-t.Dying(): + return nil + } + } + }) + return nil +} + +func (l *LokiSource) CanRun() error { + return nil +} + +func (l *LokiSource) GetUuid() string { + return l.Config.UniqueId +} + +func (l *LokiSource) Dump() interface{} { + return l +} + +// SupportedModes returns the supported modes by the acquisition module +func (l *LokiSource) SupportedModes() []string { + return []string{configuration.TAIL_MODE, configuration.CAT_MODE} +} diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go new file mode 100644 index 000000000..fae2e3aa9 --- /dev/null +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -0,0 +1,512 @@ +package loki_test + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "runtime" + "strings" + "testing" + "time" + + "context" + + "github.com/crowdsecurity/go-cs-lib/cstest" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki" + "github.com/crowdsecurity/crowdsec/pkg/types" + log "github.com/sirupsen/logrus" + tomb "gopkg.in/tomb.v2" + "gotest.tools/v3/assert" +) + +func TestConfiguration(t *testing.T) { + + log.Infof("Test 'TestConfigure'") + + tests := []struct { + config string + expectedErr string + password string + waitForReady time.Duration + delayFor time.Duration + testName string + }{ + { + config: `foobar: asd`, + expectedErr: "line 1: field foobar not found in type loki.LokiConfiguration", + testName: "Unknown field", + }, + { + config: ` +mode: tail +source: loki`, + expectedErr: "loki query is mandatory", + testName: "Missing url", + }, + { + config: ` +mode: tail +source: loki +url: http://localhost:3100/ +`, + expectedErr: "loki query is mandatory", + testName: "Missing query", + }, + { + config: ` +mode: tail +source: loki +url: http://localhost:3100/ +query: > + {server="demo"} +`, + expectedErr: "", + testName: "Correct config", + }, + { + config: ` +mode: tail +source: loki +url: http://localhost:3100/ +wait_for_ready: 5s +query: > + {server="demo"} +`, + expectedErr: "", + testName: "Correct config with wait_for_ready", + waitForReady: 5 * time.Second, + }, + { + config: ` +mode: tail +source: loki +url: http://localhost:3100/ +delay_for: 1s +query: > + {server="demo"} +`, + expectedErr: "", + testName: "Correct config with delay_for", + delayFor: 1 * time.Second, + }, + { + + config: ` +mode: tail +source: loki +url: http://localhost:3100/ +auth: + username: foo + password: bar +query: > + {server="demo"} +`, + expectedErr: "", + password: "bar", + testName: "Correct config with password", + }, + { + + config: ` +mode: tail +source: loki +url: http://localhost:3100/ +delay_for: 10s +query: > + {server="demo"} +`, + expectedErr: "delay_for should be a value between 1s and 5s", + testName: "Invalid DelayFor", + }, + } + subLogger := log.WithFields(log.Fields{ + "type": "loki", + }) + for _, test := range tests { + t.Run(test.testName, func(t *testing.T) { + lokiSource := loki.LokiSource{} + err := lokiSource.Configure([]byte(test.config), subLogger) + cstest.AssertErrorContains(t, err, test.expectedErr) + if test.password != "" { + p := lokiSource.Config.Auth.Password + if test.password != p { + t.Fatalf("Password mismatch : %s != %s", test.password, p) + } + } + if test.waitForReady != 0 { + if lokiSource.Config.WaitForReady != test.waitForReady { + t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady) + } + } + if test.delayFor != 0 { + if lokiSource.Config.DelayFor != test.delayFor { + t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor) + } + } + }) + } +} + +func TestConfigureDSN(t *testing.T) { + log.Infof("Test 'TestConfigureDSN'") + tests := []struct { + name string + dsn string + expectedErr string + since time.Time + password string + scheme string + waitForReady time.Duration + delayFor time.Duration + }{ + { + name: "Wrong scheme", + dsn: "wrong://", + expectedErr: "invalid DSN wrong:// for loki source, must start with loki://", + }, + { + name: "Correct DSN", + dsn: `loki://localhost:3100/?query={server="demo"}`, + expectedErr: "", + }, + { + name: "Empty host", + dsn: "loki://", + expectedErr: "empty loki host", + }, + { + name: "Invalid DSN", + dsn: "loki", + expectedErr: "invalid DSN loki for loki source, must start with loki://", + }, + { + name: "Invalid Delay", + dsn: `loki://localhost:3100/?query={server="demo"}&delay_for=10s`, + expectedErr: "delay_for should be a value between 1s and 5s", + }, + { + name: "Bad since param", + dsn: `loki://127.0.0.1:3100/?since=3h&query={server="demo"}`, + since: time.Now().Add(-3 * time.Hour), + }, + { + name: "Basic Auth", + dsn: `loki://login:password@localhost:3102/?query={server="demo"}`, + password: "password", + }, + { + name: "Correct DSN", + dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s&delay_for=1s`, + expectedErr: "", + waitForReady: 5 * time.Second, + delayFor: 1 * time.Second, + }, + { + name: "SSL DSN", + dsn: `loki://localhost:3100/?ssl=true`, + scheme: "https", + }, + } + + for _, test := range tests { + subLogger := log.WithFields(log.Fields{ + "type": "loki", + "name": test.name, + }) + t.Logf("Test : %s", test.name) + lokiSource := &loki.LokiSource{} + err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger, "") + cstest.AssertErrorContains(t, err, test.expectedErr) + + noDuration, _ := time.ParseDuration("0s") + if lokiSource.Config.Since != noDuration && lokiSource.Config.Since.Round(time.Second) != time.Since(test.since).Round(time.Second) { + t.Fatalf("Invalid since %v", lokiSource.Config.Since) + } + + if test.password != "" { + p := lokiSource.Config.Auth.Password + if test.password != p { + t.Fatalf("Password mismatch : %s != %s", test.password, p) + } + } + if test.scheme != "" { + url, _ := url.Parse(lokiSource.Config.URL) + if test.scheme != url.Scheme { + t.Fatalf("Schema mismatch : %s != %s", test.scheme, url.Scheme) + } + } + if test.waitForReady != 0 { + if lokiSource.Config.WaitForReady != test.waitForReady { + t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady) + } + } + if test.delayFor != 0 { + if lokiSource.Config.DelayFor != test.delayFor { + t.Fatalf("Wrong DelayFor %v != %v", lokiSource.Config.DelayFor, test.delayFor) + } + } + } +} + +func feedLoki(logger *log.Entry, n int, title string) error { + streams := LogStreams{ + Streams: []LogStream{ + { + Stream: map[string]string{ + "server": "demo", + "domain": "cw.example.com", + "key": title, + }, + Values: make([]LogValue, n), + }, + }, + } + for i := 0; i < n; i++ { + streams.Streams[0].Values[i] = LogValue{ + Time: time.Now(), + Line: fmt.Sprintf("Log line #%d %v", i, title), + } + } + buff, err := json.Marshal(streams) + if err != nil { + return err + } + resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff)) + if err != nil { + return err + } + if resp.StatusCode != http.StatusNoContent { + b, _ := io.ReadAll(resp.Body) + logger.Error(string(b)) + return fmt.Errorf("Bad post status %d", resp.StatusCode) + } + logger.Info(n, " Events sent") + return nil +} + +func TestOneShotAcquisition(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows") + } + log.SetOutput(os.Stdout) + log.SetLevel(log.InfoLevel) + log.Info("Test 'TestStreamingAcquisition'") + title := time.Now().String() // Loki will be messy, with a lot of stuff, lets use a unique key + tests := []struct { + config string + }{ + { + config: fmt.Sprintf(` +mode: cat +source: loki +url: http://127.0.0.1:3100 +query: '{server="demo",key="%s"}' +since: 1h +`, title), + }, + } + + for _, ts := range tests { + logger := log.New() + subLogger := logger.WithFields(log.Fields{ + "type": "loki", + }) + lokiSource := loki.LokiSource{} + err := lokiSource.Configure([]byte(ts.config), subLogger) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + err = feedLoki(subLogger, 20, title) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + out := make(chan types.Event) + read := 0 + go func() { + for { + <-out + read++ + } + }() + lokiTomb := tomb.Tomb{} + err = lokiSource.OneShotAcquisition(out, &lokiTomb) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + assert.Equal(t, 20, read) + + } +} + +func TestStreamingAcquisition(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows") + } + log.SetOutput(os.Stdout) + log.SetLevel(log.InfoLevel) + log.Info("Test 'TestStreamingAcquisition'") + title := time.Now().String() + tests := []struct { + name string + config string + expectedErr string + streamErr string + expectedLines int + }{ + { + name: "Bad port", + config: ` +mode: tail +source: loki +url: http://127.0.0.1:3101 +query: > + {server="demo"} +`, // No Loki server here + expectedErr: "", + streamErr: `loki is not ready: context deadline exceeded`, + expectedLines: 0, + }, + { + name: "ok", + config: ` +mode: tail +source: loki +url: http://127.0.0.1:3100 +query: > + {server="demo"} +`, + expectedErr: "", + streamErr: "", + expectedLines: 20, + }, + } + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { + logger := log.New() + subLogger := logger.WithFields(log.Fields{ + "type": "loki", + "name": ts.name, + }) + + out := make(chan types.Event) + lokiTomb := tomb.Tomb{} + lokiSource := loki.LokiSource{} + err := lokiSource.Configure([]byte(ts.config), subLogger) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + err = lokiSource.StreamingAcquisition(out, &lokiTomb) + cstest.AssertErrorContains(t, err, ts.streamErr) + + if ts.streamErr != "" { + return + } + + time.Sleep(time.Second * 2) //We need to give time to start reading from the WS + readTomb := tomb.Tomb{} + readCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + count := 0 + + readTomb.Go(func() error { + defer cancel() + for { + select { + case <-readCtx.Done(): + return readCtx.Err() + case evt := <-out: + count++ + if !strings.HasSuffix(evt.Line.Raw, title) { + return fmt.Errorf("Incorrect suffix : %s", evt.Line.Raw) + } + if count == ts.expectedLines { + return nil + } + } + } + }) + + err = feedLoki(subLogger, ts.expectedLines, title) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + err = readTomb.Wait() + cancel() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + assert.Equal(t, count, ts.expectedLines) + }) + } + +} + +func TestStopStreaming(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows") + } + config := ` +mode: tail +source: loki +url: http://127.0.0.1:3100 +query: > + {server="demo"} +` + logger := log.New() + subLogger := logger.WithFields(log.Fields{ + "type": "loki", + }) + title := time.Now().String() + lokiSource := loki.LokiSource{} + err := lokiSource.Configure([]byte(config), subLogger) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + out := make(chan types.Event) + + lokiTomb := &tomb.Tomb{} + err = lokiSource.StreamingAcquisition(out, lokiTomb) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + time.Sleep(time.Second * 2) + err = feedLoki(subLogger, 1, title) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + lokiTomb.Kill(nil) + err = lokiTomb.Wait() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } +} + +type LogStreams struct { + Streams []LogStream `json:"streams"` +} + +type LogStream struct { + Stream map[string]string `json:"stream"` + Values []LogValue `json:"values"` +} + +type LogValue struct { + Time time.Time + Line string +} + +func (l *LogValue) MarshalJSON() ([]byte, error) { + line, err := json.Marshal(l.Line) + if err != nil { + return nil, err + } + return []byte(fmt.Sprintf(`["%d",%s]`, l.Time.UnixNano(), string(line))), nil +} diff --git a/pkg/acquisition/modules/loki/timestamp.go b/pkg/acquisition/modules/loki/timestamp.go new file mode 100644 index 000000000..f1ec246ea --- /dev/null +++ b/pkg/acquisition/modules/loki/timestamp.go @@ -0,0 +1,29 @@ +package loki + +import ( + "fmt" + "time" +) + +type timestamp time.Time + +func (t *timestamp) UnmarshalYAML(unmarshal func(interface{}) error) error { + var tt time.Time + err := unmarshal(&tt) + if err == nil { + *t = timestamp(tt) + return nil + } + var d time.Duration + err = unmarshal(&d) + if err == nil { + *t = timestamp(time.Now().Add(-d)) + fmt.Println("t", time.Time(*t).Format(time.RFC3339)) + return nil + } + return err +} + +func (t *timestamp) IsZero() bool { + return time.Time(*t).IsZero() +} diff --git a/pkg/acquisition/modules/loki/timestamp_test.go b/pkg/acquisition/modules/loki/timestamp_test.go new file mode 100644 index 000000000..a583cc057 --- /dev/null +++ b/pkg/acquisition/modules/loki/timestamp_test.go @@ -0,0 +1,47 @@ +package loki + +import ( + "testing" + "time" + + "gopkg.in/yaml.v2" +) + +func TestTimestampFail(t *testing.T) { + var tt timestamp + err := yaml.Unmarshal([]byte("plop"), tt) + if err == nil { + t.Fail() + } +} + +func TestTimestampTime(t *testing.T) { + var tt timestamp + const ts string = "2022-06-14T12:56:39+02:00" + err := yaml.Unmarshal([]byte(ts), &tt) + if err != nil { + t.Error(err) + t.Fail() + } + if ts != time.Time(tt).Format(time.RFC3339) { + t.Fail() + } +} + +func TestTimestampDuration(t *testing.T) { + var tt timestamp + err := yaml.Unmarshal([]byte("3h"), &tt) + if err != nil { + t.Error(err) + t.Fail() + } + d, err := time.ParseDuration("3h") + if err != nil { + t.Error(err) + t.Fail() + } + z := time.Now().Add(-d) + if z.Round(time.Second) != time.Time(tt).Round(time.Second) { + t.Fail() + } +} diff --git a/pkg/setup/detect_test.go b/pkg/setup/detect_test.go index 162df0db2..98a22db0d 100644 --- a/pkg/setup/detect_test.go +++ b/pkg/setup/detect_test.go @@ -983,6 +983,16 @@ func TestDetectDatasourceValidation(t *testing.T) { source: kafka`, expected: setup.Setup{Setup: []setup.ServiceSetup{}}, expectedErr: "invalid datasource for foobar: cannot create a kafka reader with an empty list of broker addresses", + }, { + name: "source loki: required fields", + config: ` + version: 1.0 + detect: + foobar: + datasource: + source: loki`, + expected: setup.Setup{Setup: []setup.ServiceSetup{}}, + expectedErr: "invalid datasource for foobar: loki query is mandatory", }, } diff --git a/test/localstack/docker-compose.yml b/test/localstack/docker-compose.yml index 66a820da3..65dd1ca8e 100644 --- a/test/localstack/docker-compose.yml +++ b/test/localstack/docker-compose.yml @@ -77,3 +77,8 @@ services: interval: 10s retries: 5 timeout: 10s + + loki: + image: grafana/loki:2.8.0 + ports: + - "3100:3100"