Prometheus counter. Better URL handling. Streamin acquisition.

This commit is contained in:
Mathieu Lecarme 2022-06-06 16:33:21 +02:00 committed by lperdereau
parent bc6f327998
commit 437c2af8e2

View file

@ -30,12 +30,21 @@ const (
readyTimeout time.Duration = 3 * time.Second
readyLoop int = 3
readySleep time.Duration = 10 * time.Second
lokiLimit int = 100
)
var linesRead = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cs_lokisource_hits_total",
Help: "Total lines that were read.",
},
[]string{"source"})
type LokiConfiguration struct {
configuration.DataSourceCommonCfg `yaml:",inline"`
URL string // websocket url
URL string // Loki url
Query string // LogQL query
DelayFor time.Duration
}
type LokiSource struct {
@ -47,11 +56,11 @@ type LokiSource struct {
}
func (l *LokiSource) GetMetrics() []prometheus.Collector {
return nil
return []prometheus.Collector{linesRead}
}
func (l *LokiSource) GetAggregMetrics() []prometheus.Collector {
return nil
return []prometheus.Collector{linesRead}
}
func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
@ -62,7 +71,7 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
return errors.Wrap(err, "Cannot parse LokiAcquisition configuration")
}
l.dialer = &websocket.Dialer{}
l.lokiWebsocket, l.lokiReady, err = websocketFromUrl(lokiConfig.URL)
err = l.buildUrl()
if err != nil {
return errors.Wrap(err, "Cannot parse Loki url")
}
@ -70,11 +79,12 @@ func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
return nil
}
func websocketFromUrl(lokiUrl string) (string, string, error) {
u, err := url.Parse(lokiUrl)
func (l *LokiSource) buildUrl() error {
u, err := url.Parse(l.config.URL)
if err != nil {
return "", "", errors.Wrap(err, "Cannot parse Loki URL")
return errors.Wrap(err, "Cannot parse Loki URL")
}
l.lokiReady = fmt.Sprintf("%s://%s/ready", u.Scheme, u.Host)
buff := bytes.Buffer{}
switch u.Scheme {
@ -83,7 +93,7 @@ func websocketFromUrl(lokiUrl string) (string, string, error) {
case "https":
buff.WriteString("wss")
default:
return "", "", fmt.Errorf("unknown scheme : %s", u.Scheme)
return fmt.Errorf("unknown scheme : %s", u.Scheme)
}
buff.WriteString("://")
buff.WriteString(u.Host)
@ -92,7 +102,18 @@ func websocketFromUrl(lokiUrl string) (string, string, error) {
} else {
buff.WriteString(u.Path)
}
return buff.String(), fmt.Sprintf("%s://%s/ready", u.Scheme, u.Host), nil
buff.WriteByte('?')
params := url.Values{}
params.Add("query", l.config.Query)
params.Add("limit", fmt.Sprintf("%d", lokiLimit))
if l.config.DelayFor != 0 {
params.Add("delay_for", fmt.Sprintf("%d", int64(l.config.DelayFor.Seconds())))
}
start := time.Now() // FIXME config
params.Add("start", fmt.Sprintf("%d", start.UnixNano()))
buff.WriteString(params.Encode())
l.lokiWebsocket = buff.String()
return nil
}
func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry) error {
@ -104,6 +125,7 @@ func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger
if !strings.HasPrefix(dsn, "loki://") {
return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
}
// FIXME DSN parsing
return nil
}
@ -134,24 +156,60 @@ func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) erro
if err != nil {
return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket")
}
ll := types.Line{}
ll.Raw = resp.Streams[0].Entries[0].Line
ll.Time = resp.Streams[0].Entries[0].Timestamp
ll.Src = l.lokiReady
ll.Labels = resp.Streams[0].Stream
ll.Process = true
ll.Module = l.GetName()
out <- types.Event{
Line: ll,
Process: true,
Type: types.LOG,
ExpectMode: leaky.TIMEMACHINE,
}
l.readOneTail(resp, out)
return nil
}
func (l *LokiSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error {
func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) {
for _, stream := range resp.Streams {
for _, entry := range stream.Entries {
ll := types.Line{}
ll.Raw = entry.Line
ll.Time = entry.Timestamp
ll.Src = l.config.URL
ll.Labels = stream.Stream
ll.Process = true
ll.Module = l.GetName()
linesRead.With(prometheus.Labels{"source": l.config.URL}).Inc()
out <- types.Event{
Line: ll,
Process: true,
Type: types.LOG,
ExpectMode: leaky.TIMEMACHINE,
}
}
}
}
func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
err := l.ready()
if err != nil {
return errors.Wrap(err, "error while getting OneShotAcquisition")
}
t.Go(func() error {
for {
ctx, cancel := context.WithTimeout(context.TODO(), readyTimeout)
defer cancel()
header := &http.Header{}
c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header)
if err != nil {
buf, _ := ioutil.ReadAll(res.Body)
return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
}
defer c.Close()
var resp Tail
for { // draining the websocket
err = c.ReadJSON(&resp)
if err != nil {
return errors.Wrap(err, "OneShotAcquisition error while reading JSON websocket")
}
l.readOneTail(resp, out)
}
}
return nil
})
return nil
}