This commit is contained in:
Mathieu Lecarme 2022-06-07 17:29:18 +02:00 committed by lperdereau
parent c46fb2be6c
commit c2b3d752a2
2 changed files with 86 additions and 6 deletions

View file

@ -12,7 +12,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
@ -126,14 +125,50 @@ func (l *LokiSource) buildUrl() error {
func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
l.logger = logger
l.dialer = &websocket.Dialer{}
l.Config = LokiConfiguration{}
l.Config.Mode = configuration.CAT_MODE
l.Config.Labels = labels
if !strings.HasPrefix(dsn, "loki://") {
u, err := url.Parse(dsn)
if err != nil {
return errors.Wrap(err, "can't parse dsn configuration : "+dsn)
}
if u.Scheme != "loki" {
return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
}
// FIXME DSN parsing
if u.Host == "" {
return errors.New("Empty loki host")
}
scheme := "https"
// FIXME how can use http with container, in a private network?
if u.Host == "localhost" || u.Host == "127.0.0.1" || u.Host == "[::1]" {
scheme = "http"
}
l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
params := u.Query()
if q := params.Get("query"); q != "" {
l.Config.Query = q
}
if d := params.Get("delay_for"); d != "" {
delayFor, err := time.ParseDuration(d)
if err != nil {
return err
}
l.Config.DelayFor = delayFor
}
if s := params.Get("since"); s != "" {
since, err := time.ParseDuration(s)
if err != nil {
return errors.Wrap(err, "can't parse since in DSB configuration")
}
l.Config.Since = since
}
err = l.buildUrl()
if err != nil {
return errors.Wrap(err, "Cannot build Loki url from DSN")
}
return nil
}

View file

@ -56,14 +56,59 @@ url: http://localhost:3100/
"type": "loki",
})
for _, test := range tests {
f := LokiSource{}
err := f.Configure([]byte(test.config), subLogger)
lokiSource := LokiSource{}
err := lokiSource.Configure([]byte(test.config), subLogger)
cstest.AssertErrorContains(t, err, test.expectedErr)
}
}
func TestConfigureDSN(t *testing.T) {
// TODO
log.Infof("Test 'TestConfigureDSN'")
tests := []struct {
name string
dsn string
expectedErr string
since time.Duration
}{
{
name: "Wrong scheme",
dsn: "wrong://",
expectedErr: "invalid DSN wrong:// for loki source, must start with loki://",
},
{
name: "Correct DSN",
dsn: "loki://localhost:3100/",
expectedErr: "",
},
{
name: "Empty host",
dsn: "loki://",
expectedErr: "Empty loki host",
},
{
name: "Invalid DSN",
dsn: "loki",
expectedErr: "invalid DSN loki for loki source, must start with loki://",
},
{
name: "Bad since param",
dsn: "loki://127.0.0.1:3100/?since=3h",
since: 3 * time.Hour,
},
}
for _, test := range tests {
subLogger := log.WithFields(log.Fields{
"type": "loki",
"name": test.name,
})
lokiSource := &LokiSource{}
err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger)
cstest.AssertErrorContains(t, err, test.expectedErr)
if lokiSource.Config.Since != test.since {
t.Fatalf("Invalid since %v", lokiSource.Config.Since)
}
}
}
func TestStreamingAcquisition(t *testing.T) {