Push log to Loki.
This commit is contained in:
parent
7037463fcc
commit
b17711ed99
|
@ -1,8 +1,14 @@
|
||||||
package loki
|
package loki
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/cstest"
|
"github.com/crowdsecurity/crowdsec/pkg/cstest"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
|
@ -85,6 +91,19 @@ url: http://127.0.0.1:3101
|
||||||
logType: "test",
|
logType: "test",
|
||||||
logLevel: log.InfoLevel,
|
logLevel: log.InfoLevel,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
config: `
|
||||||
|
mode: tail
|
||||||
|
source: loki
|
||||||
|
url: http://127.0.0.1:3100
|
||||||
|
`, // No Loki server here
|
||||||
|
expectedErr: "",
|
||||||
|
streamErr: "",
|
||||||
|
expectedOutput: "",
|
||||||
|
expectedLines: 0,
|
||||||
|
logType: "test",
|
||||||
|
logLevel: log.InfoLevel,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, ts := range tests {
|
for _, ts := range tests {
|
||||||
var logger *log.Logger
|
var logger *log.Logger
|
||||||
|
@ -111,7 +130,70 @@ url: http://127.0.0.1:3101
|
||||||
return lokiSource.StreamingAcquisition(out, &lokiTomb)
|
return lokiSource.StreamingAcquisition(out, &lokiTomb)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
writerTomb := tomb.Tomb{}
|
||||||
|
writerTomb.Go(func() error {
|
||||||
|
streams := LogStreams{
|
||||||
|
Streams: []LogStream{
|
||||||
|
{
|
||||||
|
Stream: map[string]string{
|
||||||
|
"server": "demo",
|
||||||
|
"domain": "cw.example.com",
|
||||||
|
},
|
||||||
|
Values: make([]LogValue, 20),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
streams.Streams[0].Values[i] = LogValue{
|
||||||
|
Time: time.Now(),
|
||||||
|
Line: fmt.Sprintf("Log line #%d", i),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
buff := &bytes.Buffer{}
|
||||||
|
encoder := json.NewEncoder(buff)
|
||||||
|
err := encoder.Encode(streams)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp, err := http.Post("http://127.0.0.1:3100/loki/api/v1/push", "application/json", buff)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp.StatusCode != 204 {
|
||||||
|
b, _ := ioutil.ReadAll(resp.Body)
|
||||||
|
log.Error(string(b))
|
||||||
|
return fmt.Errorf("Bad post status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
err = writerTomb.Wait()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error : %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
err = streamTomb.Wait()
|
err = streamTomb.Wait()
|
||||||
cstest.AssertErrorContains(t, err, ts.streamErr)
|
cstest.AssertErrorContains(t, err, ts.streamErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type LogStreams struct {
|
||||||
|
Streams []LogStream `json:"streams"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type LogStream struct {
|
||||||
|
Stream map[string]string `json:"stream"`
|
||||||
|
Values []LogValue `json:"values"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type LogValue struct {
|
||||||
|
Time time.Time
|
||||||
|
Line string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LogValue) MarshalJSON() ([]byte, error) {
|
||||||
|
line, err := json.Marshal(l.Line)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return []byte(fmt.Sprintf(`[%d,%s]`, l.Time.UnixNano(), string(line))), nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue