Lazy query. More error.
This commit is contained in:
parent
6632bcaccd
commit
92e3ea565b
|
@ -106,7 +106,9 @@ func (l *LokiSource) buildUrl() error {
|
||||||
}
|
}
|
||||||
buff.WriteByte('?')
|
buff.WriteByte('?')
|
||||||
params := url.Values{}
|
params := url.Values{}
|
||||||
params.Add("query", l.Config.Query)
|
if l.Config.Query != "" {
|
||||||
|
params.Add("query", l.Config.Query)
|
||||||
|
}
|
||||||
params.Add("limit", fmt.Sprintf("%d", lokiLimit))
|
params.Add("limit", fmt.Sprintf("%d", lokiLimit))
|
||||||
if l.Config.DelayFor != 0 {
|
if l.Config.DelayFor != 0 {
|
||||||
params.Add("delay_for", fmt.Sprintf("%d", int64(l.Config.DelayFor.Seconds())))
|
params.Add("delay_for", fmt.Sprintf("%d", int64(l.Config.DelayFor.Seconds())))
|
||||||
|
@ -118,6 +120,7 @@ func (l *LokiSource) buildUrl() error {
|
||||||
params.Add("start", fmt.Sprintf("%d", start.UnixNano()))
|
params.Add("start", fmt.Sprintf("%d", start.UnixNano()))
|
||||||
buff.WriteString(params.Encode())
|
buff.WriteString(params.Encode())
|
||||||
l.lokiWebsocket = buff.String()
|
l.lokiWebsocket = buff.String()
|
||||||
|
l.logger.Info("Websocket url : ", l.lokiWebsocket)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,7 +194,7 @@ func (l *LokiSource) readOneTail(resp Tail, out chan types.Event) {
|
||||||
func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
||||||
err := l.ready()
|
err := l.ready()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "error while getting OneShotAcquisition")
|
return errors.Wrap(err, "error while getting StreamingAcquisition")
|
||||||
}
|
}
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
for {
|
for {
|
||||||
|
@ -200,8 +203,11 @@ func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) er
|
||||||
header := &http.Header{}
|
header := &http.Header{}
|
||||||
c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header)
|
c, res, err := l.dialer.DialContext(ctx, l.lokiWebsocket, *header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
buf, _ := ioutil.ReadAll(res.Body)
|
buf, err2 := ioutil.ReadAll(res.Body)
|
||||||
return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
|
if err2 == nil {
|
||||||
|
return fmt.Errorf("loki websocket (%s) error %v : %s", l.lokiWebsocket, err, string(buf))
|
||||||
|
}
|
||||||
|
return err2
|
||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
var resp Tail
|
var resp Tail
|
||||||
|
|
Loading…
Reference in a new issue