Fix bugs in cloudwatch acq (#991)

* Fix bugs in cloudwatch acq

- Fix concurrent writes to the `streamIndexes` map (see the sketch after this list).
- Fix multiple cases of modifying a slice while iterating over it.
- Fix the order in which CloudWatch events are fetched.
- Remove the `startup` hack.
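
For context, the map race is the standard Go pitfall sketched below. This is a minimal, hypothetical illustration with invented names (setIndex/getIndex), not the patched code itself; the actual fix applies the same locking discipline to cw.streamIndexes via a package-level streamIndexMutex:

package main

import "sync"

var (
	mu      sync.Mutex
	indexes = map[string]string{}
)

// setIndex serializes writers; unguarded concurrent writes to a plain
// Go map abort the program with "fatal error: concurrent map writes".
func setIndex(key, token string) {
	mu.Lock()
	indexes[key] = token
	mu.Unlock()
}

// getIndex reads under the same lock: reads racing with writes are
// just as invalid as write/write races.
func getIndex(key string) string {
	mu.Lock()
	defer mu.Unlock()
	return indexes[key]
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // one writer per tailed stream, roughly the real shape
		wg.Add(1)
		go func() {
			defer wg.Done()
			setIndex("group+stream", "token")
			_ = getIndex("group+stream")
		}()
	}
	wg.Wait()
}

Running the unguarded version under `go run -race` makes the race detector flag it immediately.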

Signed-off-by: Shivam Sandbhor <shivam.sandbhor@gmail.com>

* Fix cloudwatch tests

Signed-off-by: Shivam Sandbhor <shivam.sandbhor@gmail.com>
This commit is contained in:
Shivam Sandbhor 2021-10-22 14:05:05 +05:30 committed by GitHub
parent 0d075f32cd
commit a7b1c02bd5
3 changed files with 22 additions and 28 deletions

cmd/crowdsec/crowdsec.go

@@ -50,7 +50,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error {
 	inputLineChan := make(chan types.Event)
 	inputEventChan := make(chan types.Event)
 
-	//start go-routines for parsing, buckets pour and ouputs.
+	//start go-routines for parsing, buckets pour and outputs.
 	parserWg := &sync.WaitGroup{}
 	parsersTomb.Go(func() error {
 		parserWg.Add(1)

pkg/acquisition/modules/cloudwatch/cloudwatch.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -32,6 +33,8 @@ var openedStreams = prometheus.NewGaugeVec(
 	[]string{"group"},
 )
 
+var streamIndexMutex = sync.Mutex{}
+
 var linesRead = prometheus.NewCounterVec(
 	prometheus.CounterOpts{
 		Name: "cs_cloudwatch_stream_hits_total",
@@ -366,14 +369,16 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
 				cw.monitoredStreams = append(cw.monitoredStreams, &newStream)
 			}
 		case <-pollDeadStreamInterval.C:
+			newMonitoredStreams := cw.monitoredStreams[:0]
 			for idx, stream := range cw.monitoredStreams {
 				if !cw.monitoredStreams[idx].t.Alive() {
 					cw.logger.Debugf("remove dead stream %s", stream.StreamName)
 					openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec()
-					cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...)
-					break
+				} else {
+					newMonitoredStreams = append(newMonitoredStreams, stream)
 				}
 			}
+			cw.monitoredStreams = newMonitoredStreams
 		case <-cw.t.Dying():
 			cw.logger.Infof("LogStreamManager for %s is dying, %d alive streams", cw.Config.GroupName, len(cw.monitoredStreams))
 			for idx, stream := range cw.monitoredStreams {
@@ -383,10 +388,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
 					if err := cw.monitoredStreams[idx].t.Wait(); err != nil {
 						cw.logger.Debugf("error while waiting for death of %s : %s", stream.StreamName, err)
 					}
-				} else {
-					cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...)
 				}
 			}
+			cw.monitoredStreams = nil
 			cw.logger.Debugf("routine cleanup done, return")
 			return nil
 		}
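
A note on the slice fix above: `append(s[:idx], s[idx+1:]...)` inside a `range` loop shifts the remaining elements under the iterator, which is why the old code needed `break` and could evict at most one dead stream per tick. The replacement is Go's filter-in-place idiom; a self-contained sketch with ints standing in for stream handles:

package main

import "fmt"

func main() {
	streams := []int{1, 2, 3, 4, 5, 6}
	// kept shares streams' backing array (streams[:0]), so filtering
	// allocates nothing and never removes elements mid-iteration.
	kept := streams[:0]
	for _, s := range streams {
		if s%2 == 0 { // stand-in for "this stream's tomb is still alive"
			kept = append(kept, s)
		}
	}
	streams = kept
	fmt.Println(streams) // [2 4 6]
}

One caveat worth noting with pointer elements such as *LogStreamTailConfig: dropped entries stay reachable through the shared backing array for as long as that array lives, which is harmless here but matters for long-lived slices holding large objects.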
@@ -396,14 +400,14 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha
 func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan types.Event) error {
 	var startFrom *string
 	var lastReadMessage time.Time = time.Now()
-	startup := true
 	ticker := time.NewTicker(cfg.PollStreamInterval)
 	//resume at existing index if we already had
-	if v, ok := cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName]; ok && v != "" {
+	streamIndexMutex.Lock()
+	v := cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName]
+	streamIndexMutex.Unlock()
+	if v != "" {
 		cfg.logger.Debugf("restarting on index %s", v)
 		startFrom = &v
-		startup = false
 	}
 	/*during first run, we want to avoid reading any message, but just get a token.
 	if we don't, we might end up sending the same item several times. hence the 'startup' hack */
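
A design note on the resume logic above: copying the token into the local `v` while holding the lock is what makes `startFrom = &v` safe after `Unlock`, because `v` is a private copy rather than a reference into the shared map. If reads ever dominated writes, a `sync.RWMutex` would let readers proceed in parallel; a hypothetical standalone variant (not what the patch does):

package main

import "sync"

// streamIndexStore is an invented wrapper type; the patch keeps a bare
// map plus a package-level Mutex instead.
type streamIndexStore struct {
	mu      sync.RWMutex
	indexes map[string]string
}

func (s *streamIndexStore) get(key string) string {
	s.mu.RLock() // many readers may hold RLock at once
	defer s.mu.RUnlock()
	return s.indexes[key]
}

func (s *streamIndexStore) set(key, token string) {
	s.mu.Lock() // writers remain exclusive
	defer s.mu.Unlock()
	s.indexes[key] = token
}

func main() {
	s := &streamIndexStore{indexes: map[string]string{}}
	s.set("group+stream", "token-1")
	_ = s.get("group+stream")
}

With one goroutine per tailed stream and writes roughly as frequent as reads, the plain Mutex the patch uses is the simpler, equally correct choice.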
@@ -414,27 +418,23 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan
 		hasMorePages := true
 		for hasMorePages {
 			/*for the first call, we only consume the last item*/
-			limit := cfg.GetLogEventsPagesLimit
-			if startup {
-				limit = 1
-			}
 			cfg.logger.Tracef("calling GetLogEventsPagesWithContext")
 			ctx := context.Background()
 			err := cw.cwClient.GetLogEventsPagesWithContext(ctx,
 				&cloudwatchlogs.GetLogEventsInput{
-					Limit:         aws.Int64(limit),
+					Limit:         aws.Int64(cfg.GetLogEventsPagesLimit),
 					LogGroupName:  aws.String(cfg.GroupName),
 					LogStreamName: aws.String(cfg.StreamName),
 					NextToken:     startFrom,
+					StartFromHead: aws.Bool(true),
 				},
 				func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool {
 					cfg.logger.Tracef("%d results, last:%t", len(page.Events), lastPage)
 					startFrom = page.NextForwardToken
 					if page.NextForwardToken != nil {
+						streamIndexMutex.Lock()
 						cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName] = *page.NextForwardToken
-					}
-					if startup { //we grab the NextForwardToken and we return on first iteration
-						return false
+						streamIndexMutex.Unlock()
 					}
 					if lastPage { /*wait another ticker to check on new log availability*/
 						cfg.logger.Tracef("last page")
@@ -451,7 +451,6 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan
 						cfg.logger.Debugf("pushing message : %s", evt.Line.Raw)
 						linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc()
 						outChan <- evt
 					}
 				}
 				return true
@@ -462,11 +461,7 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan
 				cfg.logger.Warningf("err : %s", newerr)
 				return newerr
 			}
-			if startup {
-				startup = false
-			}
 			cfg.logger.Tracef("done reading GetLogEventsPagesWithContext")
 			if time.Since(lastReadMessage) > cfg.StreamReadTimeout {
 				cfg.logger.Infof("%s/%s reached timeout (%s) (last message was %s)", cfg.GroupName, cfg.StreamName, time.Since(lastReadMessage),
 					lastReadMessage)
@@ -665,7 +660,6 @@ func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig)
 		eventTimestamp := time.Unix(0, *log.Timestamp*int64(time.Millisecond))
 		msg = eventTimestamp.String() + " " + msg
 	}
 	l.Raw = msg
 	l.Labels = cfg.Labels
 	l.Time = time.Now()

pkg/acquisition/modules/cloudwatch/cloudwatch_test.go

@@ -253,8 +253,8 @@ stream_name: test_stream`),
 				}
 			},
-			expectedResLen:      2,
-			expectedResMessages: []string{"test_message_4", "test_message_5"},
+			expectedResLen:      3,
+			expectedResMessages: []string{"test_message_1", "test_message_4", "test_message_5"},
 		},
 		//have a stream generate events, reach time-out and gets polled again
 		{
@@ -345,8 +345,8 @@ stream_name: test_stream`),
 				}
 			},
-			expectedResLen:      2,
-			expectedResMessages: []string{"test_message_41", "test_message_51"},
+			expectedResLen:      3,
+			expectedResMessages: []string{"test_message_1", "test_message_41", "test_message_51"},
 		},
 		//have a stream generate events, reach time-out and dead body collection
 		{
@@ -406,7 +406,7 @@ stream_name: test_stream`),
 				}
 			},
-			expectedResLen: 0,
+			expectedResLen: 1,
 		},
 	}
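
A note on the test deltas: with the `startup` hack removed and `StartFromHead` enabled, the tailer now delivers the message that already sits in the stream when tailing begins (test_message_1) instead of consuming it just to obtain a token, so each expectation grows by exactly one event.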