crowdsec/pkg/acquisition/modules/kafka/kafka.go

package kafkaacquisition

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"io"
	"os"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/segmentio/kafka-go"
	log "github.com/sirupsen/logrus"
	"gopkg.in/tomb.v2"
	"gopkg.in/yaml.v2"

	"github.com/crowdsecurity/go-cs-lib/trace"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
	"github.com/crowdsecurity/crowdsec/pkg/types"
)

var (
	dataSourceName = "kafka"
)

var linesRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_kafkasource_hits_total",
		Help: "Total lines that were read from topic",
	},
	[]string{"topic"})
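
// An example acquisition configuration for this datasource (an illustrative sketch only:
// broker addresses, topic name, file paths and labels below are placeholders, not defaults):
//
//	source: kafka
//	brokers:
//	  - localhost:9092
//	topic: crowdsecurity
//	group_id: crowdsec
//	timeout: "10"
//	tls:
//	  insecure_skip_verify: false
//	  client_cert: /path/to/client.crt
//	  client_key: /path/to/client.key
//	  ca_cert: /path/to/ca.crt
//	labels:
//	  type: syslog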
type KafkaConfiguration struct {
	Brokers                           []string   `yaml:"brokers"`
	Topic                             string     `yaml:"topic"`
	GroupID                           string     `yaml:"group_id"`
	Partition                         int        `yaml:"partition"`
	Timeout                           string     `yaml:"timeout"`
	TLS                               *TLSConfig `yaml:"tls"`
	configuration.DataSourceCommonCfg `yaml:",inline"`
}

type TLSConfig struct {
	InsecureSkipVerify bool   `yaml:"insecure_skip_verify"`
	ClientCert         string `yaml:"client_cert"`
	ClientKey          string `yaml:"client_key"`
	CaCert             string `yaml:"ca_cert"`
}

type KafkaSource struct {
	Config KafkaConfiguration
	logger *log.Entry
	Reader *kafka.Reader
}

func (k *KafkaSource) GetUuid() string {
	return k.Config.UniqueId
}
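
// UnmarshalConfig parses and validates the YAML datasource configuration: at least one
// broker address and a topic are required, and the mode defaults to tail.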
func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
	k.Config = KafkaConfiguration{}

	err := yaml.UnmarshalStrict(yamlConfig, &k.Config)
	if err != nil {
		return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err)
	}

	if len(k.Config.Brokers) == 0 {
		return fmt.Errorf("cannot create a %s reader with an empty list of broker addresses", dataSourceName)
	}

	if k.Config.Topic == "" {
		return fmt.Errorf("cannot create a %s reader with an empty topic", dataSourceName)
	}

	if k.Config.Mode == "" {
		k.Config.Mode = configuration.TAIL_MODE
	}

	k.logger.Debugf("successfully unmarshaled kafka configuration: %+v", k.Config)

	return nil
}
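
// Configure stores the logger, parses the configuration and builds the kafka dialer
// and reader used by the datasource.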
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry) error {
	k.logger = logger
	k.logger.Debugf("start configuring %s source", dataSourceName)

	err := k.UnmarshalConfig(yamlConfig)
	if err != nil {
		return err
	}

	dialer, err := k.Config.NewDialer()
	if err != nil {
		return fmt.Errorf("cannot create %s dialer: %w", dataSourceName, err)
	}

	k.Reader, err = k.Config.NewReader(dialer, k.logger)
	if err != nil {
		return fmt.Errorf("cannot create %s reader: %w", dataSourceName, err)
	}

	if k.Reader == nil {
		return fmt.Errorf("cannot create %s reader", dataSourceName)
	}

	k.logger.Debugf("successfully configured %s source", dataSourceName)

	return nil
}

func (k *KafkaSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
	return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName)
}

func (k *KafkaSource) GetMode() string {
	return k.Config.Mode
}

func (k *KafkaSource) GetName() string {
	return dataSourceName
}

func (k *KafkaSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
	return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName)
}

func (k *KafkaSource) CanRun() error {
	return nil
}

func (k *KafkaSource) GetMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}

func (k *KafkaSource) GetAggregMetrics() []prometheus.Collector {
	return []prometheus.Collector{linesRead}
}

func (k *KafkaSource) Dump() interface{} {
	return k
}
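
// ReadMessage consumes messages from the configured topic, starting from the latest
// offset, and pushes each one on the out channel as a types.Event (live or timemachine,
// depending on the configuration).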
func (k *KafkaSource) ReadMessage(out chan types.Event) error {
	// Start processing from latest Offset
	if err := k.Reader.SetOffsetAt(context.Background(), time.Now()); err != nil {
		// setting the offset can fail (e.g. when the reader is bound to a consumer group);
		// log it and keep reading rather than aborting
		k.logger.Warningf("could not set offset on topic '%s': %s", k.Config.Topic, err)
	}

	for {
		k.logger.Tracef("reading message from topic '%s'", k.Config.Topic)

		m, err := k.Reader.ReadMessage(context.Background())
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}

			k.logger.Errorln(fmt.Errorf("while reading %s message: %w", dataSourceName, err))

			continue
		}

		k.logger.Tracef("got message: %s", string(m.Value))

		l := types.Line{
			Raw:     string(m.Value),
			Labels:  k.Config.Labels,
			Time:    m.Time.UTC(),
			Src:     k.Config.Topic,
			Process: true,
			Module:  k.GetName(),
		}

		k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l)
		linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()

		var evt types.Event
		if !k.Config.UseTimeMachine {
			evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
		} else {
			evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
		}

		out <- evt
	}
}
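
// RunReader starts ReadMessage in the tomb and closes the kafka reader once the tomb
// starts dying.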
func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
	k.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", dataSourceName, k.Config)

	t.Go(func() error {
		return k.ReadMessage(out)
	})

	//nolint //fp
	for {
		select {
		case <-t.Dying():
			k.logger.Infof("%s datasource topic %s stopping", dataSourceName, k.Config.Topic)

			if err := k.Reader.Close(); err != nil {
				return fmt.Errorf("while closing %s reader on topic '%s': %w", dataSourceName, k.Config.Topic, err)
			}

			return nil
		}
	}
}
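
// StreamingAcquisition starts the reader goroutine for live (streaming) acquisition.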
func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
	k.logger.Infof("start reader on brokers '%+v' with topic '%s'", k.Config.Brokers, k.Config.Topic)

	t.Go(func() error {
		defer trace.CatchPanic("crowdsec/acquis/kafka/live")
		return k.RunReader(out, t)
	})

	return nil
}
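
// NewTLSConfig builds a tls.Config from the datasource TLS options: the client
// certificate/key pair is loaded and the configured CA is appended to the system pool.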
func (kc *KafkaConfiguration) NewTLSConfig() (*tls.Config, error) {
	tlsConfig := tls.Config{
		InsecureSkipVerify: kc.TLS.InsecureSkipVerify,
	}

	cert, err := tls.LoadX509KeyPair(kc.TLS.ClientCert, kc.TLS.ClientKey)
	if err != nil {
		return &tlsConfig, err
	}

	tlsConfig.Certificates = []tls.Certificate{cert}

	caCert, err := os.ReadFile(kc.TLS.CaCert)
	if err != nil {
		return &tlsConfig, err
	}

	caCertPool, err := x509.SystemCertPool()
	if err != nil {
		return &tlsConfig, fmt.Errorf("unable to load system CA certificates: %w", err)
	}

	if caCertPool == nil {
		caCertPool = x509.NewCertPool()
	}

	caCertPool.AppendCertsFromPEM(caCert)
	tlsConfig.RootCAs = caCertPool

	return &tlsConfig, nil
}
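
// NewDialer builds the kafka dialer, with a 10 second default timeout (overridable
// through the timeout option, expressed in seconds) and optional TLS.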
func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) {
	dialer := &kafka.Dialer{}
	timeoutDuration := 10 * time.Second

	if kc.Timeout != "" {
		intTimeout, err := strconv.Atoi(kc.Timeout)
		if err != nil {
			return dialer, err
		}

		timeoutDuration = time.Duration(intTimeout) * time.Second
	}

	dialer = &kafka.Dialer{
		Timeout:   timeoutDuration,
		DualStack: true,
	}

	if kc.TLS != nil {
		tlsConfig, err := kc.NewTLSConfig()
		if err != nil {
			return dialer, err
		}

		dialer.TLS = tlsConfig
	}

	return dialer, nil
}
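
// NewReader builds and validates the kafka reader configuration; group_id and
// partition are mutually exclusive.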
func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer, logger *log.Entry) (*kafka.Reader, error) {
	rConf := kafka.ReaderConfig{
		Brokers:     kc.Brokers,
		Topic:       kc.Topic,
		Dialer:      dialer,
		Logger:      kafka.LoggerFunc(logger.Debugf),
		ErrorLogger: kafka.LoggerFunc(logger.Errorf),
	}

	if kc.GroupID != "" && kc.Partition != 0 {
		return &kafka.Reader{}, errors.New("cannot specify both group_id and partition")
	}

	if kc.GroupID != "" {
		rConf.GroupID = kc.GroupID
	} else if kc.Partition != 0 {
		rConf.Partition = kc.Partition
	} else {
		logger.Warnf("no group_id specified, crowdsec will only read from the 1st partition of the topic")
	}

	if err := rConf.Validate(); err != nil {
		return &kafka.Reader{}, fmt.Errorf("while validating reader configuration: %w", err)
	}

	return kafka.NewReader(rConf), nil
}