fix & cleanup cloudwatch_test.go (#1780)
This commit is contained in:
parent
edced6818a
commit
6120571421
|
@ -11,8 +11,10 @@ import (
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
||||||
|
"github.com/crowdsecurity/crowdsec/pkg/cstest"
|
||||||
"github.com/crowdsecurity/crowdsec/pkg/types"
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"gopkg.in/tomb.v2"
|
"gopkg.in/tomb.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -24,16 +26,31 @@ import (
|
||||||
- check shutdown/restart
|
- check shutdown/restart
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
func deleteAllLogGroups(t *testing.T, cw *CloudwatchSource) {
|
||||||
|
input := &cloudwatchlogs.DescribeLogGroupsInput{}
|
||||||
|
result, err := cw.cwClient.DescribeLogGroups(input)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for _, group := range result.LogGroups {
|
||||||
|
_, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
|
LogGroupName: group.LogGroupName,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func checkForLocalStackAvailability() error {
|
func checkForLocalStackAvailability() error {
|
||||||
if v := os.Getenv("AWS_ENDPOINT_FORCE"); v != "" {
|
v := os.Getenv("AWS_ENDPOINT_FORCE")
|
||||||
|
if v == "" {
|
||||||
|
return fmt.Errorf("missing aws endpoint for tests : AWS_ENDPOINT_FORCE")
|
||||||
|
}
|
||||||
|
|
||||||
v = strings.TrimPrefix(v, "http://")
|
v = strings.TrimPrefix(v, "http://")
|
||||||
|
|
||||||
_, err := net.Dial("tcp", v)
|
_, err := net.Dial("tcp", v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("while dialing %s : %s : aws endpoint isn't available", v, err)
|
return fmt.Errorf("while dialing %s : %s : aws endpoint isn't available", v, err)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return fmt.Errorf("missing aws endpoint for tests : AWS_ENDPOINT_FORCE")
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,16 +73,15 @@ func TestWatchLogGroupForStreams(t *testing.T) {
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
t.Skip("Skipping test on windows")
|
t.Skip("Skipping test on windows")
|
||||||
}
|
}
|
||||||
var err error
|
|
||||||
log.SetLevel(log.DebugLevel)
|
log.SetLevel(log.DebugLevel)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
config []byte
|
config []byte
|
||||||
expectedCfgErr string
|
expectedCfgErr string
|
||||||
expectedStartErr string
|
expectedStartErr string
|
||||||
name string
|
name string
|
||||||
pre func(*CloudwatchSource)
|
setup func(*testing.T, *CloudwatchSource)
|
||||||
run func(*CloudwatchSource)
|
run func(*testing.T, *CloudwatchSource)
|
||||||
post func(*CloudwatchSource)
|
teardown func(*testing.T, *CloudwatchSource)
|
||||||
expectedResLen int
|
expectedResLen int
|
||||||
expectedResMessages []string
|
expectedResMessages []string
|
||||||
}{
|
}{
|
||||||
|
@ -80,19 +96,18 @@ labels:
|
||||||
group_name: b
|
group_name: b
|
||||||
stream_name: test_stream`),
|
stream_name: test_stream`),
|
||||||
expectedStartErr: "The specified log group does not exist",
|
expectedStartErr: "The specified log group does not exist",
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_group_not_used_1"),
|
LogGroupName: aws.String("test_group_not_used_1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log group : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
_, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
LogGroupName: aws.String("test_group_not_used_1"),
|
LogGroupName: aws.String("test_group_not_used_1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log group : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
// test stream mismatch
|
// test stream mismatch
|
||||||
|
@ -105,38 +120,37 @@ labels:
|
||||||
type: test_source
|
type: test_source
|
||||||
group_name: test_group1
|
group_name: test_group1
|
||||||
stream_name: test_stream_bad`),
|
stream_name: test_stream_bad`),
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log group : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
_, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log stream : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
// have a message before we start - won't be popped, but will trigger stream monitoring
|
// have a message before we start - won't be popped, but will trigger stream monitoring
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_1"),
|
Message: aws.String("test_message_1"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
log.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
_, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log group : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
expectedResLen: 0,
|
expectedResLen: 0,
|
||||||
},
|
},
|
||||||
|
@ -150,40 +164,37 @@ labels:
|
||||||
type: test_source
|
type: test_source
|
||||||
group_name: test_group1
|
group_name: test_group1
|
||||||
stream_regexp: test_bad[0-9]+`),
|
stream_regexp: test_bad[0-9]+`),
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log group : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
_, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log stream : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
|
||||||
// have a message before we start - won't be popped, but will trigger stream monitoring
|
// have a message before we start - won't be popped, but will trigger stream monitoring
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_1"),
|
Message: aws.String("test_message_1"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
_, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
LogGroupName: aws.String("test_group1"),
|
LogGroupName: aws.String("test_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log group : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
expectedResLen: 0,
|
expectedResLen: 0,
|
||||||
},
|
},
|
||||||
|
@ -199,70 +210,64 @@ group_name: test_log_group1
|
||||||
log_level: trace
|
log_level: trace
|
||||||
stream_name: test_stream`),
|
stream_name: test_stream`),
|
||||||
// expectedStartErr: "The specified log group does not exist",
|
// expectedStartErr: "The specified log group does not exist",
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log group : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
_, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
||||||
if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log stream : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
|
||||||
// have a message before we start - won't be popped, but will trigger stream monitoring
|
// have a message before we start - won't be popped, but will trigger stream monitoring
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_1"),
|
Message: aws.String("test_message_1"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
run: func(cw *CloudwatchSource) {
|
run: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
// wait for new stream pickup + stream poll interval
|
// wait for new stream pickup + stream poll interval
|
||||||
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
||||||
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_4"),
|
Message: aws.String("test_message_4"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
// and add an event in the future that will be popped
|
// and add an event in the future that will be popped
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_5"),
|
Message: aws.String("test_message_5"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to put logs : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
|
_, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log stream : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
_, err = cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log group : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
expectedResLen: 3,
|
expectedResLen: 3,
|
||||||
expectedResMessages: []string{"test_message_1", "test_message_4", "test_message_5"},
|
expectedResMessages: []string{"test_message_1", "test_message_4", "test_message_5"},
|
||||||
|
@ -279,83 +284,77 @@ group_name: test_log_group1
|
||||||
log_level: trace
|
log_level: trace
|
||||||
stream_name: test_stream`),
|
stream_name: test_stream`),
|
||||||
// expectedStartErr: "The specified log group does not exist",
|
// expectedStartErr: "The specified log group does not exist",
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log group : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
_, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
||||||
if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log stream : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
// have a message before we start - won't be popped, but will trigger stream monitoring
|
// have a message before we start - won't be popped, but will trigger stream monitoring
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_1"),
|
Message: aws.String("test_message_1"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
run: func(cw *CloudwatchSource) {
|
run: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
// wait for new stream pickup + stream poll interval
|
// wait for new stream pickup + stream poll interval
|
||||||
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
||||||
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
||||||
// send some events
|
// send some events
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_41"),
|
Message: aws.String("test_message_41"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to put logs : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
// wait for the stream to time-out
|
// wait for the stream to time-out
|
||||||
time.Sleep(def_StreamReadTimeout + (1 * time.Second))
|
time.Sleep(def_StreamReadTimeout + (1 * time.Second))
|
||||||
// and send events again
|
// and send events again
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_51"),
|
Message: aws.String("test_message_51"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to put logs : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
// wait for new stream pickup + stream poll interval
|
// wait for new stream pickup + stream poll interval
|
||||||
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
||||||
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
|
_, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log stream : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
_, err = cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log group : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
expectedResLen: 3,
|
expectedResLen: 3,
|
||||||
expectedResMessages: []string{"test_message_1", "test_message_41", "test_message_51"},
|
expectedResMessages: []string{"test_message_1", "test_message_41", "test_message_51"},
|
||||||
|
@ -372,98 +371,82 @@ group_name: test_log_group1
|
||||||
log_level: trace
|
log_level: trace
|
||||||
stream_name: test_stream`),
|
stream_name: test_stream`),
|
||||||
// expectedStartErr: "The specified log group does not exist",
|
// expectedStartErr: "The specified log group does not exist",
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log group : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
_, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to create log stream : %s", err)
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
// have a message before we start - won't be popped, but will trigger stream monitoring
|
// have a message before we start - won't be popped, but will trigger stream monitoring
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_1"),
|
Message: aws.String("test_message_1"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
run: func(cw *CloudwatchSource) {
|
run: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
// wait for new stream pickup + stream poll interval
|
// wait for new stream pickup + stream poll interval
|
||||||
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
|
||||||
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollStreamInterval + (1 * time.Second))
|
||||||
time.Sleep(def_PollDeadStreamInterval + (1 * time.Second))
|
time.Sleep(def_PollDeadStreamInterval + (1 * time.Second))
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
|
_, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log stream : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
_, err = cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete log stream : %s", err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
expectedResLen: 1,
|
expectedResLen: 1,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, tc := range tests {
|
||||||
dbgLogger := log.New().WithField("test", test.name)
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
dbgLogger := log.New().WithField("test", tc.name)
|
||||||
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
||||||
dbgLogger.Infof("starting test")
|
dbgLogger.Infof("starting test")
|
||||||
cw := CloudwatchSource{}
|
cw := CloudwatchSource{}
|
||||||
err = cw.Configure(test.config, dbgLogger)
|
err := cw.Configure(tc.config, dbgLogger)
|
||||||
if err != nil && test.expectedCfgErr != "" {
|
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
||||||
if !strings.Contains(err.Error(), test.expectedCfgErr) {
|
|
||||||
t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err)
|
if tc.expectedCfgErr != "" {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
log.Debugf("got expected error : %s", err)
|
|
||||||
continue
|
|
||||||
} else if err != nil && test.expectedCfgErr == "" {
|
|
||||||
t.Fatalf("%s unexpected error : %s", test.name, err)
|
|
||||||
continue
|
|
||||||
} else if test.expectedCfgErr != "" && err == nil {
|
|
||||||
t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dbgLogger.Infof("config done test")
|
|
||||||
// run pre-routine : tests use it to set group & streams etc.
|
// run pre-routine : tests use it to set group & streams etc.
|
||||||
if test.pre != nil {
|
if tc.setup != nil {
|
||||||
test.pre(&cw)
|
tc.setup(t, &cw)
|
||||||
}
|
}
|
||||||
out := make(chan types.Event)
|
out := make(chan types.Event)
|
||||||
tmb := tomb.Tomb{}
|
tmb := tomb.Tomb{}
|
||||||
var rcvd_evts []types.Event
|
var rcvdEvts []types.Event
|
||||||
|
|
||||||
dbgLogger.Infof("running StreamingAcquisition")
|
dbgLogger.Infof("running StreamingAcquisition")
|
||||||
actmb := tomb.Tomb{}
|
actmb := tomb.Tomb{}
|
||||||
actmb.Go(func() error {
|
actmb.Go(func() error {
|
||||||
err := cw.StreamingAcquisition(out, &actmb)
|
err := cw.StreamingAcquisition(out, &actmb)
|
||||||
dbgLogger.Infof("acquis done")
|
dbgLogger.Infof("acquis done")
|
||||||
|
cstest.RequireErrorContains(t, err, tc.expectedStartErr)
|
||||||
if err != nil && test.expectedStartErr != "" && !strings.Contains(err.Error(), test.expectedStartErr) {
|
|
||||||
t.Fatalf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err)
|
|
||||||
} else if err != nil && test.expectedStartErr == "" {
|
|
||||||
t.Fatalf("%s unexpected error '%s'", test.name, err)
|
|
||||||
} else if err == nil && test.expectedStartErr != "" {
|
|
||||||
t.Fatalf("%s expected error '%s' got none", test.name, err)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -473,7 +456,7 @@ stream_name: test_stream`),
|
||||||
select {
|
select {
|
||||||
case in := <-out:
|
case in := <-out:
|
||||||
log.Debugf("received event %+v", in)
|
log.Debugf("received event %+v", in)
|
||||||
rcvd_evts = append(rcvd_evts, in)
|
rcvdEvts = append(rcvdEvts, in)
|
||||||
case <-tmb.Dying():
|
case <-tmb.Dying():
|
||||||
log.Debugf("pumper died")
|
log.Debugf("pumper died")
|
||||||
return nil
|
return nil
|
||||||
|
@ -481,10 +464,10 @@ stream_name: test_stream`),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
if test.run != nil {
|
if tc.run != nil {
|
||||||
test.run(&cw)
|
tc.run(t, &cw)
|
||||||
} else {
|
} else {
|
||||||
dbgLogger.Warning("no run code")
|
dbgLogger.Warning("no code to run")
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
@ -496,20 +479,20 @@ stream_name: test_stream`),
|
||||||
<-actmb.Dead()
|
<-actmb.Dead()
|
||||||
// dbgLogger.Infof("collected events : %d -> %+v", len(rcvd_evts), rcvd_evts)
|
// dbgLogger.Infof("collected events : %d -> %+v", len(rcvd_evts), rcvd_evts)
|
||||||
// check results
|
// check results
|
||||||
if test.expectedResLen != -1 {
|
if tc.expectedResLen != -1 {
|
||||||
if test.expectedResLen != len(rcvd_evts) {
|
if tc.expectedResLen != len(rcvdEvts) {
|
||||||
t.Fatalf("%s : expected %d results got %d -> %v", test.name, test.expectedResLen, len(rcvd_evts), rcvd_evts)
|
t.Fatalf("%s : expected %d results got %d -> %v", tc.name, tc.expectedResLen, len(rcvdEvts), rcvdEvts)
|
||||||
}
|
}
|
||||||
dbgLogger.Debugf("got %d expected messages", len(rcvd_evts))
|
dbgLogger.Debugf("got %d expected messages", len(rcvdEvts))
|
||||||
}
|
}
|
||||||
if len(test.expectedResMessages) != 0 {
|
if len(tc.expectedResMessages) != 0 {
|
||||||
res := test.expectedResMessages
|
res := tc.expectedResMessages
|
||||||
for idx, v := range rcvd_evts {
|
for idx, v := range rcvdEvts {
|
||||||
if len(res) == 0 {
|
if len(res) == 0 {
|
||||||
t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvd_evts), v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages))
|
t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvdEvts), v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages))
|
||||||
}
|
}
|
||||||
if res[0] != v.Line.Raw {
|
if res[0] != v.Line.Raw {
|
||||||
t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvd_evts), res[0], v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages))
|
t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvdEvts), res[0], v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages))
|
||||||
}
|
}
|
||||||
dbgLogger.Debugf("got message '%s'", res[0])
|
dbgLogger.Debugf("got message '%s'", res[0])
|
||||||
res = res[1:]
|
res = res[1:]
|
||||||
|
@ -519,9 +502,10 @@ stream_name: test_stream`),
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
if test.post != nil {
|
if tc.teardown != nil {
|
||||||
test.post(&cw)
|
tc.teardown(t, &cw)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -529,7 +513,6 @@ func TestConfiguration(t *testing.T) {
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
t.Skip("Skipping test on windows")
|
t.Skip("Skipping test on windows")
|
||||||
}
|
}
|
||||||
var err error
|
|
||||||
log.SetLevel(log.DebugLevel)
|
log.SetLevel(log.DebugLevel)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
config []byte
|
config []byte
|
||||||
|
@ -569,25 +552,18 @@ stream_name: test_stream`),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for idx, test := range tests {
|
for _, tc := range tests {
|
||||||
dbgLogger := log.New().WithField("test", test.name)
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
dbgLogger := log.New().WithField("test", tc.name)
|
||||||
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
||||||
log.Printf("%d/%d", idx, len(tests))
|
|
||||||
cw := CloudwatchSource{}
|
cw := CloudwatchSource{}
|
||||||
err = cw.Configure(test.config, dbgLogger)
|
err := cw.Configure(tc.config, dbgLogger)
|
||||||
if err != nil && test.expectedCfgErr != "" {
|
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
||||||
if !strings.Contains(err.Error(), test.expectedCfgErr) {
|
if tc.expectedCfgErr != "" {
|
||||||
t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err)
|
return
|
||||||
}
|
|
||||||
log.Debugf("got expected error : %s", err)
|
|
||||||
continue
|
|
||||||
} else if err != nil && test.expectedCfgErr == "" {
|
|
||||||
t.Fatalf("%s unexpected error : %s", test.name, err)
|
|
||||||
continue
|
|
||||||
} else if test.expectedCfgErr != "" && err == nil {
|
|
||||||
t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan types.Event)
|
out := make(chan types.Event)
|
||||||
tmb := tomb.Tomb{}
|
tmb := tomb.Tomb{}
|
||||||
|
|
||||||
|
@ -597,19 +573,14 @@ stream_name: test_stream`),
|
||||||
case "cat":
|
case "cat":
|
||||||
err = cw.OneShotAcquisition(out, &tmb)
|
err = cw.OneShotAcquisition(out, &tmb)
|
||||||
}
|
}
|
||||||
if err != nil && test.expectedStartErr != "" && !strings.Contains(err.Error(), test.expectedStartErr) {
|
|
||||||
t.Fatalf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err)
|
cstest.RequireErrorContains(t, err, tc.expectedStartErr)
|
||||||
} else if err != nil && test.expectedStartErr == "" {
|
|
||||||
t.Fatalf("%s unexpected error '%s'", test.name, err)
|
|
||||||
} else if err == nil && test.expectedStartErr != "" {
|
|
||||||
t.Fatalf("%s expected error '%s' got none", test.name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("killing ...")
|
log.Debugf("killing ...")
|
||||||
tmb.Kill(nil)
|
tmb.Kill(nil)
|
||||||
<-tmb.Dead()
|
<-tmb.Dead()
|
||||||
log.Debugf("dead :)")
|
log.Debugf("dead :)")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -617,7 +588,6 @@ func TestConfigureByDSN(t *testing.T) {
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
t.Skip("Skipping test on windows")
|
t.Skip("Skipping test on windows")
|
||||||
}
|
}
|
||||||
var err error
|
|
||||||
log.SetLevel(log.DebugLevel)
|
log.SetLevel(log.DebugLevel)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
dsn string
|
dsn string
|
||||||
|
@ -647,25 +617,15 @@ func TestConfigureByDSN(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for idx, test := range tests {
|
for _, tc := range tests {
|
||||||
dbgLogger := log.New().WithField("test", test.name)
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
dbgLogger := log.New().WithField("test", tc.name)
|
||||||
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
||||||
log.Printf("%d/%d", idx, len(tests))
|
|
||||||
cw := CloudwatchSource{}
|
cw := CloudwatchSource{}
|
||||||
err = cw.ConfigureByDSN(test.dsn, test.labels, dbgLogger)
|
err := cw.ConfigureByDSN(tc.dsn, tc.labels, dbgLogger)
|
||||||
if err != nil && test.expectedCfgErr != "" {
|
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
||||||
if !strings.Contains(err.Error(), test.expectedCfgErr) {
|
})
|
||||||
t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err)
|
|
||||||
}
|
|
||||||
log.Debugf("got expected error : %s", err)
|
|
||||||
continue
|
|
||||||
} else if err != nil && test.expectedCfgErr == "" {
|
|
||||||
t.Fatalf("%s unexpected error : %s", test.name, err)
|
|
||||||
continue
|
|
||||||
} else if test.expectedCfgErr != "" && err == nil {
|
|
||||||
t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -673,16 +633,15 @@ func TestOneShotAcquisition(t *testing.T) {
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
t.Skip("Skipping test on windows")
|
t.Skip("Skipping test on windows")
|
||||||
}
|
}
|
||||||
var err error
|
|
||||||
log.SetLevel(log.DebugLevel)
|
log.SetLevel(log.DebugLevel)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
dsn string
|
dsn string
|
||||||
expectedCfgErr string
|
expectedCfgErr string
|
||||||
expectedStartErr string
|
expectedStartErr string
|
||||||
name string
|
name string
|
||||||
pre func(*CloudwatchSource)
|
setup func(*testing.T, *CloudwatchSource)
|
||||||
run func(*CloudwatchSource)
|
run func(*testing.T, *CloudwatchSource)
|
||||||
post func(*CloudwatchSource)
|
teardown func(*testing.T, *CloudwatchSource)
|
||||||
expectedResLen int
|
expectedResLen int
|
||||||
expectedResMessages []string
|
expectedResMessages []string
|
||||||
}{
|
}{
|
||||||
|
@ -691,19 +650,24 @@ func TestOneShotAcquisition(t *testing.T) {
|
||||||
name: "empty_stream",
|
name: "empty_stream",
|
||||||
dsn: "cloudwatch://test_log_group1:test_stream?backlog=1h",
|
dsn: "cloudwatch://test_log_group1:test_stream?backlog=1h",
|
||||||
// expectedStartErr: "The specified log group does not exist",
|
// expectedStartErr: "The specified log group does not exist",
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
})
|
})
|
||||||
cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
})
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
_, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
})
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
},
|
},
|
||||||
expectedResLen: 0,
|
expectedResLen: 0,
|
||||||
},
|
},
|
||||||
|
@ -712,114 +676,97 @@ func TestOneShotAcquisition(t *testing.T) {
|
||||||
name: "get_one_event",
|
name: "get_one_event",
|
||||||
dsn: "cloudwatch://test_log_group1:test_stream?backlog=1h",
|
dsn: "cloudwatch://test_log_group1:test_stream?backlog=1h",
|
||||||
// expectedStartErr: "The specified log group does not exist",
|
// expectedStartErr: "The specified log group does not exist",
|
||||||
pre: func(cw *CloudwatchSource) {
|
setup: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
deleteAllLogGroups(t, cw)
|
||||||
|
_, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("error while CreateLogGroup")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
_, err = cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("error while CreateLogStream")
|
require.NoError(t, err)
|
||||||
|
|
||||||
}
|
|
||||||
// this one is too much in the back
|
// this one is too much in the back
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_1"),
|
Message: aws.String("test_message_1"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Add(-(2 * time.Hour)).UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Add(-(2 * time.Hour)).UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
log.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
|
|
||||||
// this one can be read
|
// this one can be read
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_2"),
|
Message: aws.String("test_message_2"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
log.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
|
|
||||||
// this one is in the past
|
// this one is in the past
|
||||||
if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
_, err = cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
LogStreamName: aws.String("test_stream"),
|
LogStreamName: aws.String("test_stream"),
|
||||||
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
LogEvents: []*cloudwatchlogs.InputLogEvent{
|
||||||
&cloudwatchlogs.InputLogEvent{
|
{
|
||||||
Message: aws.String("test_message_3"),
|
Message: aws.String("test_message_3"),
|
||||||
Timestamp: aws.Int64(time.Now().UTC().Add(-(3 * time.Hour)).UTC().Unix() * 1000),
|
Timestamp: aws.Int64(time.Now().UTC().Add(-(3 * time.Hour)).UTC().Unix() * 1000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}); err != nil {
|
})
|
||||||
log.Fatalf("failed to put logs")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
post: func(cw *CloudwatchSource) {
|
teardown: func(t *testing.T, cw *CloudwatchSource) {
|
||||||
if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
_, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
|
||||||
LogGroupName: aws.String("test_log_group1"),
|
LogGroupName: aws.String("test_log_group1"),
|
||||||
}); err != nil {
|
})
|
||||||
t.Fatalf("failed to delete")
|
require.NoError(t, err)
|
||||||
}
|
|
||||||
},
|
},
|
||||||
expectedResLen: 1,
|
expectedResLen: 1,
|
||||||
expectedResMessages: []string{"test_message_2"},
|
expectedResMessages: []string{"test_message_2"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, tc := range tests {
|
||||||
dbgLogger := log.New().WithField("test", test.name)
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
dbgLogger := log.New().WithField("test", tc.name)
|
||||||
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
dbgLogger.Logger.SetLevel(log.DebugLevel)
|
||||||
dbgLogger.Infof("starting test")
|
dbgLogger.Infof("starting test")
|
||||||
cw := CloudwatchSource{}
|
cw := CloudwatchSource{}
|
||||||
err = cw.ConfigureByDSN(test.dsn, map[string]string{"type": "test"}, dbgLogger)
|
err := cw.ConfigureByDSN(tc.dsn, map[string]string{"type": "test"}, dbgLogger)
|
||||||
if err != nil && test.expectedCfgErr != "" {
|
cstest.RequireErrorContains(t, err, tc.expectedCfgErr)
|
||||||
if !strings.Contains(err.Error(), test.expectedCfgErr) {
|
if tc.expectedCfgErr != "" {
|
||||||
t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err)
|
return
|
||||||
}
|
|
||||||
log.Debugf("got expected error : %s", err)
|
|
||||||
continue
|
|
||||||
} else if err != nil && test.expectedCfgErr == "" {
|
|
||||||
t.Fatalf("%s unexpected error : %s", test.name, err)
|
|
||||||
continue
|
|
||||||
} else if test.expectedCfgErr != "" && err == nil {
|
|
||||||
t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dbgLogger.Infof("config done test")
|
dbgLogger.Infof("config done test")
|
||||||
// run pre-routine : tests use it to set group & streams etc.
|
// run pre-routine : tests use it to set group & streams etc.
|
||||||
if test.pre != nil {
|
if tc.setup != nil {
|
||||||
test.pre(&cw)
|
tc.setup(t, &cw)
|
||||||
}
|
}
|
||||||
out := make(chan types.Event)
|
out := make(chan types.Event)
|
||||||
tmb := tomb.Tomb{}
|
tmb := tomb.Tomb{}
|
||||||
var rcvd_evts []types.Event
|
var rcvdEvts []types.Event
|
||||||
|
|
||||||
dbgLogger.Infof("running StreamingAcquisition")
|
dbgLogger.Infof("running StreamingAcquisition")
|
||||||
actmb := tomb.Tomb{}
|
actmb := tomb.Tomb{}
|
||||||
actmb.Go(func() error {
|
actmb.Go(func() error {
|
||||||
err := cw.OneShotAcquisition(out, &actmb)
|
err := cw.OneShotAcquisition(out, &actmb)
|
||||||
dbgLogger.Infof("acquis done")
|
dbgLogger.Infof("acquis done")
|
||||||
|
cstest.RequireErrorContains(t, err, tc.expectedStartErr)
|
||||||
if err != nil && test.expectedStartErr != "" && !strings.Contains(err.Error(), test.expectedStartErr) {
|
|
||||||
t.Fatalf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err)
|
|
||||||
} else if err != nil && test.expectedStartErr == "" {
|
|
||||||
t.Fatalf("%s unexpected error '%s'", test.name, err)
|
|
||||||
} else if err == nil && test.expectedStartErr != "" {
|
|
||||||
t.Fatalf("%s expected error '%s' got none", test.name, err)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -829,7 +776,7 @@ func TestOneShotAcquisition(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case in := <-out:
|
case in := <-out:
|
||||||
log.Debugf("received event %+v", in)
|
log.Debugf("received event %+v", in)
|
||||||
rcvd_evts = append(rcvd_evts, in)
|
rcvdEvts = append(rcvdEvts, in)
|
||||||
case <-tmb.Dying():
|
case <-tmb.Dying():
|
||||||
log.Debugf("pumper died")
|
log.Debugf("pumper died")
|
||||||
return nil
|
return nil
|
||||||
|
@ -837,10 +784,10 @@ func TestOneShotAcquisition(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
if test.run != nil {
|
if tc.run != nil {
|
||||||
test.run(&cw)
|
tc.run(t, &cw)
|
||||||
} else {
|
} else {
|
||||||
dbgLogger.Warning("no run code")
|
dbgLogger.Warning("no code to run")
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
@ -852,21 +799,21 @@ func TestOneShotAcquisition(t *testing.T) {
|
||||||
dbgLogger.Infof("waiting datasource death")
|
dbgLogger.Infof("waiting datasource death")
|
||||||
<-actmb.Dead()
|
<-actmb.Dead()
|
||||||
// check results
|
// check results
|
||||||
if test.expectedResLen != -1 {
|
if tc.expectedResLen != -1 {
|
||||||
if test.expectedResLen != len(rcvd_evts) {
|
if tc.expectedResLen != len(rcvdEvts) {
|
||||||
t.Fatalf("%s : expected %d results got %d -> %v", test.name, test.expectedResLen, len(rcvd_evts), rcvd_evts)
|
t.Fatalf("%s : expected %d results got %d -> %v", tc.name, tc.expectedResLen, len(rcvdEvts), rcvdEvts)
|
||||||
} else {
|
} else {
|
||||||
dbgLogger.Debugf("got %d expected messages", len(rcvd_evts))
|
dbgLogger.Debugf("got %d expected messages", len(rcvdEvts))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(test.expectedResMessages) != 0 {
|
if len(tc.expectedResMessages) != 0 {
|
||||||
res := test.expectedResMessages
|
res := tc.expectedResMessages
|
||||||
for idx, v := range rcvd_evts {
|
for idx, v := range rcvdEvts {
|
||||||
if len(res) == 0 {
|
if len(res) == 0 {
|
||||||
t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvd_evts), v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages))
|
t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(rcvdEvts), v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages))
|
||||||
}
|
}
|
||||||
if res[0] != v.Line.Raw {
|
if res[0] != v.Line.Raw {
|
||||||
t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvd_evts), res[0], v.Line.Raw, len(rcvd_evts), len(test.expectedResMessages))
|
t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(rcvdEvts), res[0], v.Line.Raw, len(rcvdEvts), len(tc.expectedResMessages))
|
||||||
}
|
}
|
||||||
dbgLogger.Debugf("got message '%s'", res[0])
|
dbgLogger.Debugf("got message '%s'", res[0])
|
||||||
res = res[1:]
|
res = res[1:]
|
||||||
|
@ -876,9 +823,9 @@ func TestOneShotAcquisition(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
if test.post != nil {
|
if tc.teardown != nil {
|
||||||
test.post(&cw)
|
tc.teardown(t, &cw)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue