diff --git a/.github/workflows/go-tests.yml b/.github/workflows/go-tests.yml index cdfc3af01..322bf8f42 100644 --- a/.github/workflows/go-tests.yml +++ b/.github/workflows/go-tests.yml @@ -55,6 +55,56 @@ jobs: --health-interval=10s --health-timeout=5s --health-retries=3 + zoo1: + image: confluentinc/cp-zookeeper:7.1.1 + ports: + - "2181:2181" + env: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_SERVERS: zoo1:2888:3888 + options: >- + --name=zoo1 + --health-cmd "jps -l | grep zookeeper" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + kafka1: + image: crowdsecurity/kafka-ssl + ports: + - "9093:9093" + - "9092:9092" + - "9999:9999" + env: + KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://127.0.0.1:19092,LISTENER_DOCKER_EXTERNAL://127.0.0.1:9092,LISTENER_DOCKER_EXTERNAL_SSL://127.0.0.1:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL_SSL:SSL + KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_JMX_PORT: 9999 + KAFKA_JMX_HOSTNAME: "127.0.0.1" + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" + KAFKA_SSL_KEYSTORE_FILENAME: kafka.kafka1.keystore.jks + KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka1_keystore_creds + KAFKA_SSL_KEY_CREDENTIALS: kafka1_sslkey_creds + KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.kafka1.truststore.jks + KAFKA_SSL_TRUSTSTORE_CREDENTIALS: kafka1_truststore_creds + KAFKA_SSL_ENABLED_PROTOCOLS: TLSv1.2 + KAFKA_SSL_PROTOCOL: TLSv1.2 + KAFKA_SSL_CLIENT_AUTH: none + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + options: >- + --name=kafka1 + --health-cmd "kafka-broker-api-versions --version" + --health-interval 10s + --health-timeout 10s + --health-retries 5 steps: diff --git a/go.mod b/go.mod index 6ce66ac5f..bf84257a9 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/r3labs/diff/v2 v2.14.1 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 - github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942 + github.com/stretchr/testify v1.7.1 golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 google.golang.org/grpc v1.45.0 @@ -126,6 +126,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/klauspost/compress v1.15.7 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-colorable v0.1.12 // indirect @@ -146,12 +147,14 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2-0.20211117181255-693428a734f5 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/segmentio/kafka-go v0.4.32 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c // indirect github.com/tidwall/gjson v1.13.0 // indirect @@ -167,7 +170,7 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220414192740-2d67ff6cf2b4 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 // indirect ) replace golang.org/x/time/rate => github.com/crowdsecurity/crowdsec/pkg/time/rate v0.0.0 diff --git a/go.sum b/go.sum index c2b01714d..135a811f1 100644 --- a/go.sum +++ b/go.sum @@ -513,6 +513,9 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.7 h1:7cgTQxJCU/vy+oP/E3B9RGbQTgbiVzIJWIKOLoAsPok= +github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -632,6 +635,9 @@ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhM github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= +github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -687,6 +693,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sanity-io/litter v1.2.0/go.mod h1:JF6pZUFgu2Q0sBZ+HSV35P8TVPI1TTzEwyu9FXAw2W4= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM= +github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= @@ -721,6 +729,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942 h1:t0lM6y/M5IiUZyvbBTcngso8SZEZICH7is9B6g/obVU= github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c h1:HelZ2kAFadG0La9d+4htN4HzQ68Bm2iM9qKMSMES6xg= github.com/texttheater/golang-levenshtein/levenshtein v0.0.0-20200805054039-cae8b0eaed6c/go.mod h1:JlzghshsemAMDGZLytTFY8C1JQxQPhnatWqNwUXjggo= github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= @@ -749,6 +759,7 @@ github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+ github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -791,6 +802,7 @@ golang.org/x/crypto v0.0.0-20190320223903-b7391e95e576/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -1165,6 +1177,8 @@ gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 h1:dbuHpmKjkDzSOMKAWl10QNlgaZUd3V1q99xc81tt2Kc= +gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 93c956538..286f3cbc9 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -11,6 +11,7 @@ import ( dockeracquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/docker" fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file" journalctlacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/journalctl" + kafkaacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kafka" kinesisacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kinesis" syslogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog" wineventlogacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/wineventlog" @@ -70,6 +71,10 @@ var AcquisitionSources = []struct { name: "wineventlog", iface: func() DataSource { return &wineventlogacquisition.WinEventLogSource{} }, }, + { + name: "kafka", + iface: func() DataSource { return &kafkaacquisition.KafkaSource{} }, + }, } func GetDataSourceIface(dataSourceType string) DataSource { diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go new file mode 100644 index 000000000..147f64f18 --- /dev/null +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -0,0 +1,242 @@ +package kafkaacquisition + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "io/ioutil" + "strconv" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/leakybucket" + "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/segmentio/kafka-go" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + "gopkg.in/yaml.v2" +) + +var ( + dataSourceName = "kafka" +) + +var linesRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cs_kafkasource_hits_total", + Help: "Total lines that were read from topic", + }, + []string{"topic"}) + +type KafkaConfiguration struct { + Brokers []string `yaml:"brokers"` + Topic string `yaml:"topic"` + GroupID string `yaml:"group_id"` + Timeout string `yaml:"timeout"` + TLS *TLSConfig `yaml:"tls"` + configuration.DataSourceCommonCfg `yaml:",inline"` +} + +type TLSConfig struct { + InsecureSkipVerify bool `yaml:"insecure_skip_verify"` + ClientCert string `yaml:"client_cert"` + ClientKey string `yaml:"client_key"` + CaCert string `yaml:"ca_cert"` +} + +type KafkaSource struct { + Config KafkaConfiguration + logger *log.Entry + Reader *kafka.Reader +} + +func (k *KafkaSource) Configure(Config []byte, logger *log.Entry) error { + var err error + + k.Config = KafkaConfiguration{} + k.logger = logger + err = yaml.UnmarshalStrict(Config, &k.Config) + if err != nil { + return errors.Wrapf(err, "cannot parse %s datasource configuration", dataSourceName) + } + if len(k.Config.Brokers) == 0 { + return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName) + } + if k.Config.Topic == "" { + return fmt.Errorf("cannot create a %s reader with am empty topic", dataSourceName) + } + if k.Config.Mode == "" { + k.Config.Mode = configuration.TAIL_MODE + } + dialer, err := k.Config.NewDialer() + if err != nil { + return errors.Wrapf(err, "cannot create %s dialer", dataSourceName) + } + k.Reader, err = k.Config.NewReader(dialer) + if err != nil { + return errors.Wrapf(err, "cannote create %s reader", dataSourceName) + } + if k.Reader == nil { + return fmt.Errorf("cannot create %s reader", dataSourceName) + } + return nil +} + +func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry) error { + return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName) +} + +func (k *KafkaSource) GetMode() string { + return k.Config.Mode +} + +func (k *KafkaSource) GetName() string { + return dataSourceName +} + +func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error { + return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName) +} + +func (k *KafkaSource) CanRun() error { + return nil +} + +func (k *KafkaSource) GetMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (k *KafkaSource) GetAggregMetrics() []prometheus.Collector { + return []prometheus.Collector{linesRead} +} + +func (k *KafkaSource) Dump() interface{} { + return k +} + +func (k *KafkaSource) ReadMessage(out chan types.Event) error { + // Start processing from latest Offset + k.Reader.SetOffsetAt(context.Background(), time.Now()) + for { + m, err := k.Reader.ReadMessage(context.Background()) + if err != nil { + if err == io.EOF { + return nil + } + k.logger.Errorln(errors.Wrapf(err, "while reading %s message", dataSourceName)) + } + l := types.Line{ + Raw: string(m.Value), + Labels: k.Config.Labels, + Time: m.Time.UTC(), + Src: k.Config.Topic, + Process: true, + Module: k.GetName(), + } + linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc() + var evt types.Event + + if !k.Config.UseTimeMachine { + evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.LIVE} + } else { + evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leakybucket.TIMEMACHINE} + } + out <- evt + } +} + +func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error { + t.Go(func() error { + return k.ReadMessage(out) + }) + //nolint //fp + for { + select { + case <-t.Dying(): + k.logger.Infof("%s datasource topic %s stopping", dataSourceName, k.Config.Topic) + if err := k.Reader.Close(); err != nil { + return errors.Wrapf(err, "while closing %s reader on topic '%s'", dataSourceName, k.Config.Topic) + } + return nil + } + } +} + +func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error { + k.logger.Infof("start reader on topic '%s'", k.Config.Topic) + + t.Go(func() error { + defer types.CatchPanic("crowdsec/acquis/kafka/live") + return k.RunReader(out, t) + }) + + return nil +} + +func (kc *KafkaConfiguration) NewTLSConfig() (*tls.Config, error) { + tlsConfig := tls.Config{ + InsecureSkipVerify: kc.TLS.InsecureSkipVerify, + } + + cert, err := tls.LoadX509KeyPair(kc.TLS.ClientCert, kc.TLS.ClientKey) + if err != nil { + return &tlsConfig, err + } + tlsConfig.Certificates = []tls.Certificate{cert} + + caCert, err := ioutil.ReadFile(kc.TLS.CaCert) + if err != nil { + return &tlsConfig, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.RootCAs = caCertPool + + tlsConfig.BuildNameToCertificate() + return &tlsConfig, err +} + +func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) { + dialer := &kafka.Dialer{} + var timeoutDuration time.Duration + timeoutDuration = time.Duration(10) * time.Second + if kc.Timeout != "" { + intTimeout, err := strconv.Atoi(kc.Timeout) + if err != nil { + return dialer, err + } + timeoutDuration = time.Duration(intTimeout) * time.Second + } + dialer = &kafka.Dialer{ + Timeout: timeoutDuration, + DualStack: true, + } + + if kc.TLS != nil { + tlsConfig, err := kc.NewTLSConfig() + if err != nil { + return dialer, err + } + dialer.TLS = tlsConfig + } + return dialer, nil +} + +func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer) (*kafka.Reader, error) { + rConf := kafka.ReaderConfig{ + Brokers: kc.Brokers, + Topic: kc.Topic, + Dialer: dialer, + } + if kc.GroupID != "" { + rConf.GroupID = kc.GroupID + } + if err := rConf.Validate(); err != nil { + return &kafka.Reader{}, errors.Wrapf(err, "while validating reader configuration") + } + return kafka.NewReader(rConf), nil +} diff --git a/pkg/acquisition/modules/kafka/kafka_test.go b/pkg/acquisition/modules/kafka/kafka_test.go new file mode 100644 index 000000000..eb1992d3a --- /dev/null +++ b/pkg/acquisition/modules/kafka/kafka_test.go @@ -0,0 +1,253 @@ +package kafkaacquisition + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/crowdsecurity/crowdsec/pkg/cstest" + "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/segmentio/kafka-go" + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" + "gotest.tools/v3/assert" +) + +func TestConfigure(t *testing.T) { + tests := []struct { + config string + expectedErr string + }{ + { + config: ` +foobar: bla +source: kafka`, + expectedErr: "line 2: field foobar not found in type kafkaacquisition.KafkaConfiguration", + }, + { + config: `source: kafka`, + expectedErr: "cannot create a kafka reader with an empty list of broker addresses", + }, + { + config: ` +source: kafka +brokers: + - bla +timeout: 5`, + expectedErr: "cannot create a kafka reader with am empty topic", + }, + { + config: ` +source: kafka +brokers: + - bla +topic: toto +timeout: aa`, + expectedErr: "cannot create kafka dialer: strconv.Atoi: parsing \"aa\": invalid syntax", + }, + { + config: ` +source: kafka +brokers: + - localhost:9092 +topic: crowdsec`, + expectedErr: "", + }, + } + + subLogger := log.WithFields(log.Fields{ + "type": "kafka", + }) + for _, test := range tests { + k := KafkaSource{} + err := k.Configure([]byte(test.config), subLogger) + cstest.AssertErrorContains(t, err, test.expectedErr) + } +} + +func writeToKafka(w *kafka.Writer, logs []string) { + + for idx, log := range logs { + err := w.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(strconv.Itoa(idx)), + // create an arbitrary message payload for the value + Value: []byte(log), + }) + if err != nil { + panic("could not write message " + err.Error()) + } + } +} + +func createTopic(topic string, broker string) { + conn, err := kafka.Dial("tcp", broker) + if err != nil { + panic(err.Error()) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + panic(err.Error()) + } + var controllerConn *kafka.Conn + controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + panic(err.Error()) + } + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + panic(err.Error()) + } +} + +func TestStreamingAcquisition(t *testing.T) { + tests := []struct { + name string + logs []string + expectedLines int + expectedErr string + }{ + { + name: "valid msgs", + logs: []string{ + "message 1", + "message 2", + "message 3", + }, + expectedLines: 3, + }, + } + + subLogger := log.WithFields(log.Fields{ + "type": "kafka", + }) + + createTopic("crowdsecplaintext", "localhost:9092") + + w := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "crowdsecplaintext", + }) + if w == nil { + log.Fatalf("Unable to setup a kafka producer") + } + + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { + k := KafkaSource{} + err := k.Configure([]byte(` +source: kafka +brokers: + - localhost:9092 +topic: crowdsecplaintext`), subLogger) + if err != nil { + t.Fatalf("could not configure kafka source : %s", err) + } + tomb := tomb.Tomb{} + out := make(chan types.Event) + err = k.StreamingAcquisition(out, &tomb) + cstest.AssertErrorContains(t, err, ts.expectedErr) + + actualLines := 0 + go writeToKafka(w, ts.logs) + READLOOP: + for { + select { + case <-out: + actualLines++ + case <-time.After(2 * time.Second): + break READLOOP + } + } + assert.Equal(t, ts.expectedLines, actualLines) + tomb.Kill(nil) + tomb.Wait() + }) + } + +} + +func TestStreamingAcquisitionWithSSL(t *testing.T) { + tests := []struct { + name string + logs []string + expectedLines int + expectedErr string + }{ + { + name: "valid msgs", + logs: []string{ + "message 1", + "message 2", + }, + expectedLines: 2, + }, + } + + subLogger := log.WithFields(log.Fields{ + "type": "kafka", + }) + + createTopic("crowdsecssl", "localhost:9092") + + w2 := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "crowdsecssl", + }) + if w2 == nil { + log.Fatalf("Unable to setup a kafka producer") + } + + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { + k := KafkaSource{} + err := k.Configure([]byte(` +source: kafka +brokers: + - localhost:9093 +topic: crowdsecssl +tls: + insecure_skip_verify: true + client_cert: ./testdata/kafkaClient.certificate.pem + client_key: ./testdata/kafkaClient.key + ca_cert: ./testdata/snakeoil-ca-1.crt + `), subLogger) + if err != nil { + t.Fatalf("could not configure kafka source : %s", err) + } + tomb := tomb.Tomb{} + out := make(chan types.Event) + err = k.StreamingAcquisition(out, &tomb) + cstest.AssertErrorContains(t, err, ts.expectedErr) + + actualLines := 0 + go writeToKafka(w2, ts.logs) + READLOOP: + for { + select { + case <-out: + actualLines++ + case <-time.After(2 * time.Second): + break READLOOP + } + } + assert.Equal(t, ts.expectedLines, actualLines) + tomb.Kill(nil) + tomb.Wait() + }) + } + +} diff --git a/pkg/acquisition/modules/kafka/testdata/kafkaClient.certificate.pem b/pkg/acquisition/modules/kafka/testdata/kafkaClient.certificate.pem new file mode 100644 index 000000000..b8e491df1 --- /dev/null +++ b/pkg/acquisition/modules/kafka/testdata/kafkaClient.certificate.pem @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIIDyDCCArCgAwIBAgIUZ3H0cvKHzTXDByikP2PLvhqCTSwwDQYJKoZIhvcNAQEL +BQAwbzEZMBcGA1UEAwwQQ3Jvd2RTZWMgVGVzdCBDQTERMA8GA1UECwwIQ3Jvd2Rz +ZWMxETAPBgNVBAoMCENyb3dkc2VjMQ4wDAYDVQQHDAVQYXJpczEPMA0GA1UECAwG +RnJhbmNlMQswCQYDVQQGEwJGUjAeFw0yMjA4MDExNjA5NDJaFw00OTEyMTYxNjA5 +NDJaMGYxCzAJBgNVBAYTAkZSMQ8wDQYDVQQIEwZGcmFuY2UxDjAMBgNVBAcTBVBh +cmlzMREwDwYDVQQKEwhDcm93ZHNlYzENMAsGA1UECxMEVEVTVDEUMBIGA1UEAxML +a2Fma2FDbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCG5R6r +xi9FcL6p8bD5+bpV3bTDNwRTF4b9psrhVY8MhvjvaoYODHvENJaDb3Z/ipDUdG6e +zjgigfjLRRwxvj2+E0nTn/TsfRQIlH2BYPEzCCrG33WKkcmG1K3LEbkXGyBcPljd +DPHb2nbZERDFBcIlqNM5N9+cbLFQnJNw3u7Nsv/e4jjUpAeYg30YVKwrr9P4mj8L +NR+ZALe0+2NUJpTYX0ZP4vPeYqTGWPshMrGHLrChhYnaYWzEvYITjVtzkHk9xtFg +uRGjgtwlpf0m2EM8GHhteHaLb2efU1C860QaFkTBK1JeGU1A2O6O8lOo8CMVG6h+ +IBA1kspGRO0wmix5AgMBAAGjZTBjMCEGA1UdEQQaMBiCC2thZmthQ2xpZW50ggls +b2NhbGhvc3QwHQYDVR0OBBYEFD/QYQ2ppLhrC8qnfSjEsppvI/NPMB8GA1UdIwQY +MBaAFCCtzZtp2uUwxDCvIf8ETMpCtLxzMA0GCSqGSIb3DQEBCwUAA4IBAQA7Bly4 +t1ob95om3h+d9tYOuqzERUhO9BZXjqGFmOxb4pmpg5ANa9j82VOy0PWvBPR4M6N5 +uHwUKj6S4HWDLpabNNsBWYUzILBBQDqkiKgy0NmakZjv2fbFSIEpZF8sfyL3Z/ci +JRo6SqZWILh7B2BqysLmgTJeRFode3zqIKhLPIqYqEDBCwgSL1quX0afut2q86lx +x2RJB/N8QsNfXSjTOojXY3cJzLdW4XKGZKk75YhlpYt+v5235paVbocz32diQczk +9yCqfJfG8BBNA6WdPgtLhQHiDLO7UY+Y+jGIe2G41w7adT/b2Omeb2h3RbGeqKx9 +WteVlQb955ItDXKI +-----END CERTIFICATE----- diff --git a/pkg/acquisition/modules/kafka/testdata/kafkaClient.key b/pkg/acquisition/modules/kafka/testdata/kafkaClient.key new file mode 100644 index 000000000..23059ef9d --- /dev/null +++ b/pkg/acquisition/modules/kafka/testdata/kafkaClient.key @@ -0,0 +1,32 @@ +Bag Attributes + friendlyName: kafkaclient + localKeyID: 54 69 6D 65 20 31 36 35 39 33 37 30 31 38 36 30 33 36 +Key Attributes: +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCG5R6rxi9FcL6p +8bD5+bpV3bTDNwRTF4b9psrhVY8MhvjvaoYODHvENJaDb3Z/ipDUdG6ezjgigfjL +RRwxvj2+E0nTn/TsfRQIlH2BYPEzCCrG33WKkcmG1K3LEbkXGyBcPljdDPHb2nbZ +ERDFBcIlqNM5N9+cbLFQnJNw3u7Nsv/e4jjUpAeYg30YVKwrr9P4mj8LNR+ZALe0 ++2NUJpTYX0ZP4vPeYqTGWPshMrGHLrChhYnaYWzEvYITjVtzkHk9xtFguRGjgtwl +pf0m2EM8GHhteHaLb2efU1C860QaFkTBK1JeGU1A2O6O8lOo8CMVG6h+IBA1kspG +RO0wmix5AgMBAAECggEAYLrdwomMDjxpzI2OvcJQ1w/zdmT2Ses+FpfLNchXmsES +swPs+xgCjFC1eaytCYpAjsirJl90K3KOCJ0XOahUt/822nUCea67deedE/CDJXf+ +zLsim2otW+0YbtzXn/UIwHzI1kJZELFYthEhuFaHwN+OD6K8S3w5rjeJFtAV6BQc +AzMAwQ+6j8DG/V+5mgp4YCrcGXmJzvdXqJdiBmFwoEOAdp/ahTjMscmyI0ZQdMpF +t8e8x5WDVT65ScUA6nvKkSttYhJ5qpEWiMerR8rBJbkbNi529fUW7/sX1xTMYI5G +psOxGdXGSyH2i365DtFxHhtovSc+TgWpNJgZDcJLLQKBgQDdWi4sJO985m1hps7m +bKiXLJE/gbSYQekIlt4/KGbY01BrKSSvgsVTIZbC/m8GhAG48uLG1y51R30cGx6g +Wg8616duqnq+P7pvw+hWsGrtzYh4URx6T91SJi313Xi5ouqLsNiMznPoLXEnDgJv +xO17TkCnThU/Ms6ml1PFeccAYwKBgQCcAoFVqibXRyFoD2UVMpgekOQ+QxBZXItm +RoiBhimEahhx6DjJ9GPmAWUTJfAMom0TNoYa/ziM/+VdNruUMRUvAqBHV2vqllYE +Szhfxlh0RyCiZzrgEqZLVMdr0vxbeA4e5D2+26NH0YHGqCVacdX5659bSM5hcP/s +WO/fGIcAcwKBgQCIKv3UcjRRZX9MX01QOu/Lb8M6ihQKxiERA55cxAHgyL3j7j9/ +KLcy2Krh8ZtjKrnUiLYxFBakVwID1DsW8dAEpr19Oqqfdpy+PIolKgndmF6nhV47 +b/36lzoW0dN+f1ZB8NyGYkqzPaEqIVgmYcKl5BGp2kL/ycWOffEuvidJeQKBgHls +eb1y8Ugc1KNpWANnnX4sx3iuOctTfCveOeCVyzqEWQJO++Qzko0yCLkejfdHdB3A +EiBxBFK52Ir0TorIqPQt1xGvuQ6cc9ZjtTzV44Kc2YmNTwWXflajZZNGY6PNjS/9 +9RDXYf5D0f4MYQZEE4axHRavU/IDQS1zCz9Yl7qBAoGAezrd5EziH9VNHMzPnfkH ++Lg2DCrRbyG1pHpuKJg2i98Ulwuu/9A5m5Vj1iYrANt9v4ycWimLikyd5vJXW60V +9PBb8FB/vjpXNJ1PZBGjlxgpWpzF13JGcJpFBK+z5yCPevzJlc/H+IAQbd3mS3WW +DDwAGG2L41aLYKmsjAtr76I= +-----END PRIVATE KEY----- diff --git a/pkg/acquisition/modules/kafka/testdata/snakeoil-ca-1.crt b/pkg/acquisition/modules/kafka/testdata/snakeoil-ca-1.crt new file mode 100644 index 000000000..1205eafce --- /dev/null +++ b/pkg/acquisition/modules/kafka/testdata/snakeoil-ca-1.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIIDvzCCAqegAwIBAgIUSc+OZ8EjaDgzm0sjqlHVXjZ0od0wDQYJKoZIhvcNAQEL +BQAwbzEZMBcGA1UEAwwQQ3Jvd2RTZWMgVGVzdCBDQTERMA8GA1UECwwIQ3Jvd2Rz +ZWMxETAPBgNVBAoMCENyb3dkc2VjMQ4wDAYDVQQHDAVQYXJpczEPMA0GA1UECAwG +RnJhbmNlMQswCQYDVQQGEwJGUjAeFw0yMjA4MDExNjA5MzZaFw0yMzA4MDExNjA5 +MzZaMG8xGTAXBgNVBAMMEENyb3dkU2VjIFRlc3QgQ0ExETAPBgNVBAsMCENyb3dk +c2VjMREwDwYDVQQKDAhDcm93ZHNlYzEOMAwGA1UEBwwFUGFyaXMxDzANBgNVBAgM +BkZyYW5jZTELMAkGA1UEBhMCRlIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQDAzj1gJzEhzcymL9dZX6+dTLdi9RDoII7PWtYCIoY5tqvewOzVBZMEDEhG +az8Btwo6Edr7u804Zule9ZVSaTkmse+thNthukXrtmTEuOuienym5KkVddNckTtr +w/5MLMkKK0Ux45BYW4H3wT1HpD56ezCUhxL5O3ACPjufw7yvMheHRnQxe3txmlNq +rd9swZH0sdZovWmW2Fj+C5qYbP/6hLzii9SNWOzOnKxlbw8CMBzK7KZgWp4qi4sz +tFCkGh2+Ya2QV3+q9Z6fD3hTZJfELEbDgP7ULYrvlGzLrrfLFcqAwmQ360PlsWiL +bg0+/rWkBRz/3wpma2RP+dGFfaj7AgMBAAGjUzBRMB0GA1UdDgQWBBQgrc2badrl +MMQwryH/BEzKQrS8czAfBgNVHSMEGDAWgBQgrc2badrlMMQwryH/BEzKQrS8czAP +BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAE6ct4k+X4hAw+TUpN +E/rVrEybHFv6qvgyE7Ay/LhpevU+r8UBChv3XZ/u3h4WKdqFrxPN4JDOvIXN0Jq2 +Xs7Bs//hj+hULvJ3DWfQEQ6LivcxVxQsU47Sbxf6sUeCV3kXSxjFEcsvSx9kPNv6 +3Bi1EwPrMiwNdpB1BDUG7Z2mFxhoHv1SUppE7Lhu/x/1b7LgYqNy2VWWOFg/TZI2 +tdg45fMtNYp8kdQW+r18YxToQHUjXkkQqW9HSyxIqeabVqxuuptyY+OSIIFxBWaB +A4BbiHPKhJ0umCQa9mPeVKWUjUeXzRnHMXw2nPyqfK+1wQXt/7DZrBQLVe5Z9IHG +DZj/ +-----END CERTIFICATE-----