diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go index afe1d181b..1cabcc035 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go @@ -11,8 +11,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/crowdsecurity/crowdsec/pkg/cstest" "github.com/crowdsecurity/crowdsec/pkg/types" log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" "gopkg.in/tomb.v2" ) @@ -24,16 +26,31 @@ import ( - check shutdown/restart */ +func deleteAllLogGroups(t *testing.T, cw *CloudwatchSource) { + input := &cloudwatchlogs.DescribeLogGroupsInput{} + result, err := cw.cwClient.DescribeLogGroups(input) + require.NoError(t, err) + for _, group := range result.LogGroups { + _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + LogGroupName: group.LogGroupName, + }) + require.NoError(t, err) + } +} + func checkForLocalStackAvailability() error { - if v := os.Getenv("AWS_ENDPOINT_FORCE"); v != "" { - v = strings.TrimPrefix(v, "http://") - _, err := net.Dial("tcp", v) - if err != nil { - return fmt.Errorf("while dialing %s : %s : aws endpoint isn't available", v, err) - } - } else { + v := os.Getenv("AWS_ENDPOINT_FORCE") + if v == "" { return fmt.Errorf("missing aws endpoint for tests : AWS_ENDPOINT_FORCE") } + + v = strings.TrimPrefix(v, "http://") + + _, err := net.Dial("tcp", v) + if err != nil { + return fmt.Errorf("while dialing %s : %s : aws endpoint isn't available", v, err) + } + return nil } @@ -56,20 +73,19 @@ func TestWatchLogGroupForStreams(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("Skipping test on windows") } - var err error log.SetLevel(log.DebugLevel) tests := []struct { config []byte expectedCfgErr string expectedStartErr string name string - pre func(*CloudwatchSource) - run func(*CloudwatchSource) - post func(*CloudwatchSource) + setup func(*testing.T, *CloudwatchSource) + run func(*testing.T, *CloudwatchSource) + teardown func(*testing.T, *CloudwatchSource) expectedResLen int expectedResMessages []string }{ - //require a group name that doesn't exist + // require a group name that doesn't exist { name: "group_does_not_exists", config: []byte(` @@ -80,22 +96,21 @@ labels: group_name: b stream_name: test_stream`), expectedStartErr: "The specified log group does not exist", - pre: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_group_not_used_1"), - }); err != nil { - t.Fatalf("failed to create log group : %s", err) - } + }) + require.NoError(t, err) }, - post: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_group_not_used_1"), - }); err != nil { - t.Fatalf("failed to delete log group : %s", err) - } + }) + require.NoError(t, err) }, }, - //test stream mismatch + // test stream mismatch { name: "group_exists_bad_stream_name", config: []byte(` @@ -105,42 +120,41 @@ labels: type: test_source group_name: test_group1 stream_name: test_stream_bad`), - pre: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_group1"), - }); err != nil { - t.Fatalf("failed to create log group : %s", err) - } - if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + }) + require.NoError(t, err) + + _, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String("test_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to create log stream : %s", err) - } - //have a message before we start - won't be popped, but will trigger stream monitoring - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + }) + require.NoError(t, err) + + // have a message before we start - won't be popped, but will trigger stream monitoring + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_1"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - log.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) }, - post: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_group1"), - }); err != nil { - t.Fatalf("failed to delete log group : %s", err) - } + }) + require.NoError(t, err) }, expectedResLen: 0, }, - //test stream mismatch + // test stream mismatch { name: "group_exists_bad_stream_regexp", config: []byte(` @@ -150,44 +164,41 @@ labels: type: test_source group_name: test_group1 stream_regexp: test_bad[0-9]+`), - pre: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_group1"), - }); err != nil { - t.Fatalf("failed to create log group : %s", err) - } - if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + }) + require.NoError(t, err) + + _, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String("test_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to create log stream : %s", err) + }) + require.NoError(t, err) - } - //have a message before we start - won't be popped, but will trigger stream monitoring - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + // have a message before we start - won't be popped, but will trigger stream monitoring + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_1"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - t.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) }, - post: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_group1"), - }); err != nil { - t.Fatalf("failed to delete log group : %s", err) - - } + }) + require.NoError(t, err) }, expectedResLen: 0, }, - //require a group name that does exist and contains a stream in which we gonna put events + // require a group name that does exist and contains a stream in which we gonna put events { name: "group_exists_stream_exists_has_events", config: []byte(` @@ -198,76 +209,70 @@ labels: group_name: test_log_group1 log_level: trace stream_name: test_stream`), - //expectedStartErr: "The specified log group does not exist", - pre: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + // expectedStartErr: "The specified log group does not exist", + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("failed to create log group : %s", err) + }) + require.NoError(t, err) - } - if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + _, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to create log stream : %s", err) + }) + require.NoError(t, err) - } - //have a message before we start - won't be popped, but will trigger stream monitoring - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + // have a message before we start - won't be popped, but will trigger stream monitoring + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_1"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - t.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) }, - run: func(cw *CloudwatchSource) { - //wait for new stream pickup + stream poll interval + run: func(t *testing.T, cw *CloudwatchSource) { + // wait for new stream pickup + stream poll interval time.Sleep(def_PollNewStreamInterval + (1 * time.Second)) time.Sleep(def_PollStreamInterval + (1 * time.Second)) - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_4"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, - //and add an event in the future that will be popped - &cloudwatchlogs.InputLogEvent{ + // and add an event in the future that will be popped + { Message: aws.String("test_message_5"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - t.Fatalf("failed to put logs : %s", err) - } + }) + require.NoError(t, err) }, - post: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to delete log stream : %s", err) + }) + require.NoError(t, err) - } - if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + _, err = cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("failed to delete log group : %s", err) - - } + }) + require.NoError(t, err) }, expectedResLen: 3, expectedResMessages: []string{"test_message_1", "test_message_4", "test_message_5"}, }, - //have a stream generate events, reach time-out and gets polled again + // have a stream generate events, reach time-out and gets polled again { name: "group_exists_stream_exists_has_events+timeout", config: []byte(` @@ -278,89 +283,83 @@ labels: group_name: test_log_group1 log_level: trace stream_name: test_stream`), - //expectedStartErr: "The specified log group does not exist", - pre: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + // expectedStartErr: "The specified log group does not exist", + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("failed to create log group : %s", err) + }) + require.NoError(t, err) - } - if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + _, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to create log stream : %s", err) - } - //have a message before we start - won't be popped, but will trigger stream monitoring - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + }) + require.NoError(t, err) + + // have a message before we start - won't be popped, but will trigger stream monitoring + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_1"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - t.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) }, - run: func(cw *CloudwatchSource) { - //wait for new stream pickup + stream poll interval + run: func(t *testing.T, cw *CloudwatchSource) { + // wait for new stream pickup + stream poll interval time.Sleep(def_PollNewStreamInterval + (1 * time.Second)) time.Sleep(def_PollStreamInterval + (1 * time.Second)) - //send some events - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + // send some events + _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_41"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - t.Fatalf("failed to put logs : %s", err) - } - //wait for the stream to time-out + }) + require.NoError(t, err) + // wait for the stream to time-out time.Sleep(def_StreamReadTimeout + (1 * time.Second)) - //and send events again - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + // and send events again + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_51"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - t.Fatalf("failed to put logs : %s", err) - } - //wait for new stream pickup + stream poll interval + }) + require.NoError(t, err) + // wait for new stream pickup + stream poll interval time.Sleep(def_PollNewStreamInterval + (1 * time.Second)) time.Sleep(def_PollStreamInterval + (1 * time.Second)) }, - post: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to delete log stream : %s", err) + }) + require.NoError(t, err) - } - if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + _, err = cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("failed to delete log group : %s", err) - - } + }) + require.NoError(t, err) }, expectedResLen: 3, expectedResMessages: []string{"test_message_1", "test_message_41", "test_message_51"}, }, - //have a stream generate events, reach time-out and dead body collection + // have a stream generate events, reach time-out and dead body collection { name: "group_exists_stream_exists_has_events+timeout+GC", config: []byte(` @@ -371,157 +370,142 @@ labels: group_name: test_log_group1 log_level: trace stream_name: test_stream`), - //expectedStartErr: "The specified log group does not exist", - pre: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + // expectedStartErr: "The specified log group does not exist", + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("failed to create log group : %s", err) - } - if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + }) + require.NoError(t, err) + + _, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to create log stream : %s", err) - } - //have a message before we start - won't be popped, but will trigger stream monitoring - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + }) + require.NoError(t, err) + + // have a message before we start - won't be popped, but will trigger stream monitoring + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_1"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - t.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) }, - run: func(cw *CloudwatchSource) { - //wait for new stream pickup + stream poll interval + run: func(t *testing.T, cw *CloudwatchSource) { + // wait for new stream pickup + stream poll interval time.Sleep(def_PollNewStreamInterval + (1 * time.Second)) time.Sleep(def_PollStreamInterval + (1 * time.Second)) time.Sleep(def_PollDeadStreamInterval + (1 * time.Second)) }, - post: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("failed to delete log stream : %s", err) + }) + require.NoError(t, err) - } - if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + _, err = cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("failed to delete log stream : %s", err) - - } + }) + require.NoError(t, err) }, expectedResLen: 1, }, } - for _, test := range tests { - dbgLogger := log.New().WithField("test", test.name) - dbgLogger.Logger.SetLevel(log.DebugLevel) - dbgLogger.Infof("starting test") - cw := CloudwatchSource{} - err = cw.Configure(test.config, dbgLogger) - if err != nil && test.expectedCfgErr != "" { - if !strings.Contains(err.Error(), test.expectedCfgErr) { - t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err) + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + dbgLogger := log.New().WithField("test", tc.name) + dbgLogger.Logger.SetLevel(log.DebugLevel) + dbgLogger.Infof("starting test") + cw := CloudwatchSource{} + err := cw.Configure(tc.config, dbgLogger) + cstest.RequireErrorContains(t, err, tc.expectedCfgErr) + + if tc.expectedCfgErr != "" { + return } - log.Debugf("got expected error : %s", err) - continue - } else if err != nil && test.expectedCfgErr == "" { - t.Fatalf("%s unexpected error : %s", test.name, err) - continue - } else if test.expectedCfgErr != "" && err == nil { - t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr) - continue - } - dbgLogger.Infof("config done test") - //run pre-routine : tests use it to set group & streams etc. - if test.pre != nil { - test.pre(&cw) - } - out := make(chan types.Event) - tmb := tomb.Tomb{} - var rcvd_evts []types.Event - dbgLogger.Infof("running StreamingAcquisition") - actmb := tomb.Tomb{} - actmb.Go(func() error { - err := cw.StreamingAcquisition(out, &actmb) - dbgLogger.Infof("acquis done") - - if err != nil && test.expectedStartErr != "" && !strings.Contains(err.Error(), test.expectedStartErr) { - t.Fatalf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err) - } else if err != nil && test.expectedStartErr == "" { - t.Fatalf("%s unexpected error '%s'", test.name, err) - } else if err == nil && test.expectedStartErr != "" { - t.Fatalf("%s expected error '%s' got none", test.name, err) + // run pre-routine : tests use it to set group & streams etc. + if tc.setup != nil { + tc.setup(t, &cw) } - return nil - }) + out := make(chan types.Event) + tmb := tomb.Tomb{} + var rcvdEvts []types.Event - //let's empty output chan - tmb.Go(func() error { - for { - select { - case in := <-out: - log.Debugf("received event %+v", in) - rcvd_evts = append(rcvd_evts, in) - case <-tmb.Dying(): - log.Debugf("pumper died") - return nil + dbgLogger.Infof("running StreamingAcquisition") + actmb := tomb.Tomb{} + actmb.Go(func() error { + err := cw.StreamingAcquisition(out, &actmb) + dbgLogger.Infof("acquis done") + cstest.RequireErrorContains(t, err, tc.expectedStartErr) + return nil + }) + + // let's empty output chan + tmb.Go(func() error { + for { + select { + case in := <-out: + log.Debugf("received event %+v", in) + rcvdEvts = append(rcvdEvts, in) + case <-tmb.Dying(): + log.Debugf("pumper died") + return nil + } } + }) + + if tc.run != nil { + tc.run(t, &cw) + } else { + dbgLogger.Warning("no code to run") + } + + time.Sleep(5 * time.Second) + dbgLogger.Infof("killing collector") + tmb.Kill(nil) + <-tmb.Dead() + dbgLogger.Infof("killing datasource") + actmb.Kill(nil) + <-actmb.Dead() + // dbgLogger.Infof("collected events : %d -> %+v", len(rcvd_evts), rcvd_evts) + // check results + if tc.expectedResLen != -1 { + if tc.expectedResLen != len(rcvdEvts) { + t.Fatalf("%s : expected %d results got %d -> %v", tc.name, tc.expectedResLen, len(rcvdEvts), rcvdEvts) + } + dbgLogger.Debugf("got %d expected messages", len(rcvdEvts)) + } + if len(tc.expectedResMessages) != 0 { + res := tc.expectedResMessages + for idx, v := range rcvdEvts { + if len(res) == 0 { + t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvdEvts), v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages)) + } + if res[0] != v.Line.Raw { + t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvdEvts), res[0], v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages)) + } + dbgLogger.Debugf("got message '%s'", res[0]) + res = res[1:] + } + if len(res) != 0 { + t.Fatalf("leftover unmatched results : %v", res) + } + + } + if tc.teardown != nil { + tc.teardown(t, &cw) } }) - - if test.run != nil { - test.run(&cw) - } else { - dbgLogger.Warning("no run code") - } - - time.Sleep(5 * time.Second) - dbgLogger.Infof("killing collector") - tmb.Kill(nil) - <-tmb.Dead() - dbgLogger.Infof("killing datasource") - actmb.Kill(nil) - <-actmb.Dead() - //dbgLogger.Infof("collected events : %d -> %+v", len(rcvd_evts), rcvd_evts) - //check results - if test.expectedResLen != -1 { - if test.expectedResLen != len(rcvd_evts) { - t.Fatalf("%s : expected %d results got %d -> %v", test.name, test.expectedResLen, len(rcvd_evts), rcvd_evts) - } - dbgLogger.Debugf("got %d expected messages", len(rcvd_evts)) - } - if len(test.expectedResMessages) != 0 { - res := test.expectedResMessages - for idx, v := range rcvd_evts { - if len(res) == 0 { - t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvd_evts), v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages)) - } - if res[0] != v.Line.Raw { - t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvd_evts), res[0], v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages)) - } - dbgLogger.Debugf("got message '%s'", res[0]) - res = res[1:] - } - if len(res) != 0 { - t.Fatalf("leftover unmatched results : %v", res) - } - - } - if test.post != nil { - test.post(&cw) - } } } @@ -529,7 +513,6 @@ func TestConfiguration(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("Skipping test on windows") } - var err error log.SetLevel(log.DebugLevel) tests := []struct { config []byte @@ -569,47 +552,35 @@ stream_name: test_stream`), }, } - for idx, test := range tests { - dbgLogger := log.New().WithField("test", test.name) - dbgLogger.Logger.SetLevel(log.DebugLevel) - log.Printf("%d/%d", idx, len(tests)) - cw := CloudwatchSource{} - err = cw.Configure(test.config, dbgLogger) - if err != nil && test.expectedCfgErr != "" { - if !strings.Contains(err.Error(), test.expectedCfgErr) { - t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err) + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + dbgLogger := log.New().WithField("test", tc.name) + dbgLogger.Logger.SetLevel(log.DebugLevel) + cw := CloudwatchSource{} + err := cw.Configure(tc.config, dbgLogger) + cstest.RequireErrorContains(t, err, tc.expectedCfgErr) + if tc.expectedCfgErr != "" { + return } - log.Debugf("got expected error : %s", err) - continue - } else if err != nil && test.expectedCfgErr == "" { - t.Fatalf("%s unexpected error : %s", test.name, err) - continue - } else if test.expectedCfgErr != "" && err == nil { - t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr) - continue - } - out := make(chan types.Event) - tmb := tomb.Tomb{} - switch cw.GetMode() { - case "tail": - err = cw.StreamingAcquisition(out, &tmb) - case "cat": - err = cw.OneShotAcquisition(out, &tmb) - } - if err != nil && test.expectedStartErr != "" && !strings.Contains(err.Error(), test.expectedStartErr) { - t.Fatalf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err) - } else if err != nil && test.expectedStartErr == "" { - t.Fatalf("%s unexpected error '%s'", test.name, err) - } else if err == nil && test.expectedStartErr != "" { - t.Fatalf("%s expected error '%s' got none", test.name, err) - } + out := make(chan types.Event) + tmb := tomb.Tomb{} - log.Debugf("killing ...") - tmb.Kill(nil) - <-tmb.Dead() - log.Debugf("dead :)") + switch cw.GetMode() { + case "tail": + err = cw.StreamingAcquisition(out, &tmb) + case "cat": + err = cw.OneShotAcquisition(out, &tmb) + } + cstest.RequireErrorContains(t, err, tc.expectedStartErr) + + log.Debugf("killing ...") + tmb.Kill(nil) + <-tmb.Dead() + log.Debugf("dead :)") + }) } } @@ -617,7 +588,6 @@ func TestConfigureByDSN(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("Skipping test on windows") } - var err error log.SetLevel(log.DebugLevel) tests := []struct { dsn string @@ -633,12 +603,12 @@ func TestConfigureByDSN(t *testing.T) { { name: "backlog", dsn: "cloudwatch://bad_log_group:bad_stream_name?backlog=30m&log_level=info&profile=test", - //expectedCfgErr: "query is mandatory (at least start_date and end_date or backlog)", + // expectedCfgErr: "query is mandatory (at least start_date and end_date or backlog)", }, { name: "start_date/end_date", dsn: "cloudwatch://bad_log_group:bad_stream_name?start_date=2021/05/15 14:04&end_date=2021/05/15 15:04", - //expectedCfgErr: "query is mandatory (at least start_date and end_date or backlog)", + // expectedCfgErr: "query is mandatory (at least start_date and end_date or backlog)", }, { name: "bad_log_level", @@ -647,25 +617,15 @@ func TestConfigureByDSN(t *testing.T) { }, } - for idx, test := range tests { - dbgLogger := log.New().WithField("test", test.name) - dbgLogger.Logger.SetLevel(log.DebugLevel) - log.Printf("%d/%d", idx, len(tests)) - cw := CloudwatchSource{} - err = cw.ConfigureByDSN(test.dsn, test.labels, dbgLogger) - if err != nil && test.expectedCfgErr != "" { - if !strings.Contains(err.Error(), test.expectedCfgErr) { - t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err) - } - log.Debugf("got expected error : %s", err) - continue - } else if err != nil && test.expectedCfgErr == "" { - t.Fatalf("%s unexpected error : %s", test.name, err) - continue - } else if test.expectedCfgErr != "" && err == nil { - t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr) - continue - } + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + dbgLogger := log.New().WithField("test", tc.name) + dbgLogger.Logger.SetLevel(log.DebugLevel) + cw := CloudwatchSource{} + err := cw.ConfigureByDSN(tc.dsn, tc.labels, dbgLogger) + cstest.RequireErrorContains(t, err, tc.expectedCfgErr) + }) } } @@ -673,212 +633,199 @@ func TestOneShotAcquisition(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("Skipping test on windows") } - var err error log.SetLevel(log.DebugLevel) tests := []struct { dsn string expectedCfgErr string expectedStartErr string name string - pre func(*CloudwatchSource) - run func(*CloudwatchSource) - post func(*CloudwatchSource) + setup func(*testing.T, *CloudwatchSource) + run func(*testing.T, *CloudwatchSource) + teardown func(*testing.T, *CloudwatchSource) expectedResLen int expectedResMessages []string }{ - //stream with no data + // stream with no data { name: "empty_stream", dsn: "cloudwatch://test_log_group1:test_stream?backlog=1h", - //expectedStartErr: "The specified log group does not exist", - pre: func(cw *CloudwatchSource) { - cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + // expectedStartErr: "The specified log group does not exist", + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_log_group1"), }) - cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + require.NoError(t, err) + + _, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), }) + require.NoError(t, err) }, - post: func(cw *CloudwatchSource) { - cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_log_group1"), }) + require.NoError(t, err) }, expectedResLen: 0, }, - //stream with one event + // stream with one event { name: "get_one_event", dsn: "cloudwatch://test_log_group1:test_stream?backlog=1h", - //expectedStartErr: "The specified log group does not exist", - pre: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + // expectedStartErr: "The specified log group does not exist", + setup: func(t *testing.T, cw *CloudwatchSource) { + deleteAllLogGroups(t, cw) + _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("error while CreateLogGroup") - } - if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + }) + require.NoError(t, err) + + _, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), - }); err != nil { - t.Fatalf("error while CreateLogStream") + }) + require.NoError(t, err) - } - //this one is too much in the back - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + // this one is too much in the back + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_1"), Timestamp: aws.Int64(time.Now().UTC().Add(-(2 * time.Hour)).UTC().Unix() * 1000), }, }, - }); err != nil { - log.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) - //this one can be read - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + // this one can be read + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_2"), Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000), }, }, - }); err != nil { - log.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) - //this one is in the past - if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + // this one is in the past + _, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String("test_log_group1"), LogStreamName: aws.String("test_stream"), LogEvents: []*cloudwatchlogs.InputLogEvent{ - &cloudwatchlogs.InputLogEvent{ + { Message: aws.String("test_message_3"), Timestamp: aws.Int64(time.Now().UTC().Add(-(3 * time.Hour)).UTC().Unix() * 1000), }, }, - }); err != nil { - log.Fatalf("failed to put logs") - } + }) + require.NoError(t, err) }, - post: func(cw *CloudwatchSource) { - if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ + teardown: func(t *testing.T, cw *CloudwatchSource) { + _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String("test_log_group1"), - }); err != nil { - t.Fatalf("failed to delete") - } + }) + require.NoError(t, err) }, expectedResLen: 1, expectedResMessages: []string{"test_message_2"}, }, } - for _, test := range tests { - dbgLogger := log.New().WithField("test", test.name) - dbgLogger.Logger.SetLevel(log.DebugLevel) - dbgLogger.Infof("starting test") - cw := CloudwatchSource{} - err = cw.ConfigureByDSN(test.dsn, map[string]string{"type": "test"}, dbgLogger) - if err != nil && test.expectedCfgErr != "" { - if !strings.Contains(err.Error(), test.expectedCfgErr) { - t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err) + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + dbgLogger := log.New().WithField("test", tc.name) + dbgLogger.Logger.SetLevel(log.DebugLevel) + dbgLogger.Infof("starting test") + cw := CloudwatchSource{} + err := cw.ConfigureByDSN(tc.dsn, map[string]string{"type": "test"}, dbgLogger) + cstest.RequireErrorContains(t, err, tc.expectedCfgErr) + if tc.expectedCfgErr != "" { + return } - log.Debugf("got expected error : %s", err) - continue - } else if err != nil && test.expectedCfgErr == "" { - t.Fatalf("%s unexpected error : %s", test.name, err) - continue - } else if test.expectedCfgErr != "" && err == nil { - t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr) - continue - } - dbgLogger.Infof("config done test") - //run pre-routine : tests use it to set group & streams etc. - if test.pre != nil { - test.pre(&cw) - } - out := make(chan types.Event) - tmb := tomb.Tomb{} - var rcvd_evts []types.Event - dbgLogger.Infof("running StreamingAcquisition") - actmb := tomb.Tomb{} - actmb.Go(func() error { - err := cw.OneShotAcquisition(out, &actmb) - dbgLogger.Infof("acquis done") - - if err != nil && test.expectedStartErr != "" && !strings.Contains(err.Error(), test.expectedStartErr) { - t.Fatalf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err) - } else if err != nil && test.expectedStartErr == "" { - t.Fatalf("%s unexpected error '%s'", test.name, err) - } else if err == nil && test.expectedStartErr != "" { - t.Fatalf("%s expected error '%s' got none", test.name, err) + dbgLogger.Infof("config done test") + // run pre-routine : tests use it to set group & streams etc. + if tc.setup != nil { + tc.setup(t, &cw) } - return nil - }) + out := make(chan types.Event) + tmb := tomb.Tomb{} + var rcvdEvts []types.Event - //let's empty output chan - tmb.Go(func() error { - for { - select { - case in := <-out: - log.Debugf("received event %+v", in) - rcvd_evts = append(rcvd_evts, in) - case <-tmb.Dying(): - log.Debugf("pumper died") - return nil + dbgLogger.Infof("running StreamingAcquisition") + actmb := tomb.Tomb{} + actmb.Go(func() error { + err := cw.OneShotAcquisition(out, &actmb) + dbgLogger.Infof("acquis done") + cstest.RequireErrorContains(t, err, tc.expectedStartErr) + return nil + }) + + // let's empty output chan + tmb.Go(func() error { + for { + select { + case in := <-out: + log.Debugf("received event %+v", in) + rcvdEvts = append(rcvdEvts, in) + case <-tmb.Dying(): + log.Debugf("pumper died") + return nil + } } - } - }) + }) - if test.run != nil { - test.run(&cw) - } else { - dbgLogger.Warning("no run code") - } - - time.Sleep(5 * time.Second) - dbgLogger.Infof("killing collector") - tmb.Kill(nil) - <-tmb.Dead() - dbgLogger.Infof("killing datasource") - actmb.Kill(nil) - dbgLogger.Infof("waiting datasource death") - <-actmb.Dead() - //check results - if test.expectedResLen != -1 { - if test.expectedResLen != len(rcvd_evts) { - t.Fatalf("%s : expected %d results got %d -> %v", test.name, test.expectedResLen, len(rcvd_evts), rcvd_evts) + if tc.run != nil { + tc.run(t, &cw) } else { - dbgLogger.Debugf("got %d expected messages", len(rcvd_evts)) - } - } - if len(test.expectedResMessages) != 0 { - res := test.expectedResMessages - for idx, v := range rcvd_evts { - if len(res) == 0 { - t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvd_evts), v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages)) - } - if res[0] != v.Line.Raw { - t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvd_evts), res[0], v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages)) - } - dbgLogger.Debugf("got message '%s'", res[0]) - res = res[1:] - } - if len(res) != 0 { - t.Fatalf("leftover unmatched results : %v", res) + dbgLogger.Warning("no code to run") } - } - if test.post != nil { - test.post(&cw) - } + time.Sleep(5 * time.Second) + dbgLogger.Infof("killing collector") + tmb.Kill(nil) + <-tmb.Dead() + dbgLogger.Infof("killing datasource") + actmb.Kill(nil) + dbgLogger.Infof("waiting datasource death") + <-actmb.Dead() + // check results + if tc.expectedResLen != -1 { + if tc.expectedResLen != len(rcvdEvts) { + t.Fatalf("%s : expected %d results got %d -> %v", tc.name, tc.expectedResLen, len(rcvdEvts), rcvdEvts) + } else { + dbgLogger.Debugf("got %d expected messages", len(rcvdEvts)) + } + } + if len(tc.expectedResMessages) != 0 { + res := tc.expectedResMessages + for idx, v := range rcvdEvts { + if len(res) == 0 { + t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvdEvts), v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages)) + } + if res[0] != v.Line.Raw { + t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvdEvts), res[0], v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages)) + } + dbgLogger.Debugf("got message '%s'", res[0]) + res = res[1:] + } + if len(res) != 0 { + t.Fatalf("leftover unmatched results : %v", res) + } + + } + if tc.teardown != nil { + tc.teardown(t, &cw) + } + }) } - }