From a81d52a1ff51d5746434d43fa9ab67b35785c3f8 Mon Sep 17 00:00:00 2001 From: Mathieu Lecarme Date: Thu, 16 Jun 2022 16:10:33 +0200 Subject: [PATCH] FeedLoki routine. --- pkg/acquisition/modules/loki/loki_test.go | 99 +++++++++-------------- 1 file changed, 38 insertions(+), 61 deletions(-) diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index d735594ce..1d45a2ec0 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -164,6 +164,42 @@ func TestConfigureDSN(t *testing.T) { } } +func feedLoki(logger *log.Entry, n int, title string) error { + streams := LogStreams{ + Streams: []LogStream{ + { + Stream: map[string]string{ + "server": "demo", + "domain": "cw.example.com", + "key": title, + }, + Values: make([]LogValue, n), + }, + }, + } + for i := 0; i < n; i++ { + streams.Streams[0].Values[i] = LogValue{ + Time: time.Now(), + Line: fmt.Sprintf("Log line #%d %v", i, title), + } + } + buff, err := json.Marshal(streams) + if err != nil { + return err + } + resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff)) + if err != nil { + return err + } + if resp.StatusCode != 204 { + b, _ := ioutil.ReadAll(resp.Body) + logger.Error(string(b)) + return fmt.Errorf("Bad post status %d", resp.StatusCode) + } + logger.Info(n, " Events sent") + return nil +} + func TestOneShotAcquisition(t *testing.T) { log.SetOutput(os.Stdout) log.SetLevel(log.InfoLevel) @@ -196,38 +232,10 @@ since: 1h t.Fatalf("Unexpected error : %s", err) } - streams := LogStreams{ - Streams: []LogStream{ - { - Stream: map[string]string{ - "server": "demo", - "domain": "cw.example.com", - "key": title, - }, - Values: make([]LogValue, 20), - }, - }, - } - for i := 0; i < 20; i++ { - streams.Streams[0].Values[i] = LogValue{ - Time: time.Now(), - Line: fmt.Sprintf("Log line #%d %v", i, title), - } - } - buff, err := json.Marshal(streams) + err = feedLoki(subLogger, 20, title) if err != nil { t.Fatalf("Unexpected error : %s", err) } - resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff)) - if err != nil { - t.Fatalf("Unexpected error : %s", err) - } - if resp.StatusCode != 204 { - b, _ := ioutil.ReadAll(resp.Body) - log.Error(string(b)) - t.Fatalf("Bad post status %d", resp.StatusCode) - } - subLogger.Info("20 Events sent") out := make(chan types.Event) go func() { @@ -327,38 +335,7 @@ query: > writerTomb := tomb.Tomb{} writerTomb.Go(func() error { - streams := LogStreams{ - Streams: []LogStream{ - { - Stream: map[string]string{ - "server": "demo", - "domain": "cw.example.com", - }, - Values: make([]LogValue, 20), - }, - }, - } - for i := 0; i < 20; i++ { - streams.Streams[0].Values[i] = LogValue{ - Time: time.Now(), - Line: fmt.Sprintf("Log line #%d %v", i, title), - } - } - buff, err := json.Marshal(streams) - if err != nil { - return err - } - resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", bytes.NewBuffer(buff)) - if err != nil { - return err - } - if resp.StatusCode != 204 { - b, _ := ioutil.ReadAll(resp.Body) - log.Error(string(b)) - return fmt.Errorf("Bad post status %d", resp.StatusCode) - } - subLogger.Info("20 Events sent") - return nil + return feedLoki(subLogger, 20, title) }) err = writerTomb.Wait() if err != nil {