crowdsec/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go
Thibault "bui" Koechlin cc1ab8c50d
switch to utc time everywhere (#1167)
* switch to utc time everywhere


Co-authored-by: alteredCoder <kevin@crowdsec.net>
2022-01-19 14:56:05 +01:00

872 lines
28 KiB
Go

package cloudwatchacquisition
import (
"fmt"
"net"
"os"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)
/*
test plan :
- start on bad group/bad stream
- start on good settings (oneshot) -> check expected messages
- start on good settings (stream) -> check expected messages within given time
- check shutdown/restart
*/
// checkForLocalStackAvailability verifies that the endpoint named by the
// AWS_ENDPOINT_FORCE environment variable accepts TCP connections.
//
// It returns an error when the variable is unset/empty or when the endpoint
// cannot be dialed. A leading "http://" is stripped before dialing, so the
// remainder is expected to be a host:port pair.
func checkForLocalStackAvailability() error {
	endpoint := os.Getenv("AWS_ENDPOINT_FORCE")
	if endpoint == "" {
		return fmt.Errorf("missing aws endpoint for tests : AWS_ENDPOINT_FORCE")
	}
	endpoint = strings.TrimPrefix(endpoint, "http://")
	conn, err := net.Dial("tcp", endpoint)
	if err != nil {
		return fmt.Errorf("while dialing %s : %w : aws endpoint isn't available", endpoint, err)
	}
	// The connection is only a reachability probe: release it right away
	// instead of leaking the socket for the lifetime of the test binary.
	// A close failure doesn't change the outcome of the probe.
	_ = conn.Close()
	return nil
}
// TestMain is the entry point of the package's test binary. It aborts when
// the endpoint check fails, shortens the package-level polling intervals and
// timeouts for the duration of the tests, then runs the tests and exits with
// their status code.
func TestMain(m *testing.M) {
	availabilityErr := checkForLocalStackAvailability()
	if availabilityErr != nil {
		log.Fatalf("local stack error : %s", availabilityErr)
	}

	// Override the package defaults with short values before any test runs.
	def_PollNewStreamInterval = 1 * time.Second
	def_PollStreamInterval = 1 * time.Second
	def_StreamReadTimeout = 10 * time.Second
	def_MaxStreamAge = 5 * time.Second
	def_PollDeadStreamInterval = 5 * time.Second

	exitCode := m.Run()
	os.Exit(exitCode)
}
// TestWatchLogGroupForStreams exercises CloudwatchSource.StreamingAcquisition.
//
// Each table entry configures a CloudwatchSource from a YAML snippet, then:
//   - pre  : prepares log groups/streams/events through cw.cwClient,
//   - run  : pushes more events while the acquisition is running,
//   - post : removes what pre created.
//
// Events emitted by the datasource are collected from the output channel and
// compared against expectedResLen (skipped when set to -1) and, when
// non-empty, against expectedResMessages in order.
func TestWatchLogGroupForStreams(t *testing.T) {
	log.SetLevel(log.DebugLevel)
	tests := []struct {
		config              []byte
		expectedCfgErr      string
		expectedStartErr    string
		name                string
		pre                 func(*CloudwatchSource)
		run                 func(*CloudwatchSource)
		post                func(*CloudwatchSource)
		expectedResLen      int
		expectedResMessages []string
	}{
		//require a group name that doesn't exist
		{
			name: "group_does_not_exists",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
group_name: b
stream_name: test_stream`),
			expectedStartErr: "The specified log group does not exist",
			pre: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_group_not_used_1"),
				}); err != nil {
					t.Fatalf("failed to create log group : %s", err)
				}
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_group_not_used_1"),
				}); err != nil {
					t.Fatalf("failed to delete log group : %s", err)
				}
			},
		},
		//test stream mismatch (stream_name)
		{
			name: "group_exists_bad_stream_name",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
group_name: test_group1
stream_name: test_stream_bad`),
			pre: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_group1"),
				}); err != nil {
					t.Fatalf("failed to create log group : %s", err)
				}
				if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
					LogGroupName:  aws.String("test_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to create log stream : %s", err)
				}
				//put a message in a stream that doesn't match the configured name : nothing is expected out
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_1"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_group1"),
				}); err != nil {
					t.Fatalf("failed to delete log group : %s", err)
				}
			},
			expectedResLen: 0,
		},
		//test stream mismatch (stream_regexp)
		{
			name: "group_exists_bad_stream_regexp",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
group_name: test_group1
stream_regexp: test_bad[0-9]+`),
			pre: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_group1"),
				}); err != nil {
					t.Fatalf("failed to create log group : %s", err)
				}
				if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
					LogGroupName:  aws.String("test_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to create log stream : %s", err)
				}
				//put a message in a stream that doesn't match the configured regexp : nothing is expected out
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_1"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_group1"),
				}); err != nil {
					t.Fatalf("failed to delete log group : %s", err)
				}
			},
			expectedResLen: 0,
		},
		//require a group name that does exist and contains a stream in which we are going to put events
		{
			name: "group_exists_stream_exists_has_events",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
group_name: test_log_group1
log_level: trace
stream_name: test_stream`),
			pre: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("failed to create log group : %s", err)
				}
				if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to create log stream : %s", err)
				}
				//seed the stream with one message before the acquisition starts (it is part of the expected results)
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_1"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
			},
			run: func(cw *CloudwatchSource) {
				//wait for new stream pickup + stream poll interval
				time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
				time.Sleep(def_PollStreamInterval + (1 * time.Second))
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_4"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
						//second event of the same batch, also stamped with the current time
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_5"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to delete log stream : %s", err)
				}
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("failed to delete log group : %s", err)
				}
			},
			expectedResLen:      3,
			expectedResMessages: []string{"test_message_1", "test_message_4", "test_message_5"},
		},
		//have a stream generate events, reach time-out and gets polled again
		{
			name: "group_exists_stream_exists_has_events+timeout",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
group_name: test_log_group1
log_level: trace
stream_name: test_stream`),
			pre: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("failed to create log group : %s", err)
				}
				if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to create log stream : %s", err)
				}
				//seed the stream with one message before the acquisition starts (it is part of the expected results)
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_1"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
			},
			run: func(cw *CloudwatchSource) {
				//wait for new stream pickup + stream poll interval
				time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
				time.Sleep(def_PollStreamInterval + (1 * time.Second))
				//send some events
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_41"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
				//wait for the stream to time-out
				time.Sleep(def_StreamReadTimeout + (1 * time.Second))
				//and send events again
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_51"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
				//wait for new stream pickup + stream poll interval
				time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
				time.Sleep(def_PollStreamInterval + (1 * time.Second))
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to delete log stream : %s", err)
				}
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("failed to delete log group : %s", err)
				}
			},
			expectedResLen:      3,
			expectedResMessages: []string{"test_message_1", "test_message_41", "test_message_51"},
		},
		//have a stream generate events, reach time-out and dead body collection
		{
			name: "group_exists_stream_exists_has_events+timeout+GC",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
group_name: test_log_group1
log_level: trace
stream_name: test_stream`),
			pre: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("failed to create log group : %s", err)
				}
				if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to create log stream : %s", err)
				}
				//seed the stream with one message before the acquisition starts
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_1"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
			},
			run: func(cw *CloudwatchSource) {
				//wait for new stream pickup + stream poll interval + dead stream poll interval
				time.Sleep(def_PollNewStreamInterval + (1 * time.Second))
				time.Sleep(def_PollStreamInterval + (1 * time.Second))
				time.Sleep(def_PollDeadStreamInterval + (1 * time.Second))
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogStream(&cloudwatchlogs.DeleteLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("failed to delete log stream : %s", err)
				}
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("failed to delete log group : %s", err)
				}
			},
			expectedResLen: 1,
		},
	}

	for _, test := range tests {
		dbgLogger := log.New().WithField("test", test.name)
		dbgLogger.Logger.SetLevel(log.DebugLevel)
		dbgLogger.Infof("starting test")

		cw := CloudwatchSource{}
		err := cw.Configure(test.config, dbgLogger)
		switch {
		case err != nil && test.expectedCfgErr != "":
			if !strings.Contains(err.Error(), test.expectedCfgErr) {
				t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err.Error())
			}
			log.Debugf("got expected error : %s", err)
			continue
		case err != nil:
			t.Fatalf("%s unexpected error : %s", test.name, err)
		case test.expectedCfgErr != "":
			t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr)
		}
		dbgLogger.Infof("config done test")

		//run pre-routine : tests use it to set group & streams etc.
		if test.pre != nil {
			test.pre(&cw)
		}

		out := make(chan types.Event)
		tmb := tomb.Tomb{}
		var received []types.Event

		dbgLogger.Infof("running StreamingAcquisition")
		actmb := tomb.Tomb{}
		actmb.Go(func() error {
			err := cw.StreamingAcquisition(out, &actmb)
			dbgLogger.Infof("acquis done")
			// This runs outside the test goroutine: FailNow/Fatalf must not be
			// called here, so failures are reported with Errorf and the
			// goroutine returns normally, letting the tomb die.
			switch {
			case err != nil && test.expectedStartErr == "":
				t.Errorf("%s unexpected error '%s'", test.name, err)
			case err != nil && !strings.Contains(err.Error(), test.expectedStartErr):
				t.Errorf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err.Error())
			case err == nil && test.expectedStartErr != "":
				t.Errorf("%s expected error '%s' got none", test.name, test.expectedStartErr)
			}
			return nil
		})

		//let's empty output chan
		tmb.Go(func() error {
			for {
				select {
				case in := <-out:
					log.Debugf("received event %+v", in)
					received = append(received, in)
				case <-tmb.Dying():
					log.Debugf("pumper died")
					return nil
				}
			}
		})

		if test.run != nil {
			test.run(&cw)
		} else {
			dbgLogger.Warning("no run code")
		}

		time.Sleep(5 * time.Second)
		dbgLogger.Infof("killing collector")
		tmb.Kill(nil)
		<-tmb.Dead()
		dbgLogger.Infof("killing datasource")
		actmb.Kill(nil)
		<-actmb.Dead()

		// Clean up before asserting: a failed expectation must not leave the
		// log groups behind for the following cases and tests.
		if test.post != nil {
			test.post(&cw)
		}

		//check results
		if test.expectedResLen != -1 {
			if test.expectedResLen != len(received) {
				t.Fatalf("%s : expected %d results got %d -> %v", test.name, test.expectedResLen, len(received), received)
			}
			dbgLogger.Debugf("got %d expected messages", len(received))
		}
		if len(test.expectedResMessages) != 0 {
			res := test.expectedResMessages
			for idx, v := range received {
				if len(res) == 0 {
					t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(received), v.Line.Raw, len(received), len(test.expectedResMessages))
				}
				if res[0] != v.Line.Raw {
					t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(received), res[0], v.Line.Raw, len(received), len(test.expectedResMessages))
				}
				dbgLogger.Debugf("got message '%s'", res[0])
				res = res[1:]
			}
			if len(res) != 0 {
				t.Fatalf("leftover unmatched results : %v", res)
			}
		}
	}
}
// TestConfiguration checks CloudwatchSource.Configure against YAML snippets
// and, for configurations that are accepted, starts the acquisition matching
// cw.GetMode() to verify the expected start-up error.
//
// expectedCfgErr is matched as a substring of the Configure error and
// expectedStartErr as a substring of the acquisition error; an empty string
// means no error is expected at that stage.
func TestConfiguration(t *testing.T) {
	log.SetLevel(log.DebugLevel)
	tests := []struct {
		config           []byte
		expectedCfgErr   string
		expectedStartErr string
		name             string
	}{
		{
			name: "group_does_not_exists",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
group_name: test_group
stream_name: test_stream`),
			expectedStartErr: "The specified log group does not exist",
		},
		{
			name: "unknown_config_field",
			config: []byte(`
xxx: cloudwatch
labels:
type: test_source
group_name: test_group
stream_name: test_stream`),
			expectedCfgErr: "field xxx not found in type",
		},
		{
			name: "missing_group_name",
			config: []byte(`
source: cloudwatch
aws_region: us-east-1
labels:
type: test_source
stream_name: test_stream`),
			expectedCfgErr: "group_name is mandatory for CloudwatchSource",
		},
	}

	for idx, test := range tests {
		dbgLogger := log.New().WithField("test", test.name)
		dbgLogger.Logger.SetLevel(log.DebugLevel)
		log.Printf("%d/%d", idx, len(tests))

		cw := CloudwatchSource{}
		err := cw.Configure(test.config, dbgLogger)
		switch {
		case err != nil && test.expectedCfgErr != "":
			if !strings.Contains(err.Error(), test.expectedCfgErr) {
				t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err.Error())
			}
			log.Debugf("got expected error : %s", err)
			continue
		case err != nil:
			t.Fatalf("%s unexpected error : %s", test.name, err)
		case test.expectedCfgErr != "":
			t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr)
		}

		out := make(chan types.Event)
		tmb := tomb.Tomb{}

		// err is nil at this point; it stays nil when the mode is neither
		// "tail" nor "cat".
		switch cw.GetMode() {
		case "tail":
			err = cw.StreamingAcquisition(out, &tmb)
		case "cat":
			err = cw.OneShotAcquisition(out, &tmb)
		}

		switch {
		case err != nil && test.expectedStartErr == "":
			t.Fatalf("%s unexpected error '%s'", test.name, err)
		case err != nil && !strings.Contains(err.Error(), test.expectedStartErr):
			t.Fatalf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err.Error())
		case err == nil && test.expectedStartErr != "":
			// Report the error that was expected (err itself is nil here).
			t.Fatalf("%s expected error '%s' got none", test.name, test.expectedStartErr)
		}

		log.Debugf("killing ...")
		tmb.Kill(nil)
		<-tmb.Dead()
		log.Debugf("dead :)")
	}
}
// TestConfigureByDSN feeds a set of DSN strings to
// CloudwatchSource.ConfigureByDSN and checks the outcome: a non-empty
// expectedCfgErr must appear as a substring of the returned error, an empty
// one means the DSN has to be accepted.
func TestConfigureByDSN(t *testing.T) {
	log.SetLevel(log.DebugLevel)

	cases := []struct {
		dsn            string
		labels         map[string]string
		expectedCfgErr string
		name           string
	}{
		{
			name:           "missing_query",
			dsn:            "cloudwatch://bad_log_group:bad_stream_name",
			expectedCfgErr: "query is mandatory (at least start_date and end_date or backlog)",
		},
		{
			name: "backlog",
			dsn:  "cloudwatch://bad_log_group:bad_stream_name?backlog=30m&log_level=info&profile=test",
		},
		{
			name: "start_date/end_date",
			dsn:  "cloudwatch://bad_log_group:bad_stream_name?start_date=2021/05/15 14:04&end_date=2021/05/15 15:04",
		},
		{
			name:           "bad_log_level",
			dsn:            "cloudwatch://bad_log_group:bad_stream_name?backlog=4h&log_level=",
			expectedCfgErr: "unknown level : not a valid logrus Level: ",
		},
	}

	total := len(cases)
	for i, tc := range cases {
		entry := log.New().WithField("test", tc.name)
		entry.Logger.SetLevel(log.DebugLevel)
		log.Printf("%d/%d", i, total)

		source := CloudwatchSource{}
		cfgErr := source.ConfigureByDSN(tc.dsn, tc.labels, entry)
		wantErr := tc.expectedCfgErr != ""

		switch {
		case cfgErr == nil && wantErr:
			t.Fatalf("%s expected error '%s', got none", tc.name, tc.expectedCfgErr)
		case cfgErr != nil && !wantErr:
			t.Fatalf("%s unexpected error : %s", tc.name, cfgErr)
		case cfgErr != nil:
			// An error was expected: it only has to contain the expected text.
			if !strings.Contains(cfgErr.Error(), tc.expectedCfgErr) {
				t.Fatalf("%s expected error '%s' got error '%s'", tc.name, tc.expectedCfgErr, cfgErr.Error())
			}
			log.Debugf("got expected error : %s", cfgErr)
		}
	}
}
// TestOneShotAcquisition exercises CloudwatchSource.OneShotAcquisition on
// sources configured through a DSN.
//
// Each table entry may prepare groups/streams/events in pre, act while the
// acquisition runs in run, and clean up in post. Events emitted on the output
// channel are collected and compared against expectedResLen (skipped when set
// to -1) and, when non-empty, against expectedResMessages in order.
func TestOneShotAcquisition(t *testing.T) {
	log.SetLevel(log.DebugLevel)
	tests := []struct {
		dsn                 string
		expectedCfgErr      string
		expectedStartErr    string
		name                string
		pre                 func(*CloudwatchSource)
		run                 func(*CloudwatchSource)
		post                func(*CloudwatchSource)
		expectedResLen      int
		expectedResMessages []string
	}{
		//stream with no data
		{
			name: "empty_stream",
			dsn:  "cloudwatch://test_log_group1:test_stream?backlog=1h",
			pre: func(cw *CloudwatchSource) {
				// Set-up errors were ignored here; keep them non-fatal but
				// make them visible in the test output.
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Logf("CreateLogGroup failed (ignored) : %s", err)
				}
				if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Logf("CreateLogStream failed (ignored) : %s", err)
				}
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Logf("DeleteLogGroup failed (ignored) : %s", err)
				}
			},
			expectedResLen: 0,
		},
		//stream with one event
		{
			name: "get_one_event",
			dsn:  "cloudwatch://test_log_group1:test_stream?backlog=1h",
			pre: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("error while CreateLogGroup : %s", err)
				}
				if _, err := cw.cwClient.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
				}); err != nil {
					t.Fatalf("error while CreateLogStream : %s", err)
				}
				//this one is 2h old, i.e. older than the 1h backlog of the dsn
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_1"),
							Timestamp: aws.Int64(time.Now().UTC().Add(-(2 * time.Hour)).UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
				//this one can be read
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_2"),
							Timestamp: aws.Int64(time.Now().UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
				//this one is 3h old, also older than the 1h backlog
				if _, err := cw.cwClient.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
					LogGroupName:  aws.String("test_log_group1"),
					LogStreamName: aws.String("test_stream"),
					LogEvents: []*cloudwatchlogs.InputLogEvent{
						&cloudwatchlogs.InputLogEvent{
							Message:   aws.String("test_message_3"),
							Timestamp: aws.Int64(time.Now().UTC().Add(-(3 * time.Hour)).UTC().Unix() * 1000),
						},
					},
				}); err != nil {
					t.Fatalf("failed to put logs : %s", err)
				}
			},
			post: func(cw *CloudwatchSource) {
				if _, err := cw.cwClient.DeleteLogGroup(&cloudwatchlogs.DeleteLogGroupInput{
					LogGroupName: aws.String("test_log_group1"),
				}); err != nil {
					t.Fatalf("failed to delete : %s", err)
				}
			},
			expectedResLen:      1,
			expectedResMessages: []string{"test_message_2"},
		},
	}

	for _, test := range tests {
		dbgLogger := log.New().WithField("test", test.name)
		dbgLogger.Logger.SetLevel(log.DebugLevel)
		dbgLogger.Infof("starting test")

		cw := CloudwatchSource{}
		err := cw.ConfigureByDSN(test.dsn, map[string]string{"type": "test"}, dbgLogger)
		switch {
		case err != nil && test.expectedCfgErr != "":
			if !strings.Contains(err.Error(), test.expectedCfgErr) {
				t.Fatalf("%s expected error '%s' got error '%s'", test.name, test.expectedCfgErr, err.Error())
			}
			log.Debugf("got expected error : %s", err)
			continue
		case err != nil:
			t.Fatalf("%s unexpected error : %s", test.name, err)
		case test.expectedCfgErr != "":
			t.Fatalf("%s expected error '%s', got none", test.name, test.expectedCfgErr)
		}
		dbgLogger.Infof("config done test")

		//run pre-routine : tests use it to set group & streams etc.
		if test.pre != nil {
			test.pre(&cw)
		}

		out := make(chan types.Event)
		tmb := tomb.Tomb{}
		var received []types.Event

		dbgLogger.Infof("running OneShotAcquisition")
		actmb := tomb.Tomb{}
		actmb.Go(func() error {
			err := cw.OneShotAcquisition(out, &actmb)
			dbgLogger.Infof("acquis done")
			// This runs outside the test goroutine: FailNow/Fatalf must not be
			// called here, so failures are reported with Errorf and the
			// goroutine returns normally, letting the tomb die.
			switch {
			case err != nil && test.expectedStartErr == "":
				t.Errorf("%s unexpected error '%s'", test.name, err)
			case err != nil && !strings.Contains(err.Error(), test.expectedStartErr):
				t.Errorf("%s expected error '%s' got '%s'", test.name, test.expectedStartErr, err.Error())
			case err == nil && test.expectedStartErr != "":
				t.Errorf("%s expected error '%s' got none", test.name, test.expectedStartErr)
			}
			return nil
		})

		//let's empty output chan
		tmb.Go(func() error {
			for {
				select {
				case in := <-out:
					log.Debugf("received event %+v", in)
					received = append(received, in)
				case <-tmb.Dying():
					log.Debugf("pumper died")
					return nil
				}
			}
		})

		if test.run != nil {
			test.run(&cw)
		} else {
			dbgLogger.Warning("no run code")
		}

		time.Sleep(5 * time.Second)
		dbgLogger.Infof("killing collector")
		tmb.Kill(nil)
		<-tmb.Dead()
		dbgLogger.Infof("killing datasource")
		actmb.Kill(nil)
		dbgLogger.Infof("waiting datasource death")
		<-actmb.Dead()

		// Clean up before asserting: a failed expectation must not leave the
		// log group behind for the following cases.
		if test.post != nil {
			test.post(&cw)
		}

		//check results
		if test.expectedResLen != -1 {
			if test.expectedResLen != len(received) {
				t.Fatalf("%s : expected %d results got %d -> %v", test.name, test.expectedResLen, len(received), received)
			}
			dbgLogger.Debugf("got %d expected messages", len(received))
		}
		if len(test.expectedResMessages) != 0 {
			res := test.expectedResMessages
			for idx, v := range received {
				if len(res) == 0 {
					t.Fatalf("result %d/%d : received '%s', didn't expect anything (recvd:%d, expected:%d)", idx, len(received), v.Line.Raw, len(received), len(test.expectedResMessages))
				}
				if res[0] != v.Line.Raw {
					t.Fatalf("result %d/%d : expected '%s', received '%s' (recvd:%d, expected:%d)", idx, len(received), res[0], v.Line.Raw, len(received), len(test.expectedResMessages))
				}
				dbgLogger.Debugf("got message '%s'", res[0])
				res = res[1:]
			}
			if len(res) != 0 {
				t.Fatalf("leftover unmatched results : %v", res)
			}
		}
	}
}