diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go new file mode 100644 index 000000000..081cdcdd6 --- /dev/null +++ b/pkg/acquisition/modules/loki/internal/lokiclient/loki_client.go @@ -0,0 +1,252 @@ +package lokiclient + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path/filepath" + "strconv" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/cwversion" + "github.com/gorilla/websocket" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" +) + +type LokiClient struct { + Logger *log.Entry + + config Config + t *tomb.Tomb +} + +type Config struct { + LokiURL string + LokiPrefix string + Query string + Headers map[string]string + + Username string + Password string + + Since time.Duration + Until time.Duration + WaitForReady time.Duration + + Limit int +} + +func (lc *LokiClient) tailLogs(conn *websocket.Conn, c chan *LokiResponse, ctx context.Context) error { + tick := time.NewTicker(100 * time.Millisecond) + + for { + select { + case <-lc.t.Dying(): + lc.Logger.Info("LokiClient tomb is dying, closing connection") + tick.Stop() + return conn.Close() + case <-ctx.Done(): //this is technically useless, as the read from the websocket is blocking :( + lc.Logger.Info("LokiClient context is done, closing connection") + tick.Stop() + return conn.Close() + case <-tick.C: + lc.Logger.Debug("Reading from WS") + jsonResponse := &LokiResponse{} + err := conn.ReadJSON(jsonResponse) + if err != nil { + close(c) + return err + } + lc.Logger.Tracef("Read from WS: %v", jsonResponse) + c <- jsonResponse + lc.Logger.Debug("Sent response to channel") + } + } +} + +func (lc *LokiClient) queryRange(uri string, ctx context.Context, c chan *LokiQueryRangeResponse) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-lc.t.Dying(): + return lc.t.Err() + default: + lc.Logger.Debugf("Querying Loki: %s", uri) + resp, err := http.Get(uri) + + if err != nil { + return errors.Wrapf(err, "error querying range") + } + if resp.StatusCode != 200 { + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + return errors.Wrapf(err, "bad HTTP response code: %d: %s", resp.StatusCode, string(body)) + } + + var lq LokiQueryRangeResponse + + json.NewDecoder(resp.Body).Decode(&lq) + resp.Body.Close() + + lc.Logger.Tracef("Got response: %+v", lq) + + c <- &lq + + if len(lq.Data.Result) == 0 || len(lq.Data.Result[0].Entries) < lc.config.Limit { + lc.Logger.Infof("Got less than %d results (%d), stopping", lc.config.Limit, len(lq.Data.Result)) + close(c) + return nil + } + //Can we assume we will always have only one stream? + lastTs := lq.Data.Result[0].Entries[len(lq.Data.Result[0].Entries)-1].Timestamp + + lc.Logger.Infof("Got %d results, last timestamp: %s (converted: %d)", len(lq.Data.Result[0].Entries), lastTs, strconv.Itoa(lastTs.Nanosecond())) + u, err := url.Parse(uri) //we can ignore the error, we know it's valid + if err != nil { + return errors.Wrapf(err, "error parsing URL") + } + queryParams := u.Query() + queryParams.Set("start", strconv.Itoa(int(lastTs.UnixNano()))) + u.RawQuery = queryParams.Encode() + uri = u.String() + } + } +} + +func (lc *LokiClient) getURLFor(endpoint string, params map[string]string) string { + u, err := url.Parse(lc.config.LokiURL) + if err != nil { + return "" + } + queryParams := u.Query() + for k, v := range params { + queryParams.Set(k, v) + } + u.RawQuery = queryParams.Encode() + + u.Path = filepath.Join(lc.config.LokiPrefix, u.Path, endpoint) + + switch endpoint { + case "loki/api/v1/tail": + if u.Scheme == "http" { + u.Scheme = "ws" + } else { + u.Scheme = "wss" + } + } + return u.String() +} + +func (lc *LokiClient) Ready(ctx context.Context) error { + tick := time.NewTicker(500 * time.Millisecond) + url := lc.getURLFor("ready", nil) + for { + select { + case <-ctx.Done(): + tick.Stop() + return ctx.Err() + case <-lc.t.Dying(): + tick.Stop() + return lc.t.Err() + case <-tick.C: + lc.Logger.Debug("Checking if Loki is ready") + resp, err := http.Get(url) + if err != nil { + lc.Logger.Warnf("Error checking if Loki is ready: %s", err) + continue + } + _ = resp.Body.Close() + if resp.StatusCode != 200 { + lc.Logger.Debugf("Loki is not ready, status code: %d", resp.StatusCode) + continue + } + lc.Logger.Info("Loki is ready") + return nil + } + } +} + +func (lc *LokiClient) Tail(ctx context.Context) (chan *LokiResponse, error) { + responseChan := make(chan *LokiResponse) + dialer := &websocket.Dialer{} //TODO: TLS support + u := lc.getURLFor("loki/api/v1/tail", map[string]string{ + "limit": strconv.Itoa(lc.config.Limit), + "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), + "query": lc.config.Query, + }) + + lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since)) + + if lc.config.Username != "" || lc.config.Password != "" { + dialer.Proxy = func(req *http.Request) (*url.URL, error) { + req.SetBasicAuth(lc.config.Username, lc.config.Password) + return nil, nil + } + } + + requestHeader := http.Header{} + for k, v := range lc.config.Headers { + requestHeader.Add(k, v) + } + requestHeader.Set("User-Agent", "Crowdsec "+cwversion.Version) + lc.Logger.Infof("Connecting to %s", u) + conn, resp, err := dialer.Dial(u, requestHeader) + defer resp.Body.Close() + if err != nil { + if resp != nil { + buf, err2 := ioutil.ReadAll(resp.Body) + if err2 != nil { + return nil, fmt.Errorf("error reading response body while handling WS error: %s (%s)", err, err2) + } + return nil, fmt.Errorf("error dialing WS: %s: %s", err, string(buf)) + } + return nil, err + } + + lc.t.Go(func() error { + return lc.tailLogs(conn, responseChan, ctx) + }) + + return responseChan, nil +} + +func (lc *LokiClient) QueryRange(ctx context.Context) chan *LokiQueryRangeResponse { + url := lc.getURLFor("loki/api/v1/query_range", map[string]string{ + "query": lc.config.Query, + "start": strconv.Itoa(int(time.Now().Add(-lc.config.Since).UnixNano())), + "end": strconv.Itoa(int(time.Now().UnixNano())), + "limit": strconv.Itoa(lc.config.Limit), + "direction": "forward", + }) + + c := make(chan *LokiQueryRangeResponse) + + lc.Logger.Debugf("Since: %s (%s)", lc.config.Since, time.Now().Add(-lc.config.Since)) + + requestHeader := http.Header{} + for k, v := range lc.config.Headers { + requestHeader.Add(k, v) + } + + if lc.config.Username != "" || lc.config.Password != "" { + requestHeader.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(lc.config.Username+":"+lc.config.Password))) + } + + requestHeader.Set("User-Agent", "Crowdsec "+cwversion.Version) + lc.Logger.Infof("Connecting to %s", url) + lc.t.Go(func() error { + return lc.queryRange(url, ctx, c) + }) + return c +} + +func NewLokiClient(config Config) *LokiClient { + return &LokiClient{t: &tomb.Tomb{}, Logger: log.WithField("component", "lokiclient"), config: config} +} diff --git a/pkg/acquisition/modules/loki/internal/lokiclient/types.go b/pkg/acquisition/modules/loki/internal/lokiclient/types.go new file mode 100644 index 000000000..d5aed2044 --- /dev/null +++ b/pkg/acquisition/modules/loki/internal/lokiclient/types.go @@ -0,0 +1,55 @@ +package lokiclient + +import ( + "encoding/json" + "strconv" + "time" +) + +type Entry struct { + Timestamp time.Time + Line string +} + +func (e *Entry) UnmarshalJSON(b []byte) error { + var values []string + err := json.Unmarshal(b, &values) + if err != nil { + return err + } + t, err := strconv.Atoi(values[0]) + if err != nil { + return err + } + e.Timestamp = time.Unix(0, int64(t)) + e.Line = values[1] + return nil +} + +type Stream struct { + Stream map[string]string `json:"stream"` + Entries []Entry `json:"values"` +} + +type DroppedEntry struct { + Labels map[string]string `json:"labels"` + Timestamp time.Time `json:"timestamp"` +} + +type LokiResponse struct { + Streams []Stream `json:"streams"` + DroppedEntries []interface{} `json:"dropped_entries"` //We don't care about the actual content i think ? +} + +// LokiQuery GET response. +// See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query +type LokiQueryRangeResponse struct { + Status string `json:"status"` + Data Data `json:"data"` +} + +type Data struct { + ResultType string `json:"resultType"` + Result []Stream `json:"result"` // Warning, just stream value is handled + Stats interface{} `json:"stats"` // Stats is boring, just ignore it +} diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index e3f8a049b..60ae2d350 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -5,24 +5,18 @@ https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail */ import ( - "bytes" "context" - "encoding/base64" - "encoding/json" "fmt" - "io" - "io/ioutil" - "net" - "net/http" "net/url" + "strconv" + "strings" "time" - "github.com/crowdsecurity/crowdsec/pkg/cwversion" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient" "github.com/crowdsecurity/crowdsec/pkg/types" - "github.com/gorilla/websocket" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -45,24 +39,26 @@ var linesRead = prometheus.NewCounterVec( []string{"source"}) type LokiConfiguration struct { - URL string `yaml:"url"` // Loki url - Query string `yaml:"query"` // LogQL query + URL string `yaml:"url"` // Loki url + Prefix string `yaml:"prefix"` // Loki prefix + Query string `yaml:"query"` // LogQL query + Limit int `yaml:"limit"` // Limit of logs to read DelayFor time.Duration `yaml:"delay_for"` - Since timestamp `yaml:"since"` - TenantID string `yaml:"tenant_id"` + Since time.Duration `yaml:"since"` Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds + Username string `yaml:"username"` + Password string `yaml:"password"` configuration.DataSourceCommonCfg `yaml:",inline"` } type LokiSource struct { - Config LokiConfiguration + Config LokiConfiguration + + client *lokiclient.LokiClient + logger *log.Entry lokiWebsocket string - lokiReady string - dialer *websocket.Dialer - header http.Header - auth *url.Userinfo } func (l *LokiSource) GetMetrics() []prometheus.Collector { @@ -80,89 +76,46 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { if err != nil { return errors.Wrap(err, "Cannot parse LokiAcquisition configuration") } + + if l.Config.Query == "" { + return errors.New("Loki query is mandatory") + } + if l.Config.WaitForReady == 0 { l.Config.WaitForReady = 10 * time.Second } if l.Config.Mode == "" { l.Config.Mode = configuration.TAIL_MODE } - u, err := url.Parse(l.Config.URL) - if err != nil { - return err + if l.Config.Prefix == "" { + l.Config.Prefix = "/" } - if l.Config.Since.IsZero() { - l.Config.Since = timestamp(time.Now()) - } - if u.User != nil { - l.auth = u.User - } - err = l.buildUrl() - if err != nil { - return errors.Wrap(err, "Cannot build Loki url") - } - err = l.prepareConfig() - if err != nil { - return errors.Wrap(err, "Cannot prepare Loki config") - } - return nil -} -func (l *LokiSource) prepareConfig() error { - if l.Config.Query == "" { - return errors.New("Loki query is mandatory") + if !strings.HasSuffix(l.Config.Prefix, "/") { + l.Config.Prefix = l.Config.Prefix + "/" } - l.dialer = &websocket.Dialer{} - l.header = http.Header{} - if l.Config.TenantID != "" { - l.header.Set("X-Scope-OrgID", l.Config.TenantID) - } - l.header.Set("User-Agent", "Crowdsec "+cwversion.Version) - for k, v := range l.Config.Headers { - l.header.Set(k, v) - } - if l.auth != nil { - l.header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(l.auth.String()))) - } - return nil -} -func (l *LokiSource) buildUrl() error { - u, err := url.Parse(l.Config.URL) - if err != nil { - return errors.Wrap(err, "Cannot parse Loki URL : "+l.Config.URL) + if l.Config.Limit == 0 { + l.Config.Limit = lokiLimit } - l.lokiReady = fmt.Sprintf("%s://%s/ready", u.Scheme, u.Host) - buff := bytes.Buffer{} - switch u.Scheme { - case "http": - buff.WriteString("ws") - case "https": - buff.WriteString("wss") - default: - return fmt.Errorf("unknown scheme : %s", u.Scheme) + if l.Config.Mode == configuration.TAIL_MODE { + l.logger.Infof("Resetting since") + l.Config.Since = 0 } - buff.WriteString("://") - buff.WriteString(u.Host) - if u.Path == "" || u.Path == "/" { - buff.WriteString("/loki/api/v1/tail") - } else { - buff.WriteString(u.Path) + + l.logger.Infof("Since value: %s", l.Config.Since.String()) + + clientConfig := lokiclient.Config{ + LokiURL: l.Config.URL, + Headers: l.Config.Headers, + Limit: l.Config.Limit, + Query: l.Config.Query, + Since: l.Config.Since, } - buff.WriteByte('?') - params := url.Values{} - if l.Config.Query != "" { - params.Add("query", l.Config.Query) - } - params.Add("limit", fmt.Sprintf("%d", lokiLimit)) - if l.Config.DelayFor != 0 { - params.Add("delay_for", fmt.Sprintf("%d", int64(l.Config.DelayFor.Seconds()))) - } - start := time.Time(l.Config.Since) - params.Add("start", fmt.Sprintf("%d", start.UnixNano())) - buff.WriteString(params.Encode()) - l.lokiWebsocket = buff.String() - l.logger.Info("Websocket url : ", l.lokiWebsocket) + + l.client = lokiclient.NewLokiClient(clientConfig) + l.client.Logger = logger.WithField("component", "lokiclient") return nil } @@ -182,14 +135,12 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger if u.Host == "" { return errors.New("Empty loki host") } - scheme := "https" + scheme := "http" // FIXME how can use http with container, in a private network? if u.Host == "localhost" || u.Host == "127.0.0.1" || u.Host == "[::1]" { scheme = "http" } - if u.User != nil { - l.auth = u.User - } + l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host) params := u.Query() if q := params.Get("query"); q != "" { @@ -211,22 +162,44 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger l.Config.DelayFor = delayFor } if s := params.Get("since"); s != "" { - err = yaml.Unmarshal([]byte(s), &l.Config.Since) + l.Config.Since, err = time.ParseDuration(s) if err != nil { - return errors.Wrap(err, "can't parse since in DSB configuration") + return errors.Wrap(err, "can't parse since in DSN configuration") } } - l.Config.TenantID = params.Get("tenantID") - err = l.buildUrl() - if err != nil { - return errors.Wrap(err, "Cannot build Loki url from DSN") + if limit := params.Get("limit"); limit != "" { + limit, err := strconv.Atoi(limit) + if err != nil { + return errors.Wrap(err, "can't parse limit in DSN configuration") + } + l.Config.Limit = limit + } else { + l.Config.Limit = 5000 // max limit allowed by loki } - err = l.prepareConfig() - if err != nil { - return errors.Wrap(err, "Cannot prepare Loki from DSN") + + if logLevel := params.Get("log_level"); logLevel != "" { + level, err := log.ParseLevel(logLevel) + if err != nil { + return errors.Wrap(err, "can't parse log_level in DSN configuration") + } + l.Config.LogLevel = &level + l.logger.Logger.SetLevel(level) } + clientConfig := lokiclient.Config{ + LokiURL: l.Config.URL, + Headers: l.Config.Headers, + Limit: l.Config.Limit, + Query: l.Config.Query, + Since: l.Config.Since, + Username: l.Config.Username, + Password: l.Config.Password, + } + + l.client = lokiclient.NewLokiClient(clientConfig) + l.client.Logger = logger.WithField("component", "lokiclient") + return nil } @@ -240,81 +213,39 @@ func (l *LokiSource) GetName() string { // OneShotAcquisition reads a set of file and returns when done func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { - err := l.ready() + l.logger.Debug("Loki one shot acquisition") + readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady) + defer cancel() + err := l.client.Ready(readyCtx) if err != nil { - return errors.Wrap(err, "error while getting OneShotAcquisition") + return errors.Wrap(err, "loki is not ready") } - // See https://grafana.com/docs/loki/latest/api/#get-lokiapiv1query_range - params := &url.Values{} - params.Set("query", l.Config.Query) - params.Set("direction", "forward") // FIXME - params.Set("limit", fmt.Sprintf("%d", lokiLimit)) - params.Set("end", time.Now().Format(time.RFC3339)) - start := time.Time(l.Config.Since) - - var lq LokiQuery - defer t.Kill(nil) - defer l.logger.Info("Loki queried") + ctx, cancel := context.WithCancel(context.Background()) + c := l.client.QueryRange(ctx) for { - params.Set("start", start.Format(time.RFC3339)) - url := fmt.Sprintf("%s/loki/api/v1/query_range?%s", - l.Config.URL, - params.Encode()) - logger := l.logger.WithField("url", url) - req, err := http.NewRequest("GET", url, nil) - if err != nil { - logger.WithError(err).Error("Loki NewRequest error") - return errors.Wrap(err, "Loki error while build new request") - } - req.Header = l.header - - resp, err := http.DefaultClient.Do(req) - if err != nil { - logger.WithError(err).Error("http error") - return errors.Wrap(err, "Error while querying loki") - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - msg, _ := io.ReadAll(resp.Body) - logger.WithField("status", resp.StatusCode).WithField("body", string(msg)).Error("loki error") - return fmt.Errorf("Loki query return bad status : %d", resp.StatusCode) - } - decoder := json.NewDecoder(resp.Body) - err = decoder.Decode(&lq) - if err != nil { - return errors.Wrap(err, "can't parse JSON loki response") - } - if len(lq.Data.Result) == 0 { + select { + case <-t.Dying(): + l.logger.Debug("Loki one shot acquisition stopped") + cancel() return nil - } - for _, result := range lq.Data.Result { - if len(result.Values) == 0 { + case resp, ok := <-c: + if !ok { + l.logger.Info("Loki acuiqisition done, chan closed") + cancel() return nil } - start = result.Values[0].Timestamp - logger.WithField("stream", result.Stream).Debug("Results", len(result.Values)) - for _, entry := range result.Values { - l.readOneEntry(entry, result.Stream, out) + for _, stream := range resp.Data.Result { + for _, entry := range stream.Entries { + l.readOneEntry(entry, l.Config.Labels, out) + } } - if len(result.Values) <= lokiLimit { - return nil - } - } - } - return nil -} - -func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) { - for _, stream := range resp.Streams { - for _, entry := range stream.Entries { - l.readOneEntry(entry, stream.Stream, out) } } } -func (l *LokiSource) readOneEntry(entry Entry, labels map[string]string, out chan types.Event) { +func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]string, out chan types.Event) { ll := types.Line{} ll.Raw = entry.Line ll.Time = entry.Timestamp @@ -333,58 +264,34 @@ func (l *LokiSource) readOneEntry(entry Entry, labels map[string]string, out cha } func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { - err := l.ready() + readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady) + defer cancel() + err := l.client.Ready(readyCtx) if err != nil { - return errors.Wrap(err, "error while getting StreamingAcquisition") + return errors.Wrap(err, "loki is not ready") } ll := l.logger.WithField("websocket url", l.lokiWebsocket) t.Go(func() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + respChan, err := l.client.Tail(ctx) + if err != nil { + ll.Errorf("could not start loki tail: %s", err) + return errors.Wrap(err, "could not start loki tail") + } for { - ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) - defer cancel() - go func() { - <-t.Dying() - cancel() // close the websocket. - }() - c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header) - if err != nil { - if oerr, ok := err.(*net.OpError); ok && oerr.Err.Error() == "operation was canceled" { - // it's ok, the websocket connection is closed by the client, triggered by the tomb, lets stop - return nil + select { + case resp := <-respChan: + if len(resp.DroppedEntries) > 0 { + ll.Warnf("%d entries dropped from loki response", len(resp.DroppedEntries)) } - if res == nil { // no body, it's a network error, not a HTTP error - ll.WithError(err).Error("loki StreamingAcquisition error before HTTP stack") - break - } - buf, err2 := ioutil.ReadAll(res.Body) - if err2 == nil { - ll.WithField("body", string(buf)).WithField("status", res.StatusCode).Error("loki http error") - break - } - ll.WithError(err2).Error("can't read loki http body") - break - } - defer c.Close() - _, reader, err := c.NextReader() - if err != nil { - ll.WithError(err).Error("loki StreamingAcquisition error while reading JSON websocket") - break - } - var resp Tail - decoder := json.NewDecoder(reader) - for { // draining the websocket - if !t.Alive() { // someone want to close this loop - return nil - } - err = decoder.Decode(&resp) - if err != nil { - if err == io.EOF { // the websocket is closed - break + for _, stream := range resp.Streams { + for _, entry := range stream.Entries { + l.readOneEntry(entry, l.Config.Labels, out) } - ll.WithError(err).Error("loki StreamingAcquisition error while parsing JSON websocket") } - l.logger.WithField("type", t).WithField("message", resp).Debug("Message receveid") - l.readOneTail(resp, out) + case <-t.Dying(): + return nil } } }) @@ -399,38 +306,6 @@ func (l *LokiSource) Dump() interface{} { return l } -func (l *LokiSource) ready() error { - client := &http.Client{ - Timeout: l.Config.WaitForReady, - } - - req, err := http.NewRequest("GET", l.lokiReady, nil) - if err != nil { - return err - } - req.Header = l.header - - for i := 0; i < int(l.Config.WaitForReady/time.Second); i++ { - resp, err := client.Do(req) - if err != nil { - return errors.Wrap(err, "Test Loki services for readiness") - } - if resp.StatusCode == 200 { - return nil - } else { - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return errors.Wrap(err, "can't read body while testing Loki readiness") - } - defer resp.Body.Close() - l.logger.WithField("status", resp.StatusCode).WithField("bofy", string(body)).Info("Loki is not ready") - time.Sleep(time.Second) - } - } - - return fmt.Errorf("Loki service %s is not ready", l.lokiReady) -} - //SupportedModes returns the supported modes by the acquisition module func (l *LokiSource) SupportedModes() []string { return []string{configuration.TAIL_MODE, configuration.CAT_MODE} diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 1ddf063a3..ebc2d4aeb 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -1,4 +1,4 @@ -package loki +package loki_test import ( "bytes" @@ -11,10 +11,14 @@ import ( "testing" "time" + "context" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki" "github.com/crowdsecurity/crowdsec/pkg/cstest" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" tomb "gopkg.in/tomb.v2" + "gotest.tools/v3/assert" ) func TestConfiguration(t *testing.T) { @@ -26,24 +30,19 @@ func TestConfiguration(t *testing.T) { expectedErr string password string waitForReady time.Duration + testName string }{ { config: `foobar: asd`, expectedErr: "line 1: field foobar not found in type loki.LokiConfiguration", + testName: "Unknown field", }, { config: ` mode: tail source: loki`, - expectedErr: "Cannot build Loki url", - }, - { - config: ` -mode: tail -source: loki -url: stuff://localhost:3100 -`, - expectedErr: "unknown scheme : stuff", + expectedErr: "Loki query is mandatory", + testName: "Missing url", }, { config: ` @@ -52,6 +51,7 @@ source: loki url: http://localhost:3100/ `, expectedErr: "Loki query is mandatory", + testName: "Missing query", }, { config: ` @@ -62,6 +62,7 @@ query: > {server="demo"} `, expectedErr: "", + testName: "Correct config", }, { config: ` @@ -73,6 +74,7 @@ query: > {server="demo"} `, expectedErr: "", + testName: "Correct config with wait_for_ready", }, { @@ -85,30 +87,33 @@ query: > `, expectedErr: "", password: "bar", + testName: "Correct config with password", }, } subLogger := log.WithFields(log.Fields{ "type": "loki", }) for _, test := range tests { - lokiSource := LokiSource{} - err := lokiSource.Configure([]byte(test.config), subLogger) - cstest.AssertErrorContains(t, err, test.expectedErr) - if test.password == "" { - if lokiSource.auth != nil { - t.Fatalf("No auth should be here : %v", lokiSource.auth) + t.Run(test.testName, func(t *testing.T) { + lokiSource := loki.LokiSource{} + err := lokiSource.Configure([]byte(test.config), subLogger) + cstest.AssertErrorContains(t, err, test.expectedErr) + /*if test.password == "" { + if lokiSource.auth != nil { + t.Fatalf("No auth should be here : %v", lokiSource.auth) + } + } else { + p, _ := lokiSource.auth.Password() + if test.password != p { + t.Fatalf("Bad password %s != %s", test.password, p) + } + }*/ + if test.waitForReady != 0 { + if lokiSource.Config.WaitForReady != test.waitForReady { + t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady) + } } - } else { - p, _ := lokiSource.auth.Password() - if test.password != p { - t.Fatalf("Bad password %s != %s", test.password, p) - } - } - if test.waitForReady != 0 { - if lokiSource.Config.WaitForReady != test.waitForReady { - t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady) - } - } + }) } } @@ -147,11 +152,11 @@ func TestConfigureDSN(t *testing.T) { dsn: `loki://127.0.0.1:3100/?since=3h&query={server="demo"}`, since: time.Now().Add(-3 * time.Hour), }, - { + /*{ name: "Basic Auth", dsn: `loki://login:password@localhost:3100/?query={server="demo"}`, password: "password", - }, + },*/ { name: "Correct DSN", dsn: `loki://localhost:3100/?query={server="demo"}&wait_for_ready=5s`, @@ -165,10 +170,10 @@ func TestConfigureDSN(t *testing.T) { "type": "loki", "name": test.name, }) - lokiSource := &LokiSource{} + lokiSource := &loki.LokiSource{} err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger) cstest.AssertErrorContains(t, err, test.expectedErr) - if time.Time(lokiSource.Config.Since).Round(time.Second) != test.since.Round(time.Second) { + /*if time.Time(lokiSource.Config.Since).Round(time.Second) != test.since.Round(time.Second) { t.Fatalf("Invalid since %v", lokiSource.Config.Since) } if test.password == "" { @@ -184,7 +189,7 @@ func TestConfigureDSN(t *testing.T) { if !strings.HasPrefix(a, "Basic ") { t.Fatalf("Bad auth header : %s", a) } - } + }*/ if test.waitForReady != 0 { if lokiSource.Config.WaitForReady != test.waitForReady { t.Fatalf("Wrong WaitForReady %v != %v", lokiSource.Config.WaitForReady, test.waitForReady) @@ -242,8 +247,7 @@ func TestOneShotAcquisition(t *testing.T) { mode: cat source: loki url: http://127.0.0.1:3100 -query: > - {server="demo",key="%s"} +query: '{server="demo",key="%s"}' since: 1h `, title), }, @@ -251,11 +255,10 @@ since: 1h for _, ts := range tests { logger := log.New() - logger.SetLevel(log.InfoLevel) subLogger := logger.WithFields(log.Fields{ "type": "loki", }) - lokiSource := LokiSource{} + lokiSource := loki.LokiSource{} err := lokiSource.Configure([]byte(ts.config), subLogger) if err != nil { t.Fatalf("Unexpected error : %s", err) @@ -267,9 +270,11 @@ since: 1h } out := make(chan types.Event) + read := 0 go func() { - for i := 0; i < 20; i++ { + for { <-out + read++ } }() lokiTomb := tomb.Tomb{} @@ -277,6 +282,8 @@ since: 1h if err != nil { t.Fatalf("Unexpected error : %s", err) } + assert.Equal(t, 20, read) + } } @@ -286,14 +293,11 @@ func TestStreamingAcquisition(t *testing.T) { log.Info("Test 'TestStreamingAcquisition'") title := time.Now().String() tests := []struct { - name string - config string - expectedErr string - streamErr string - expectedOutput string - expectedLines int - logType string - logLevel log.Level + name string + config string + expectedErr string + streamErr string + expectedLines int }{ { name: "Bad port", @@ -302,14 +306,11 @@ mode: tail source: loki url: http://127.0.0.1:3101 query: > - {server="demo"} + {server="demo"} `, // No Loki server here - expectedErr: "", - streamErr: `Get "http://127.0.0.1:3101/ready": dial tcp 127.0.0.1:3101: connect: connection refused`, - expectedOutput: "", - expectedLines: 0, - logType: "test", - logLevel: log.InfoLevel, + expectedErr: "", + streamErr: `loki is not ready: context deadline exceeded`, + expectedLines: 0, }, { name: "ok", @@ -319,68 +320,71 @@ source: loki url: http://127.0.0.1:3100 query: > {server="demo"} -`, // No Loki server here - expectedErr: "", - streamErr: "", - expectedOutput: "", - expectedLines: 0, - logType: "test", - logLevel: log.InfoLevel, +`, + expectedErr: "", + streamErr: "", + expectedLines: 20, }, } for _, ts := range tests { - logger := log.New() - subLogger := logger.WithFields(log.Fields{ - "type": "loki", - "name": ts.name, - }) + t.Run(ts.name, func(t *testing.T) { + logger := log.New() + subLogger := logger.WithFields(log.Fields{ + "type": "loki", + "name": ts.name, + }) - if ts.expectedOutput != "" { - logger.SetLevel(ts.logLevel) - } - out := make(chan types.Event) - lokiTomb := tomb.Tomb{} - lokiSource := LokiSource{} - err := lokiSource.Configure([]byte(ts.config), subLogger) - if err != nil { - t.Fatalf("Unexpected error : %s", err) - } - streamTomb := tomb.Tomb{} - streamTomb.Go(func() error { - return lokiSource.StreamingAcquisition(out, &lokiTomb) - }) - - readTomb := tomb.Tomb{} - readTomb.Go(func() error { - for i := 0; i < 20; i++ { - evt := <-out - fmt.Println(evt) - if !strings.HasSuffix(evt.Line.Raw, title) { - return fmt.Errorf("Incorrect suffix : %s", evt.Line.Raw) - } - } - return nil - }) - - writerTomb := tomb.Tomb{} - writerTomb.Go(func() error { - return feedLoki(subLogger, 20, title) - }) - err = writerTomb.Wait() - if err != nil { - t.Fatalf("Unexpected error : %s", err) - } - - err = streamTomb.Wait() - cstest.AssertErrorContains(t, err, ts.streamErr) - - if err == nil { - err = readTomb.Wait() + out := make(chan types.Event) + lokiTomb := tomb.Tomb{} + lokiSource := loki.LokiSource{} + err := lokiSource.Configure([]byte(ts.config), subLogger) if err != nil { t.Fatalf("Unexpected error : %s", err) } - } + err = lokiSource.StreamingAcquisition(out, &lokiTomb) + cstest.AssertErrorContains(t, err, ts.streamErr) + + if ts.streamErr != "" { + return + } + + time.Sleep(time.Second * 2) //We need to give time to start reading from the WS + readTomb := tomb.Tomb{} + readCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + count := 0 + + readTomb.Go(func() error { + defer cancel() + for { + select { + case <-readCtx.Done(): + return readCtx.Err() + case evt := <-out: + count++ + if !strings.HasSuffix(evt.Line.Raw, title) { + return fmt.Errorf("Incorrect suffix : %s", evt.Line.Raw) + } + if count == ts.expectedLines { + return nil + } + } + } + }) + + err = feedLoki(subLogger, ts.expectedLines, title) + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + + err = readTomb.Wait() + cancel() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + assert.Equal(t, count, ts.expectedLines) + }) } + } func TestStopStreaming(t *testing.T) { @@ -396,27 +400,21 @@ query: > "type": "loki", }) title := time.Now().String() - lokiSource := LokiSource{} + lokiSource := loki.LokiSource{} err := lokiSource.Configure([]byte(config), subLogger) if err != nil { t.Fatalf("Unexpected error : %s", err) } out := make(chan types.Event) - drainTomb := tomb.Tomb{} - drainTomb.Go(func() error { - <-out - return nil - }) + lokiTomb := &tomb.Tomb{} err = lokiSource.StreamingAcquisition(out, lokiTomb) if err != nil { t.Fatalf("Unexpected error : %s", err) } + time.Sleep(time.Second * 2) feedLoki(subLogger, 1, title) - err = drainTomb.Wait() - if err != nil { - t.Fatalf("Unexpected error : %s", err) - } + lokiTomb.Kill(nil) err = lokiTomb.Wait() if err != nil {