From a7b1c02bd562f716ad0639d45cd46be094c8af5c Mon Sep 17 00:00:00 2001 From: Shivam Sandbhor Date: Fri, 22 Oct 2021 14:05:05 +0530 Subject: [PATCH] Fix bugs in cloudwatch acq (#991) * Fix bugs in cloudwatch acq - Fix concurrent writes to map streamIndexes - Fix multiple cases of modifying while iterating on slice. - Fix order of fetching cloudwatch events. - Remove `startup` hack. Signed-off-by: Shivam Sandbhor * Fix cloudwatch tests Signed-off-by: Shivam Sandbhor --- cmd/crowdsec/crowdsec.go | 2 +- .../modules/cloudwatch/cloudwatch.go | 38 ++++++++----------- .../modules/cloudwatch/cloudwatch_test.go | 10 ++--- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index d489c4d1b..ce10312c4 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -50,7 +50,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers) error { inputLineChan := make(chan types.Event) inputEventChan := make(chan types.Event) - //start go-routines for parsing, buckets pour and ouputs. + //start go-routines for parsing, buckets pour and outputs. parserWg := &sync.WaitGroup{} parsersTomb.Go(func() error { parserWg.Add(1) diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch.go b/pkg/acquisition/modules/cloudwatch/cloudwatch.go index 227989a6b..ae0d8bc3b 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch.go @@ -7,6 +7,7 @@ import ( "os" "regexp" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -32,6 +33,8 @@ var openedStreams = prometheus.NewGaugeVec( []string{"group"}, ) +var streamIndexMutex = sync.Mutex{} + var linesRead = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_cloudwatch_stream_hits_total", @@ -366,14 +369,16 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha cw.monitoredStreams = append(cw.monitoredStreams, &newStream) } case <-pollDeadStreamInterval.C: + newMonitoredStreams := cw.monitoredStreams[:0] for idx, stream := range cw.monitoredStreams { if !cw.monitoredStreams[idx].t.Alive() { cw.logger.Debugf("remove dead stream %s", stream.StreamName) openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec() - cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...) - break + } else { + newMonitoredStreams = append(newMonitoredStreams, stream) } } + cw.monitoredStreams = newMonitoredStreams case <-cw.t.Dying(): cw.logger.Infof("LogStreamManager for %s is dying, %d alive streams", cw.Config.GroupName, len(cw.monitoredStreams)) for idx, stream := range cw.monitoredStreams { @@ -383,10 +388,9 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha if err := cw.monitoredStreams[idx].t.Wait(); err != nil { cw.logger.Debugf("error while waiting for death of %s : %s", stream.StreamName, err) } - } else { - cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...) } } + cw.monitoredStreams = nil cw.logger.Debugf("routine cleanup done, return") return nil } @@ -396,14 +400,14 @@ func (cw *CloudwatchSource) LogStreamManager(in chan LogStreamTailConfig, outCha func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan types.Event) error { var startFrom *string var lastReadMessage time.Time = time.Now() - startup := true - ticker := time.NewTicker(cfg.PollStreamInterval) //resume at existing index if we already had - if v, ok := cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName]; ok && v != "" { + streamIndexMutex.Lock() + v := cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName] + streamIndexMutex.Unlock() + if v != "" { cfg.logger.Debugf("restarting on index %s", v) startFrom = &v - startup = false } /*during first run, we want to avoid reading any message, but just get a token. if we don't, we might end up sending the same item several times. hence the 'startup' hack */ @@ -414,27 +418,23 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan hasMorePages := true for hasMorePages { /*for the first call, we only consume the last item*/ - limit := cfg.GetLogEventsPagesLimit - if startup { - limit = 1 - } cfg.logger.Tracef("calling GetLogEventsPagesWithContext") ctx := context.Background() err := cw.cwClient.GetLogEventsPagesWithContext(ctx, &cloudwatchlogs.GetLogEventsInput{ - Limit: aws.Int64(limit), + Limit: aws.Int64(cfg.GetLogEventsPagesLimit), LogGroupName: aws.String(cfg.GroupName), LogStreamName: aws.String(cfg.StreamName), NextToken: startFrom, + StartFromHead: aws.Bool(true), }, func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool { cfg.logger.Tracef("%d results, last:%t", len(page.Events), lastPage) startFrom = page.NextForwardToken if page.NextForwardToken != nil { + streamIndexMutex.Lock() cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName] = *page.NextForwardToken - } - if startup { //we grab the NextForwardToken and we return on first iteration - return false + streamIndexMutex.Unlock() } if lastPage { /*wait another ticker to check on new log availability*/ cfg.logger.Tracef("last page") @@ -451,7 +451,6 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan cfg.logger.Debugf("pushing message : %s", evt.Line.Raw) linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc() outChan <- evt - } } return true @@ -462,11 +461,7 @@ func (cw *CloudwatchSource) TailLogStream(cfg *LogStreamTailConfig, outChan chan cfg.logger.Warningf("err : %s", newerr) return newerr } - if startup { - startup = false - } cfg.logger.Tracef("done reading GetLogEventsPagesWithContext") - if time.Since(lastReadMessage) > cfg.StreamReadTimeout { cfg.logger.Infof("%s/%s reached timeout (%s) (last message was %s)", cfg.GroupName, cfg.StreamName, time.Since(lastReadMessage), lastReadMessage) @@ -665,7 +660,6 @@ func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig) eventTimestamp := time.Unix(0, *log.Timestamp*int64(time.Millisecond)) msg = eventTimestamp.String() + " " + msg } - l.Raw = msg l.Labels = cfg.Labels l.Time = time.Now() diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go index 7e12b276d..824f07695 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go @@ -253,8 +253,8 @@ stream_name: test_stream`), } }, - expectedResLen: 2, - expectedResMessages: []string{"test_message_4", "test_message_5"}, + expectedResLen: 3, + expectedResMessages: []string{"test_message_1", "test_message_4", "test_message_5"}, }, //have a stream generate events, reach time-out and gets polled again { @@ -345,8 +345,8 @@ stream_name: test_stream`), } }, - expectedResLen: 2, - expectedResMessages: []string{"test_message_41", "test_message_51"}, + expectedResLen: 3, + expectedResMessages: []string{"test_message_1", "test_message_41", "test_message_51"}, }, //have a stream generate events, reach time-out and dead body collection { @@ -406,7 +406,7 @@ stream_name: test_stream`), } }, - expectedResLen: 0, + expectedResLen: 1, }, }