From 48431f20ed963396d412757b22f3282339ea9caa Mon Sep 17 00:00:00 2001 From: Mathieu Lecarme Date: Tue, 14 Jun 2022 18:58:51 +0200 Subject: [PATCH] Specific time.Time type for YAML parsing. --- pkg/acquisition/modules/loki/loki.go | 14 +++++++------- pkg/acquisition/modules/loki/loki_test.go | 6 +++--- pkg/acquisition/modules/loki/timestamp.go | 4 ++++ 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index 7a2851026..db5b199b0 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -46,7 +46,7 @@ type LokiConfiguration struct { URL string `yaml:"url"` // Loki url Query string `yaml:"query"` // LogQL query DelayFor time.Duration `yaml:"delay_for"` - Since time.Duration `yaml:"since"` + Since timestamp `yaml:"since"` TenantID string `yaml:"tenant_id"` Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki configuration.DataSourceCommonCfg `yaml:",inline"` @@ -81,6 +81,9 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error { if err != nil { return err } + if l.Config.Since.IsZero() { + l.Config.Since = timestamp(time.Now()) + } if u.User != nil { l.auth = u.User } @@ -143,10 +146,7 @@ func (l *LokiSource) buildUrl() error { if l.Config.DelayFor != 0 { params.Add("delay_for", fmt.Sprintf("%d", int64(l.Config.DelayFor.Seconds()))) } - start := time.Now() // FIXME config - if l.Config.Since != 0 { - start = start.Add(-l.Config.Since) - } + start := time.Time(l.Config.Since) params.Add("start", fmt.Sprintf("%d", start.UnixNano())) buff.WriteString(params.Encode()) l.lokiWebsocket = buff.String() @@ -191,11 +191,10 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger l.Config.DelayFor = delayFor } if s := params.Get("since"); s != "" { - since, err := time.ParseDuration(s) + err = yaml.Unmarshal([]byte(s), &l.Config.Since) if err != nil { return errors.Wrap(err, "can't parse since in DSB configuration") } - l.Config.Since = since } l.Config.TenantID = params.Get("tenantID") @@ -219,6 +218,7 @@ func (l *LokiSource) GetName() string { return "loki" } +// OneShotAcquisition reads a set of file and returns when done func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { err := l.ready() if err != nil { diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 98f1b6de9..683a3c310 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -89,7 +89,7 @@ func TestConfigureDSN(t *testing.T) { name string dsn string expectedErr string - since time.Duration + since time.Time password string }{ { @@ -115,7 +115,7 @@ func TestConfigureDSN(t *testing.T) { { name: "Bad since param", dsn: "loki://127.0.0.1:3100/?since=3h", - since: 3 * time.Hour, + since: time.Now().Add(-3 * time.Hour), }, { name: "Basic Auth", @@ -132,7 +132,7 @@ func TestConfigureDSN(t *testing.T) { lokiSource := &LokiSource{} err := lokiSource.ConfigureByDSN(test.dsn, map[string]string{"type": "testtype"}, subLogger) cstest.AssertErrorContains(t, err, test.expectedErr) - if lokiSource.Config.Since != test.since { + if time.Time(lokiSource.Config.Since).Round(time.Second) != test.since.Round(time.Second) { t.Fatalf("Invalid since %v", lokiSource.Config.Since) } if test.password == "" { diff --git a/pkg/acquisition/modules/loki/timestamp.go b/pkg/acquisition/modules/loki/timestamp.go index 4665095f1..f1ec246ea 100644 --- a/pkg/acquisition/modules/loki/timestamp.go +++ b/pkg/acquisition/modules/loki/timestamp.go @@ -23,3 +23,7 @@ func (t *timestamp) UnmarshalYAML(unmarshal func(interface{}) error) error { } return err } + +func (t *timestamp) IsZero() bool { + return time.Time(*t).IsZero() +}