diff --git a/pkg/acquisition/modules/loki/loki_test.go b/pkg/acquisition/modules/loki/loki_test.go index 73a164955..7a18c8e17 100644 --- a/pkg/acquisition/modules/loki/loki_test.go +++ b/pkg/acquisition/modules/loki/loki_test.go @@ -1,8 +1,14 @@ package loki import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" "os" "testing" + "time" "github.com/crowdsecurity/crowdsec/pkg/cstest" "github.com/crowdsecurity/crowdsec/pkg/types" @@ -85,6 +91,19 @@ url: http://127.0.0.1:3101 logType: "test", logLevel: log.InfoLevel, }, + { + config: ` +mode: tail +source: loki +url: http://127.0.0.1:3100 +`, // No Loki server here + expectedErr: "", + streamErr: "", + expectedOutput: "", + expectedLines: 0, + logType: "test", + logLevel: log.InfoLevel, + }, } for _, ts := range tests { var logger *log.Logger @@ -111,7 +130,70 @@ url: http://127.0.0.1:3101 return lokiSource.StreamingAcquisition(out, &lokiTomb) }) + writerTomb := tomb.Tomb{} + writerTomb.Go(func() error { + streams := LogStreams{ + Streams: []LogStream{ + { + Stream: map[string]string{ + "server": "demo", + "domain": "cw.example.com", + }, + Values: make([]LogValue, 20), + }, + }, + } + for i := 0; i < 20; i++ { + streams.Streams[0].Values[i] = LogValue{ + Time: time.Now(), + Line: fmt.Sprintf("Log line #%d", i), + } + } + buff := &bytes.Buffer{} + encoder := json.NewEncoder(buff) + err := encoder.Encode(streams) + if err != nil { + return err + } + resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", buff) + if err != nil { + return err + } + if resp.StatusCode != 204 { + b, _ := ioutil.ReadAll(resp.Body) + log.Error(string(b)) + return fmt.Errorf("Bad post status %d", resp.StatusCode) + } + return nil + }) + err = writerTomb.Wait() + if err != nil { + t.Fatalf("Unexpected error : %s", err) + } + err = streamTomb.Wait() cstest.AssertErrorContains(t, err, ts.streamErr) } } + +type LogStreams struct { + Streams []LogStream `json:"streams"` +} + +type LogStream struct { + Stream map[string]string `json:"stream"` + Values []LogValue `json:"values"` +} + +type LogValue struct { + Time time.Time + Line string +} + +func (l *LogValue) MarshalJSON() ([]byte, error) { + line, err := json.Marshal(l.Line) + if err != nil { + return nil, err + } + return []byte(fmt.Sprintf(`[%d,%s]`, l.Time.UnixNano(), string(line))), nil +}