diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 1a6d63b0c..7b69d92ef 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -14,6 +14,7 @@ import ( "net/url" "time" + "github.com/crowdsecurity/crowdsec/pkg/cwversion" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" @@ -45,6 +46,7 @@ type LokiConfiguration struct { Query string `yaml:"query"` // LogQL query DelayFor time.Duration `yaml:"delay_for"` Since time.Duration `yaml:"since"` + TenantID string `yaml:"tenant_id"` configuration.DataSourceCommonCfg `yaml:",inline"` } @@ -54,6 +56,7 @@ type LokiSource struct { lokiWebsocket string lokiReady string dialer *websocket.Dialer + header http.Header } func (l *LokiSource) GetMetrics() []prometheus.Collector { @@ -71,15 +74,28 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { if err != nil { return errors.Wrap(err, "Cannot parse LokiAcquisition configuration") } - l.dialer = &websocket.Dialer{} err = l.buildUrl() if err != nil { return errors.Wrap(err, "Cannot build Loki url") } + err = l.prepareConfig() + if err != nil { + return errors.Wrap(err, "Cannot prepare Loki config") + } return nil } +func (l *LokiSource) prepareConfig() error { + l.dialer = &websocket.Dialer{} + l.header = http.Header{} + if l.Config.TenantID != "" { + l.header.Set("X-Scope-OrgID", l.Config.TenantID) + } + l.header.Set("User-Agent", "Crowdsec "+cwversion.Version) + return nil +} + func (l *LokiSource) buildUrl() error { u, err := url.Parse(l.Config.URL) if err != nil { @@ -125,7 +141,6 @@ func (l *LokiSource) buildUrl() error { func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { l.logger = logger - l.dialer = &websocket.Dialer{} l.Config = LokiConfiguration{} l.Config.Mode = configuration.CAT_MODE l.Config.Labels = labels @@ -164,11 +179,17 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger } l.Config.Since = since } + l.Config.TenantID = params.Get("tenantID") err = l.buildUrl() if err != nil { return errors.Wrap(err, "Cannot build Loki url from DSN") } + err = l.prepareConfig() + if err != nil { + return errors.Wrap(err, "Cannot prepare Loki from DSN") + } + return nil } @@ -187,8 +208,7 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro } ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) defer cancel() - header := &http.Header{} - c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header) + c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header) if err != nil { buf, _ := ioutil.ReadAll(res.Body) return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf)) @@ -235,8 +255,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er for { ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) defer cancel() - header := &http.Header{} - c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header) + c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header) if err != nil { buf, err2 := ioutil.ReadAll(res.Body) if err2 == nil { @@ -278,8 +297,14 @@ func (l *LokiSource) ready() error { Timeout: readyTimeout, } + req, err := http.NewRequest("GET", l.lokiReady, nil) + if err != nil { + return err + } + req.Header = l.header + for i := 0; i < readyLoop; i++ { - resp, err := client.Get(l.lokiReady) + resp, err := client.Do(req) if err != nil { return errors.Wrap(err, "Test Loki services for readiness") }