Tenant ID.

This commit is contained in:
Mathieu Lecarme 2022-06-07 18:03:15 +02:00 committed by lperdereau
parent c2b3d752a2
commit d99feefe2a

View file

@ -14,6 +14,7 @@ import (
"net/url" "net/url"
"time" "time"
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
@ -45,6 +46,7 @@ type LokiConfiguration struct {
Query string `yaml:"query"` // LogQL query Query string `yaml:"query"` // LogQL query
DelayFor time.Duration `yaml:"delay_for"` DelayFor time.Duration `yaml:"delay_for"`
Since time.Duration `yaml:"since"` Since time.Duration `yaml:"since"`
TenantID string `yaml:"tenant_id"`
configuration.DataSourceCommonCfg `yaml:",inline"` configuration.DataSourceCommonCfg `yaml:",inline"`
} }
@ -54,6 +56,7 @@ type LokiSource struct {
lokiWebsocket string lokiWebsocket string
lokiReady string lokiReady string
dialer *websocket.Dialer dialer *websocket.Dialer
header http.Header
} }
func (l *LokiSource) GetMetrics() []prometheus.Collector { func (l *LokiSource) GetMetrics() []prometheus.Collector {
@ -71,15 +74,28 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
if err != nil { if err != nil {
return errors.Wrap(err, "Cannot parse LokiAcquisition configuration") return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
} }
l.dialer = &websocket.Dialer{}
err = l.buildUrl() err = l.buildUrl()
if err != nil { if err != nil {
return errors.Wrap(err, "Cannot build Loki url") return errors.Wrap(err, "Cannot build Loki url")
} }
err = l.prepareConfig()
if err != nil {
return errors.Wrap(err, "Cannot prepare Loki config")
}
return nil return nil
} }
func (l *LokiSource) prepareConfig() error {
l.dialer = &websocket.Dialer{}
l.header = http.Header{}
if l.Config.TenantID != "" {
l.header.Set("X-Scope-OrgID", l.Config.TenantID)
}
l.header.Set("User-Agent", "Crowdsec "+cwversion.Version)
return nil
}
func (l *LokiSource) buildUrl() error { func (l *LokiSource) buildUrl() error {
u, err := url.Parse(l.Config.URL) u, err := url.Parse(l.Config.URL)
if err != nil { if err != nil {
@ -125,7 +141,6 @@ func (l *LokiSource) buildUrl() error {
func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error { func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
l.logger = logger l.logger = logger
l.dialer = &websocket.Dialer{}
l.Config = LokiConfiguration{} l.Config = LokiConfiguration{}
l.Config.Mode = configuration.CAT_MODE l.Config.Mode = configuration.CAT_MODE
l.Config.Labels = labels l.Config.Labels = labels
@ -164,11 +179,17 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
} }
l.Config.Since = since l.Config.Since = since
} }
l.Config.TenantID = params.Get("tenantID")
err = l.buildUrl() err = l.buildUrl()
if err != nil { if err != nil {
return errors.Wrap(err, "Cannot build Loki url from DSN") return errors.Wrap(err, "Cannot build Loki url from DSN")
} }
err = l.prepareConfig()
if err != nil {
return errors.Wrap(err, "Cannot prepare Loki from DSN")
}
return nil return nil
} }
@ -187,8 +208,7 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro
} }
ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout)
defer cancel() defer cancel()
header := &http.Header{} c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header)
c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header)
if err != nil { if err != nil {
buf, _ := ioutil.ReadAll(res.Body) buf, _ := ioutil.ReadAll(res.Body)
return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf)) return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
@ -235,8 +255,7 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
for { for {
ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout) ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout)
defer cancel() defer cancel()
header := &http.Header{} c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, l.header)
c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header)
if err != nil { if err != nil {
buf, err2 := ioutil.ReadAll(res.Body) buf, err2 := ioutil.ReadAll(res.Body)
if err2 == nil { if err2 == nil {
@ -278,8 +297,14 @@ func (l *LokiSource) ready() error {
Timeout: readyTimeout, Timeout: readyTimeout,
} }
req, err := http.NewRequest("GET", l.lokiReady, nil)
if err != nil {
return err
}
req.Header = l.header
for i := 0; i < readyLoop; i++ { for i := 0; i < readyLoop; i++ {
resp, err := client.Get(l.lokiReady) resp, err := client.Do(req)
if err != nil { if err != nil {
return errors.Wrap(err, "Test Loki services for readiness") return errors.Wrap(err, "Test Loki services for readiness")
} }