// Package cloudwatchacquisition reads log events from AWS CloudWatch Logs groups and streams.
package cloudwatchacquisition
import (
	"context"
	"fmt"
	"net/url"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"gopkg.in/tomb.v2"
	"gopkg.in/yaml.v2"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
	leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
	"github.com/crowdsecurity/crowdsec/pkg/parser"
	"github.com/crowdsecurity/crowdsec/pkg/types"
)
// openedStreams tracks the number of streams currently being monitored, per log group.
var openedStreams = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "cs_cloudwatch_openstreams_total",
		Help: "Number of opened stream within group.",
	},
	[]string{"group"},
)

// streamIndexMutex guards concurrent access to CloudwatchSource.streamIndexes.
var streamIndexMutex = sync.Mutex{}

// linesRead counts events read, labeled by group and stream.
var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_cloudwatch_stream_hits_total",
		Help: "Number of event read from stream.",
	},
	[]string{"group", "stream"},
)
// CloudwatchSource is the runtime instance keeping track of N streams within 1 cloudwatch group
type CloudwatchSource struct {
	Config CloudwatchSourceConfiguration
	/*runtime stuff*/
	logger           *log.Entry
	t                *tomb.Tomb                     // lifecycle of the whole datasource
	cwClient         *cloudwatchlogs.CloudWatchLogs // AWS client, created by newClient
	monitoredStreams []*LogStreamTailConfig         // streams currently tailed by a goroutine each
	streamIndexes    map[string]string              // last forward token per "group+stream"; guarded by streamIndexMutex
}
// CloudwatchSourceConfiguration allows user to define one or more streams to monitor within a cloudwatch log group
type CloudwatchSourceConfiguration struct {
	configuration.DataSourceCommonCfg `yaml:",inline"`
	GroupName                         string         `yaml:"group_name"`                             //the group name to be monitored
	StreamRegexp                      *string        `yaml:"stream_regexp,omitempty"`                //allow to filter specific streams
	StreamName                        *string        `yaml:"stream_name,omitempty"`                  //monitor only this exact stream (StreamRegexp takes precedence in Configure)
	StartTime, EndTime                *time.Time     `yaml:"-"`                                      //time window, only set via DSN (cat mode)
	DescribeLogStreamsLimit           *int64         `yaml:"describelogstreams_limit,omitempty"`     //batch size for DescribeLogStreamsPagesWithContext
	GetLogEventsPagesLimit            *int64         `yaml:"getlogeventspages_limit,omitempty"`      //batch size for GetLogEventsPagesWithContext
	PollNewStreamInterval             *time.Duration `yaml:"poll_new_stream_interval,omitempty"`     //frequency at which we poll for new streams within the log group
	MaxStreamAge                      *time.Duration `yaml:"max_stream_age,omitempty"`               //monitor only streams that have been updated within $duration
	PollStreamInterval                *time.Duration `yaml:"poll_stream_interval,omitempty"`         //frequency at which we poll each stream
	StreamReadTimeout                 *time.Duration `yaml:"stream_read_timeout,omitempty"`          //stop monitoring streams that haven't been updated within $duration, might be reopened later tho
	AwsApiCallTimeout                 *time.Duration `yaml:"aws_api_timeout,omitempty"`              //NOTE(review): defaulted and logged but not visibly used in this file — confirm usage elsewhere
	AwsProfile                        *string        `yaml:"aws_profile,omitempty"`                  //named AWS profile passed to session.Options
	PrependCloudwatchTimestamp        *bool          `yaml:"prepend_cloudwatch_timestamp,omitempty"` //prepend the cloudwatch event timestamp to each line
	AwsConfigDir                      *string        `yaml:"aws_config_dir,omitempty"`               //directory holding the aws 'config' and 'credentials' files
	AwsRegion                         *string        `yaml:"aws_region,omitempty"`                   //mandatory when aws_config_dir is not set
}
// LogStreamTailConfig is the configuration for one given stream within one group
type LogStreamTailConfig struct {
	GroupName                  string
	StreamName                 string
	GetLogEventsPagesLimit     int64
	PollStreamInterval         time.Duration
	StreamReadTimeout          time.Duration
	PrependCloudwatchTimestamp *bool
	Labels                     map[string]string
	logger                     *log.Entry
	ExpectMode                 int       // leaky.LIVE or leaky.TIMEMACHINE
	t                          tomb.Tomb // per-stream lifecycle, killed/waited by LogStreamManager
	StartTime, EndTime         time.Time //only used for CatMode
}
// Package-level defaults: most are applied by UnmarshalConfig for any optional
// setting the user did not provide; def_PollDeadStreamInterval is used directly
// by LogStreamManager.
var (
	def_DescribeLogStreamsLimit = int64(50)
	def_PollNewStreamInterval   = 10 * time.Second
	def_MaxStreamAge            = 5 * time.Minute
	def_PollStreamInterval      = 10 * time.Second
	def_AwsApiCallTimeout       = 10 * time.Second
	def_StreamReadTimeout       = 10 * time.Minute
	def_PollDeadStreamInterval  = 10 * time.Second
	def_GetLogEventsPagesLimit  = int64(1000)
	def_AwsConfigDir            = ""
)
func ( cw * CloudwatchSource ) UnmarshalConfig ( yamlConfig [ ] byte ) error {
cw . Config = CloudwatchSourceConfiguration { }
if err := yaml . UnmarshalStrict ( yamlConfig , & cw . Config ) ; err != nil {
return fmt . Errorf ( "cannot parse CloudwatchSource configuration: %w" , err )
2021-06-11 07:53:53 +00:00
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if len ( cw . Config . GroupName ) == 0 {
return fmt . Errorf ( "group_name is mandatory for CloudwatchSource" )
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . Mode == "" {
cw . Config . Mode = configuration . TAIL_MODE
}
if cw . Config . DescribeLogStreamsLimit == nil {
cw . Config . DescribeLogStreamsLimit = & def_DescribeLogStreamsLimit
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . PollNewStreamInterval == nil {
cw . Config . PollNewStreamInterval = & def_PollNewStreamInterval
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . MaxStreamAge == nil {
cw . Config . MaxStreamAge = & def_MaxStreamAge
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . PollStreamInterval == nil {
cw . Config . PollStreamInterval = & def_PollStreamInterval
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . StreamReadTimeout == nil {
cw . Config . StreamReadTimeout = & def_StreamReadTimeout
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . GetLogEventsPagesLimit == nil {
cw . Config . GetLogEventsPagesLimit = & def_GetLogEventsPagesLimit
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . AwsApiCallTimeout == nil {
cw . Config . AwsApiCallTimeout = & def_AwsApiCallTimeout
}
2022-11-30 16:36:56 +00:00
2021-06-11 07:53:53 +00:00
if cw . Config . AwsConfigDir == nil {
cw . Config . AwsConfigDir = & def_AwsConfigDir
}
2022-11-30 16:36:56 +00:00
return nil
}
func ( cw * CloudwatchSource ) Configure ( yamlConfig [ ] byte , logger * log . Entry ) error {
err := cw . UnmarshalConfig ( yamlConfig )
if err != nil {
return err
}
cw . logger = logger . WithField ( "group" , cw . Config . GroupName )
cw . logger . Debugf ( "Starting configuration for Cloudwatch group %s" , cw . Config . GroupName )
cw . logger . Tracef ( "describelogstreams_limit set to %d" , * cw . Config . DescribeLogStreamsLimit )
cw . logger . Tracef ( "poll_new_stream_interval set to %v" , * cw . Config . PollNewStreamInterval )
cw . logger . Tracef ( "max_stream_age set to %v" , * cw . Config . MaxStreamAge )
cw . logger . Tracef ( "poll_stream_interval set to %v" , * cw . Config . PollStreamInterval )
cw . logger . Tracef ( "stream_read_timeout set to %v" , * cw . Config . StreamReadTimeout )
cw . logger . Tracef ( "getlogeventspages_limit set to %v" , * cw . Config . GetLogEventsPagesLimit )
cw . logger . Tracef ( "aws_api_timeout set to %v" , * cw . Config . AwsApiCallTimeout )
if * cw . Config . MaxStreamAge > * cw . Config . StreamReadTimeout {
cw . logger . Warningf ( "max_stream_age > stream_read_timeout, stream might keep being opened/closed" )
}
cw . logger . Tracef ( "aws_config_dir set to %s" , * cw . Config . AwsConfigDir )
2021-11-02 09:25:35 +00:00
if * cw . Config . AwsConfigDir != "" {
_ , err := os . Stat ( * cw . Config . AwsConfigDir )
if err != nil {
2022-11-30 16:36:56 +00:00
cw . logger . Errorf ( "can't read aws_config_dir '%s' got err %s" , * cw . Config . AwsConfigDir , err )
2021-11-02 09:25:35 +00:00
return fmt . Errorf ( "can't read aws_config_dir %s got err %s " , * cw . Config . AwsConfigDir , err )
}
os . Setenv ( "AWS_SDK_LOAD_CONFIG" , "1" )
//as aws sdk relies on $HOME, let's allow the user to override it :)
os . Setenv ( "AWS_CONFIG_FILE" , fmt . Sprintf ( "%s/config" , * cw . Config . AwsConfigDir ) )
os . Setenv ( "AWS_SHARED_CREDENTIALS_FILE" , fmt . Sprintf ( "%s/credentials" , * cw . Config . AwsConfigDir ) )
} else {
if cw . Config . AwsRegion == nil {
2022-11-30 16:36:56 +00:00
cw . logger . Errorf ( "aws_region is not specified, specify it or aws_config_dir" )
2021-11-02 09:25:35 +00:00
return fmt . Errorf ( "aws_region is not specified, specify it or aws_config_dir" )
}
os . Setenv ( "AWS_REGION" , * cw . Config . AwsRegion )
2021-06-11 07:53:53 +00:00
}
2021-11-02 09:25:35 +00:00
2021-06-11 07:53:53 +00:00
if err := cw . newClient ( ) ; err != nil {
return err
}
cw . streamIndexes = make ( map [ string ] string )
2022-11-30 16:36:56 +00:00
targetStream := "*"
2021-06-11 07:53:53 +00:00
if cw . Config . StreamRegexp != nil {
if _ , err := regexp . Compile ( * cw . Config . StreamRegexp ) ; err != nil {
return errors . Wrapf ( err , "error while compiling regexp '%s'" , * cw . Config . StreamRegexp )
}
targetStream = * cw . Config . StreamRegexp
} else if cw . Config . StreamName != nil {
targetStream = * cw . Config . StreamName
}
2022-11-30 16:36:56 +00:00
cw . logger . Infof ( "Adding cloudwatch group '%s' (stream:%s) to datasources" , cw . Config . GroupName , targetStream )
2021-06-11 07:53:53 +00:00
return nil
}
// newClient creates the AWS session (optionally with a named profile) and the
// cloudwatchlogs client. The AWS_ENDPOINT_FORCE environment variable overrides
// the endpoint (used for testing, per the debug message). The duplicated
// session construction of the previous version is folded into one Options value.
func (cw *CloudwatchSource) newClient() error {
	opts := session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}
	if cw.Config.AwsProfile != nil {
		opts.Profile = *cw.Config.AwsProfile
	}
	// session.Must panics on error, so sess should never be nil here;
	// the check is kept as a defensive guard to preserve the contract.
	sess := session.Must(session.NewSessionWithOptions(opts))
	if sess == nil {
		return fmt.Errorf("failed to create aws session")
	}
	if v := os.Getenv("AWS_ENDPOINT_FORCE"); v != "" {
		cw.logger.Debugf("[testing] overloading endpoint with %s", v)
		cw.cwClient = cloudwatchlogs.New(sess, aws.NewConfig().WithEndpoint(v))
	} else {
		cw.cwClient = cloudwatchlogs.New(sess)
	}
	if cw.cwClient == nil {
		return fmt.Errorf("failed to create cloudwatch client")
	}
	return nil
}
// StreamingAcquisition starts the stream manager goroutine and then blocks,
// watching the log group for new streams to monitor; events flow to out.
func (cw *CloudwatchSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	cw.t = t
	streamChan := make(chan LogStreamTailConfig)
	t.Go(func() error {
		return cw.LogStreamManager(streamChan, out)
	})
	return cw.WatchLogGroupForStreams(streamChan)
}
// GetMetrics returns the prometheus collectors exposed by this datasource.
func (cw *CloudwatchSource) GetMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead, openedStreams}
}

// GetAggregMetrics returns the aggregated prometheus collectors (same set as GetMetrics).
func (cw *CloudwatchSource) GetAggregMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead, openedStreams}
}

// GetMode returns the configured acquisition mode (TAIL_MODE or CAT_MODE).
func (cw *CloudwatchSource) GetMode() string {
	return cw.Config.Mode
}

// GetName returns the datasource identifier used in DSNs and configuration.
func (cw *CloudwatchSource) GetName() string {
	return "cloudwatch"
}

// CanRun reports whether the datasource can run; cloudwatch has no prerequisites.
func (cw *CloudwatchSource) CanRun() error {
	return nil
}

// Dump exposes the source itself for debugging/introspection.
func (cw *CloudwatchSource) Dump() interface{} {
	return cw
}
func ( cw * CloudwatchSource ) WatchLogGroupForStreams ( out chan LogStreamTailConfig ) error {
cw . logger . Debugf ( "Starting to watch group (interval:%s)" , cw . Config . PollNewStreamInterval )
ticker := time . NewTicker ( * cw . Config . PollNewStreamInterval )
var startFrom * string
for {
select {
case <- cw . t . Dying ( ) :
cw . logger . Infof ( "stopping group watch" )
return nil
case <- ticker . C :
hasMoreStreams := true
startFrom = nil
for hasMoreStreams {
cw . logger . Tracef ( "doing the call to DescribeLogStreamsPagesWithContext" )
ctx := context . Background ( )
//there can be a lot of streams in a group, and we're only interested in those recently written to, so we sort by LastEventTime
err := cw . cwClient . DescribeLogStreamsPagesWithContext (
ctx ,
& cloudwatchlogs . DescribeLogStreamsInput {
LogGroupName : aws . String ( cw . Config . GroupName ) ,
Descending : aws . Bool ( true ) ,
NextToken : startFrom ,
OrderBy : aws . String ( cloudwatchlogs . OrderByLastEventTime ) ,
Limit : cw . Config . DescribeLogStreamsLimit ,
} ,
func ( page * cloudwatchlogs . DescribeLogStreamsOutput , lastPage bool ) bool {
2022-10-26 13:11:37 +00:00
cw . logger . Tracef ( "in helper of DescribeLogStreamsPagesWithContext" )
2021-06-11 07:53:53 +00:00
for _ , event := range page . LogStreams {
startFrom = page . NextToken
//we check if the stream has been written to recently enough to be monitored
if event . LastIngestionTime != nil {
//aws uses millisecond since the epoch
oldest := time . Now ( ) . UTC ( ) . Add ( - * cw . Config . MaxStreamAge )
//TBD : verify that this is correct : Unix 2nd arg expects Nanoseconds, and have a code that is more explicit.
LastIngestionTime := time . Unix ( 0 , * event . LastIngestionTime * int64 ( time . Millisecond ) )
if LastIngestionTime . Before ( oldest ) {
2022-01-19 13:56:05 +00:00
cw . logger . Tracef ( "stop iteration, %s reached oldest age, stop (%s < %s)" , * event . LogStreamName , LastIngestionTime , time . Now ( ) . UTC ( ) . Add ( - * cw . Config . MaxStreamAge ) )
2021-06-11 07:53:53 +00:00
hasMoreStreams = false
return false
}
cw . logger . Tracef ( "stream %s is elligible for monitoring" , * event . LogStreamName )
2022-11-07 09:36:50 +00:00
//the stream has been updated recently, check if we should monitor it
2022-05-17 10:14:59 +00:00
var expectMode int
if ! cw . Config . UseTimeMachine {
expectMode = leaky . LIVE
} else {
expectMode = leaky . TIMEMACHINE
}
2021-06-11 07:53:53 +00:00
monitorStream := LogStreamTailConfig {
GroupName : cw . Config . GroupName ,
StreamName : * event . LogStreamName ,
GetLogEventsPagesLimit : * cw . Config . GetLogEventsPagesLimit ,
PollStreamInterval : * cw . Config . PollStreamInterval ,
StreamReadTimeout : * cw . Config . StreamReadTimeout ,
PrependCloudwatchTimestamp : cw . Config . PrependCloudwatchTimestamp ,
2022-05-17 10:14:59 +00:00
ExpectMode : expectMode ,
2021-06-11 07:53:53 +00:00
Labels : cw . Config . Labels ,
}
out <- monitorStream
}
}
if lastPage {
cw . logger . Tracef ( "reached last page" )
hasMoreStreams = false
}
return true
} ,
)
if err != nil {
newerr := errors . Wrapf ( err , "while describing group %s" , cw . Config . GroupName )
return newerr
}
cw . logger . Tracef ( "after DescribeLogStreamsPagesWithContext" )
}
}
}
}
2022-11-07 09:36:50 +00:00
//LogStreamManager receives the potential streams to monitor, and starts a go routine when needed
2021-06-11 07:53:53 +00:00
func ( cw * CloudwatchSource ) LogStreamManager ( in chan LogStreamTailConfig , outChan chan types . Event ) error {
cw . logger . Debugf ( "starting to monitor streams for %s" , cw . Config . GroupName )
pollDeadStreamInterval := time . NewTicker ( def_PollDeadStreamInterval )
for {
select {
2022-06-16 12:41:54 +00:00
case newStream := <- in : //nolint:govet // copylocks won't matter if the tomb is not initialized
2021-06-11 07:53:53 +00:00
shouldCreate := true
cw . logger . Tracef ( "received new streams to monitor : %s/%s" , newStream . GroupName , newStream . StreamName )
if cw . Config . StreamName != nil && newStream . StreamName != * cw . Config . StreamName {
cw . logger . Tracef ( "stream %s != %s" , newStream . StreamName , * cw . Config . StreamName )
continue
}
if cw . Config . StreamRegexp != nil {
2021-09-23 11:52:05 +00:00
match , err := regexp . Match ( * cw . Config . StreamRegexp , [ ] byte ( newStream . StreamName ) )
2021-06-11 07:53:53 +00:00
if err != nil {
cw . logger . Warningf ( "invalid regexp : %s" , err )
2022-11-07 09:36:50 +00:00
} else if ! match {
cw . logger . Tracef ( "stream %s doesn't match %s" , newStream . StreamName , * cw . Config . StreamRegexp )
continue
2021-06-11 07:53:53 +00:00
}
}
for idx , stream := range cw . monitoredStreams {
if newStream . GroupName == stream . GroupName && newStream . StreamName == stream . StreamName {
//stream exists, but is dead, remove it from list
if ! stream . t . Alive ( ) {
cw . logger . Debugf ( "stream %s already exists, but is dead" , newStream . StreamName )
cw . monitoredStreams = append ( cw . monitoredStreams [ : idx ] , cw . monitoredStreams [ idx + 1 : ] ... )
openedStreams . With ( prometheus . Labels { "group" : newStream . GroupName } ) . Dec ( )
break
}
shouldCreate = false
break
}
}
//let's start watching this stream
if shouldCreate {
openedStreams . With ( prometheus . Labels { "group" : newStream . GroupName } ) . Inc ( )
newStream . t = tomb . Tomb { }
newStream . logger = cw . logger . WithFields ( log . Fields { "stream" : newStream . StreamName } )
cw . logger . Debugf ( "starting tail of stream %s" , newStream . StreamName )
newStream . t . Go ( func ( ) error {
return cw . TailLogStream ( & newStream , outChan )
} )
cw . monitoredStreams = append ( cw . monitoredStreams , & newStream )
}
case <- pollDeadStreamInterval . C :
2021-10-22 08:35:05 +00:00
newMonitoredStreams := cw . monitoredStreams [ : 0 ]
2021-06-11 07:53:53 +00:00
for idx , stream := range cw . monitoredStreams {
if ! cw . monitoredStreams [ idx ] . t . Alive ( ) {
cw . logger . Debugf ( "remove dead stream %s" , stream . StreamName )
openedStreams . With ( prometheus . Labels { "group" : cw . monitoredStreams [ idx ] . GroupName } ) . Dec ( )
2021-10-22 08:35:05 +00:00
} else {
newMonitoredStreams = append ( newMonitoredStreams , stream )
2021-06-11 07:53:53 +00:00
}
}
2021-10-22 08:35:05 +00:00
cw . monitoredStreams = newMonitoredStreams
2021-06-11 07:53:53 +00:00
case <- cw . t . Dying ( ) :
cw . logger . Infof ( "LogStreamManager for %s is dying, %d alive streams" , cw . Config . GroupName , len ( cw . monitoredStreams ) )
for idx , stream := range cw . monitoredStreams {
if cw . monitoredStreams [ idx ] . t . Alive ( ) {
cw . logger . Debugf ( "killing stream %s" , stream . StreamName )
cw . monitoredStreams [ idx ] . t . Kill ( nil )
if err := cw . monitoredStreams [ idx ] . t . Wait ( ) ; err != nil {
cw . logger . Debugf ( "error while waiting for death of %s : %s" , stream . StreamName , err )
}
}
}
2021-10-22 08:35:05 +00:00
cw . monitoredStreams = nil
2021-06-11 07:53:53 +00:00
cw . logger . Debugf ( "routine cleanup done, return" )
return nil
}
}
}
func ( cw * CloudwatchSource ) TailLogStream ( cfg * LogStreamTailConfig , outChan chan types . Event ) error {
var startFrom * string
2022-11-07 09:36:50 +00:00
lastReadMessage := time . Now ( ) . UTC ( )
2021-06-11 07:53:53 +00:00
ticker := time . NewTicker ( cfg . PollStreamInterval )
//resume at existing index if we already had
2021-10-22 08:35:05 +00:00
streamIndexMutex . Lock ( )
v := cw . streamIndexes [ cfg . GroupName + "+" + cfg . StreamName ]
streamIndexMutex . Unlock ( )
if v != "" {
2021-06-11 07:53:53 +00:00
cfg . logger . Debugf ( "restarting on index %s" , v )
startFrom = & v
}
/ * during first run , we want to avoid reading any message , but just get a token .
if we don ' t , we might end up sending the same item several times . hence the ' startup ' hack * /
for {
select {
case <- ticker . C :
cfg . logger . Tracef ( "entering loop" )
hasMorePages := true
for hasMorePages {
/*for the first call, we only consume the last item*/
cfg . logger . Tracef ( "calling GetLogEventsPagesWithContext" )
ctx := context . Background ( )
err := cw . cwClient . GetLogEventsPagesWithContext ( ctx ,
& cloudwatchlogs . GetLogEventsInput {
2021-10-22 08:35:05 +00:00
Limit : aws . Int64 ( cfg . GetLogEventsPagesLimit ) ,
2021-06-11 07:53:53 +00:00
LogGroupName : aws . String ( cfg . GroupName ) ,
LogStreamName : aws . String ( cfg . StreamName ) ,
NextToken : startFrom ,
2021-10-22 08:35:05 +00:00
StartFromHead : aws . Bool ( true ) ,
2021-06-11 07:53:53 +00:00
} ,
func ( page * cloudwatchlogs . GetLogEventsOutput , lastPage bool ) bool {
cfg . logger . Tracef ( "%d results, last:%t" , len ( page . Events ) , lastPage )
startFrom = page . NextForwardToken
if page . NextForwardToken != nil {
2021-10-22 08:35:05 +00:00
streamIndexMutex . Lock ( )
2021-06-11 07:53:53 +00:00
cw . streamIndexes [ cfg . GroupName + "+" + cfg . StreamName ] = * page . NextForwardToken
2021-10-22 08:35:05 +00:00
streamIndexMutex . Unlock ( )
2021-06-11 07:53:53 +00:00
}
if lastPage { /*wait another ticker to check on new log availability*/
cfg . logger . Tracef ( "last page" )
hasMorePages = false
}
if len ( page . Events ) > 0 {
2022-01-19 13:56:05 +00:00
lastReadMessage = time . Now ( ) . UTC ( )
2021-06-11 07:53:53 +00:00
}
for _ , event := range page . Events {
evt , err := cwLogToEvent ( event , cfg )
if err != nil {
cfg . logger . Warningf ( "cwLogToEvent error, discarded event : %s" , err )
} else {
cfg . logger . Debugf ( "pushing message : %s" , evt . Line . Raw )
linesRead . With ( prometheus . Labels { "group" : cfg . GroupName , "stream" : cfg . StreamName } ) . Inc ( )
outChan <- evt
}
}
return true
} ,
)
if err != nil {
newerr := errors . Wrapf ( err , "while reading %s/%s" , cfg . GroupName , cfg . StreamName )
cfg . logger . Warningf ( "err : %s" , newerr )
return newerr
}
cfg . logger . Tracef ( "done reading GetLogEventsPagesWithContext" )
if time . Since ( lastReadMessage ) > cfg . StreamReadTimeout {
cfg . logger . Infof ( "%s/%s reached timeout (%s) (last message was %s)" , cfg . GroupName , cfg . StreamName , time . Since ( lastReadMessage ) ,
lastReadMessage )
return nil
}
}
case <- cfg . t . Dying ( ) :
cfg . logger . Infof ( "logstream tail stopping" )
return fmt . Errorf ( "killed" )
}
}
}
2021-11-17 09:08:46 +00:00
func ( cw * CloudwatchSource ) ConfigureByDSN ( dsn string , labels map [ string ] string , logger * log . Entry ) error {
2021-06-11 07:53:53 +00:00
cw . logger = logger
dsn = strings . TrimPrefix ( dsn , cw . GetName ( ) + "://" )
args := strings . Split ( dsn , "?" )
if len ( args ) != 2 {
return fmt . Errorf ( "query is mandatory (at least start_date and end_date or backlog)" )
}
frags := strings . Split ( args [ 0 ] , ":" )
if len ( frags ) != 2 {
return fmt . Errorf ( "cloudwatch path must contain group and stream : /my/group/name:stream/name" )
}
cw . Config . GroupName = frags [ 0 ]
cw . Config . StreamName = & frags [ 1 ]
2021-11-17 09:08:46 +00:00
cw . Config . Labels = labels
2021-06-11 07:53:53 +00:00
u , err := url . ParseQuery ( args [ 1 ] )
if err != nil {
return errors . Wrapf ( err , "while parsing %s" , dsn )
}
for k , v := range u {
switch k {
case "log_level" :
if len ( v ) != 1 {
return fmt . Errorf ( "expected zero or one value for 'log_level'" )
}
lvl , err := log . ParseLevel ( v [ 0 ] )
if err != nil {
return errors . Wrapf ( err , "unknown level %s" , v [ 0 ] )
}
cw . logger . Logger . SetLevel ( lvl )
case "profile" :
if len ( v ) != 1 {
return fmt . Errorf ( "expected zero or one value for 'profile'" )
}
awsprof := v [ 0 ]
cw . Config . AwsProfile = & awsprof
cw . logger . Debugf ( "profile set to '%s'" , * cw . Config . AwsProfile )
case "start_date" :
if len ( v ) != 1 {
return fmt . Errorf ( "expected zero or one argument for 'start_date'" )
}
//let's reuse our parser helper so that a ton of date formats are supported
strdate , startDate := parser . GenDateParse ( v [ 0 ] )
cw . logger . Debugf ( "parsed '%s' as '%s'" , v [ 0 ] , strdate )
cw . Config . StartTime = & startDate
case "end_date" :
if len ( v ) != 1 {
return fmt . Errorf ( "expected zero or one argument for 'end_date'" )
}
//let's reuse our parser helper so that a ton of date formats are supported
strdate , endDate := parser . GenDateParse ( v [ 0 ] )
cw . logger . Debugf ( "parsed '%s' as '%s'" , v [ 0 ] , strdate )
cw . Config . EndTime = & endDate
case "backlog" :
if len ( v ) != 1 {
return fmt . Errorf ( "expected zero or one argument for 'backlog'" )
}
//let's reuse our parser helper so that a ton of date formats are supported
duration , err := time . ParseDuration ( v [ 0 ] )
if err != nil {
return errors . Wrapf ( err , "unable to parse '%s' as duration" , v [ 0 ] )
}
cw . logger . Debugf ( "parsed '%s' as '%s'" , v [ 0 ] , duration )
start := time . Now ( ) . UTC ( ) . Add ( - duration )
cw . Config . StartTime = & start
end := time . Now ( ) . UTC ( )
cw . Config . EndTime = & end
default :
return fmt . Errorf ( "unexpected argument %s" , k )
}
}
cw . logger . Tracef ( "host=%s" , cw . Config . GroupName )
cw . logger . Tracef ( "stream=%s" , * cw . Config . StreamName )
cw . Config . GetLogEventsPagesLimit = & def_GetLogEventsPagesLimit
if err := cw . newClient ( ) ; err != nil {
return err
}
if cw . Config . StreamName == nil || cw . Config . GroupName == "" {
return fmt . Errorf ( "missing stream or group name" )
}
if cw . Config . StartTime == nil || cw . Config . EndTime == nil {
return fmt . Errorf ( "start_date and end_date or backlog are mandatory in one-shot mode" )
}
cw . Config . Mode = configuration . CAT_MODE
cw . streamIndexes = make ( map [ string ] string )
cw . t = & tomb . Tomb { }
return nil
}
// OneShotAcquisition reads the single configured stream between StartTime
// and EndTime (as set by ConfigureByDSN) and pushes its events to out.
func (cw *CloudwatchSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
	streamLogger := cw.logger.WithFields(log.Fields{
		"group":  cw.Config.GroupName,
		"stream": *cw.Config.StreamName,
	})
	cfg := LogStreamTailConfig{
		GroupName:              cw.Config.GroupName,
		StreamName:             *cw.Config.StreamName,
		StartTime:              *cw.Config.StartTime,
		EndTime:                *cw.Config.EndTime,
		GetLogEventsPagesLimit: *cw.Config.GetLogEventsPagesLimit,
		logger:                 streamLogger,
		Labels:                 cw.Config.Labels,
		ExpectMode:             leaky.TIMEMACHINE,
	}
	return cw.CatLogStream(&cfg, out)
}
// CatLogStream reads all events of one stream between cfg.StartTime and
// cfg.EndTime and pushes them to outChan, following forward tokens until the
// token stops advancing. Fixes vs. previous version: an event whose
// conversion failed was still pushed to outChan (zero-value event) — it is
// now skipped, matching TailLogStream's behavior; a nil NextForwardToken no
// longer panics; "handker" typo in the trace message.
func (cw *CloudwatchSource) CatLogStream(cfg *LogStreamTailConfig, outChan chan types.Event) error {
	var startFrom *string
	head := true
	/*the API expects milliseconds since the epoch*/
	startTime := cfg.StartTime.UTC().Unix() * 1000
	endTime := cfg.EndTime.UTC().Unix() * 1000
	hasMoreEvents := true
	for hasMoreEvents {
		select {
		default:
			cfg.logger.Tracef("Calling GetLogEventsPagesWithContext(%s, %s), startTime:%d / endTime:%d",
				cfg.GroupName, cfg.StreamName, startTime, endTime)
			cfg.logger.Tracef("startTime:%s / endTime:%s", cfg.StartTime, cfg.EndTime)
			if startFrom != nil {
				cfg.logger.Tracef("next_token: %s", *startFrom)
			}
			ctx := context.Background()
			err := cw.cwClient.GetLogEventsPagesWithContext(ctx,
				&cloudwatchlogs.GetLogEventsInput{
					Limit:         aws.Int64(10),
					LogGroupName:  aws.String(cfg.GroupName),
					LogStreamName: aws.String(cfg.StreamName),
					StartTime:     aws.Int64(startTime),
					EndTime:       aws.Int64(endTime),
					StartFromHead: &head,
					NextToken:     startFrom,
				},
				func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool {
					cfg.logger.Tracef("in GetLogEventsPagesWithContext handler (%d events) (last:%t)", len(page.Events), lastPage)
					for _, event := range page.Events {
						evt, err := cwLogToEvent(event, cfg)
						if err != nil {
							cfg.logger.Warningf("discard event : %s", err)
							continue
						}
						cfg.logger.Debugf("pushing message : %s", evt.Line.Raw)
						outChan <- evt
					}
					//the end of the stream is reached when the forward token stops advancing
					if page.NextForwardToken == nil || (startFrom != nil && *page.NextForwardToken == *startFrom) {
						cfg.logger.Debugf("reached end of available events")
						hasMoreEvents = false
						return false
					}
					startFrom = page.NextForwardToken
					return true
				},
			)
			if err != nil {
				return errors.Wrapf(err, "while reading logs from %s/%s", cfg.GroupName, cfg.StreamName)
			}
			cfg.logger.Tracef("after GetLogEventsPagesWithContext")
		case <-cw.t.Dying():
			cfg.logger.Warningf("cat stream killed")
			return nil
		}
	}
	cfg.logger.Tracef("CatLogStream out")
	return nil
}
func cwLogToEvent ( log * cloudwatchlogs . OutputLogEvent , cfg * LogStreamTailConfig ) ( types . Event , error ) {
l := types . Line { }
evt := types . Event { }
if log . Message == nil {
return evt , fmt . Errorf ( "nil message" )
}
msg := * log . Message
if cfg . PrependCloudwatchTimestamp != nil && * cfg . PrependCloudwatchTimestamp {
eventTimestamp := time . Unix ( 0 , * log . Timestamp * int64 ( time . Millisecond ) )
msg = eventTimestamp . String ( ) + " " + msg
}
l . Raw = msg
l . Labels = cfg . Labels
2022-01-19 13:56:05 +00:00
l . Time = time . Now ( ) . UTC ( )
2021-06-11 07:53:53 +00:00
l . Src = fmt . Sprintf ( "%s/%s" , cfg . GroupName , cfg . StreamName )
l . Process = true
l . Module = "cloudwatch"
evt . Line = l
evt . Process = true
evt . Type = types . LOG
evt . ExpectMode = cfg . ExpectMode
cfg . logger . Debugf ( "returned event labels : %+v" , evt . Line . Labels )
return evt , nil
}