diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 0f05e4cc5..1a6d63b0c 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -12,7 +12,6 @@ import ( "io/ioutil" "net/http" "net/url" - "strings" "time" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" @@ -126,14 +125,50 @@ func (l *LokiSource) buildUrl() error { func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { l.logger = logger + l.dialer = &websocket.Dialer{} l.Config = LokiConfiguration{} l.Config.Mode = configuration.CAT_MODE l.Config.Labels = labels - if !strings.HasPrefix(dsn, "loki://") { + u, err := url.Parse(dsn) + if err != nil { + return errors.Wrap(err, "can't parse dsn configuration : "+dsn) + } + if u.Scheme != "loki" { return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn) } - // FIXME DSN parsing + if u.Host == "" { + return errors.New("Empty loki host") + } + scheme := "https" + // FIXME how can use http with container, in a private network? + if u.Host == "localhost" || u.Host == "127.0.0.1" || u.Host == "[::1]" { + scheme = "http" + } + l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host) + params := u.Query() + if q := params.Get("query"); q != "" { + l.Config.Query = q + } + if d := params.Get("delay_for"); d != "" { + delayFor, err := time.ParseDuration(d) + if err != nil { + return err + } + l.Config.DelayFor = delayFor + } + if s := params.Get("since"); s != "" { + since, err := time.ParseDuration(s) + if err != nil { + return errors.Wrap(err, "can't parse since in DSB configuration") + } + l.Config.Since = since + } + + err = l.buildUrl() + if err != nil { + return errors.Wrap(err, "Cannot build Loki url from DSN") + } return nil } diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 1298863c3..1a245bd53 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -56,14 +56,59 @@ url: http://localhost:3100/ "type": "loki", }) for _, test := range tests { - f := LokiSource{} - err := f.Configure([]byte(test.config), subLogger) + lokiSource := LokiSource{} + err := lokiSource.Configure([]byte(test.config), subLogger) cstest.AssertErrorContains(t, err, test.expectedErr) } } func TestConfigureDSN(t *testing.T) { - // TODO + log.Infof("Test 'TestConfigureDSN'") + tests := []struct { + name string + dsn string + expectedErr string + since time.Duration + }{ + { + name: "Wrong scheme", + dsn: "wrong://", + expectedErr: "invalid DSN wrong:// for loki source, must start with loki://", + }, + { + name: "Correct DSN", + dsn: "loki://localhost:3100/", + expectedErr: "", + }, + { + name: "Empty host", + dsn: "loki://", + expectedErr: "Empty loki host", + }, + { + name: "Invalid DSN", + dsn: "loki", + expectedErr: "invalid DSN loki for loki source, must start with loki://", + }, + { + name: "Bad since param", + dsn: "loki://127.0.0.1:3100/?since=3h", + since: 3 * time.Hour, + }, + } + + for _, test := range tests { + subLogger := log.WithFields(log.Fields{ + "type": "loki", + "name": test.name, + }) + lokiSource := &LokiSource{} + err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger) + cstest.AssertErrorContains(t, err, test.expectedErr) + if lokiSource.Config.Since != test.since { + t.Fatalf("Invalid since %v", lokiSource.Config.Since) + } + } } func TestStreamingAcquisition(t *testing.T) {