lp metrics: collect datasources and console options (#2870)

This commit is contained in:
mmetc 2024-03-05 14:56:14 +01:00 committed by GitHub
parent e7ecea764e
commit d8877a71fc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 67 additions and 30 deletions

View file

@ -23,39 +23,42 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)
func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, error) {
// initCrowdsec prepares the log processor service
func initCrowdsec(cConfig *csconfig.Config, hub *cwhub.Hub) (*parser.Parsers, []acquisition.DataSource, error) {
var err error
if err = alertcontext.LoadConsoleContext(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading context: %w", err)
return nil, nil, fmt.Errorf("while loading context: %w", err)
}
// Start loading configs
csParsers := parser.NewParsers(hub)
if csParsers, err = parser.LoadParsers(cConfig, csParsers); err != nil {
return nil, fmt.Errorf("while loading parsers: %w", err)
return nil, nil, fmt.Errorf("while loading parsers: %w", err)
}
if err := LoadBuckets(cConfig, hub); err != nil {
return nil, fmt.Errorf("while loading scenarios: %w", err)
return nil, nil, fmt.Errorf("while loading scenarios: %w", err)
}
if err := appsec.LoadAppsecRules(hub); err != nil {
return nil, fmt.Errorf("while loading appsec rules: %w", err)
return nil, nil, fmt.Errorf("while loading appsec rules: %w", err)
}
if err := LoadAcquisition(cConfig); err != nil {
return nil, fmt.Errorf("while loading acquisition config: %w", err)
datasources, err := LoadAcquisition(cConfig)
if err != nil {
return nil, nil, fmt.Errorf("while loading acquisition config: %w", err)
}
return csParsers, nil
return csParsers, datasources, nil
}
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub) error {
// runCrowdsec starts the log processor service
func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
inputEventChan = make(chan types.Event)
inputLineChan = make(chan types.Event)
//start go-routines for parsing, buckets pour and outputs.
// start go-routines for parsing, buckets pour and outputs.
parserWg := &sync.WaitGroup{}
parsersTomb.Go(func() error {
@ -65,7 +68,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
parsersTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/runParse")
if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { //this error will never happen as parser.Parse is not able to return errors
if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil {
// this error will never happen as parser.Parse is not able to return errors
log.Fatalf("starting parse error : %s", err)
return err
}
@ -161,7 +165,8 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
return nil
}
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, agentReady chan bool) {
// serveCrowdsec wraps the log processor service
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
crowdsecTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/serveCrowdsec")
@ -171,7 +176,7 @@ func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
agentReady <- true
if err := runCrowdsec(cConfig, parsers, hub); err != nil {
if err := runCrowdsec(cConfig, parsers, hub, datasources); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()

View file

@ -1,6 +1,7 @@
package main
import (
"errors"
"flag"
"fmt"
_ "net/http/pprof"
@ -10,7 +11,6 @@ import (
"strings"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
@ -95,7 +95,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, hub, files, &bucketsTomb, buckets, flags.OrderEvent)
if err != nil {
return fmt.Errorf("scenario loading failed: %v", err)
return fmt.Errorf("scenario loading failed: %w", err)
}
if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
@ -107,7 +107,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
return nil
}
func LoadAcquisition(cConfig *csconfig.Config) error {
func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error) {
var err error
if flags.SingleFileType != "" && flags.OneShotDSN != "" {
@ -116,20 +116,20 @@ func LoadAcquisition(cConfig *csconfig.Config) error {
dataSources, err = acquisition.LoadAcquisitionFromDSN(flags.OneShotDSN, flags.Labels, flags.Transform)
if err != nil {
return errors.Wrapf(err, "failed to configure datasource for %s", flags.OneShotDSN)
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
}
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec)
if err != nil {
return err
return nil, err
}
}
if len(dataSources) == 0 {
return fmt.Errorf("no datasource enabled")
return nil, errors.New("no datasource enabled")
}
return nil
return dataSources, nil
}
var (
@ -272,7 +272,7 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
}
if cConfig.DisableAPI && cConfig.DisableAgent {
return nil, errors.New("You must run at least the API Server or crowdsec")
return nil, errors.New("you must run at least the API Server or crowdsec")
}
if flags.OneShotDSN != "" && flags.SingleFileType == "" {
@ -360,11 +360,14 @@ func main() {
if err != nil {
log.Fatalf("could not create CPU profile: %s", err)
}
log.Infof("CPU profile will be written to %s", flags.CpuProfile)
if err := pprof.StartCPUProfile(f); err != nil {
f.Close()
log.Fatalf("could not start CPU profile: %s", err)
}
defer f.Close()
defer pprof.StopCPUProfile()
}

View file

@ -86,7 +86,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
return nil, fmt.Errorf("while loading hub index: %w", err)
}
csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return nil, fmt.Errorf("unable to init crowdsec: %w", err)
}
@ -103,7 +103,7 @@ func reloadHandler(sig os.Signal) (*csconfig.Config, error) {
}
agentReady := make(chan bool, 1)
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
}
log.Printf("Reload is finished")
@ -230,7 +230,7 @@ func drainChan(c chan types.Event) {
for {
select {
case _, ok := <-c:
if !ok { //closed
if !ok { // closed
return
}
default:
@ -256,8 +256,8 @@ func HandleSignals(cConfig *csconfig.Config) error {
exitChan := make(chan error)
//Always try to stop CPU profiling to avoid passing flags around
//It's a noop if profiling is not enabled
// Always try to stop CPU profiling to avoid passing flags around
// It's a noop if profiling is not enabled
defer pprof.StopCPUProfile()
go func() {
@ -369,14 +369,14 @@ func Serve(cConfig *csconfig.Config, agentReady chan bool) error {
return fmt.Errorf("while loading hub index: %w", err)
}
csParsers, err := initCrowdsec(cConfig, hub)
csParsers, datasources, err := initCrowdsec(cConfig, hub)
if err != nil {
return fmt.Errorf("crowdsec init: %w", err)
}
// if it's just linting, we're done
if !flags.TestMode {
serveCrowdsec(csParsers, cConfig, hub, agentReady)
serveCrowdsec(csParsers, cConfig, hub, datasources, agentReady)
} else {
agentReady <- true
}

View file

@ -37,6 +37,35 @@ type ConsoleConfig struct {
ShareContext *bool `yaml:"share_context"`
}
func (c *ConsoleConfig) EnabledOptions() []string {
ret := []string{}
if c == nil {
return ret
}
if c.ShareCustomScenarios != nil && *c.ShareCustomScenarios {
ret = append(ret, SEND_CUSTOM_SCENARIOS)
}
if c.ShareTaintedScenarios != nil && *c.ShareTaintedScenarios {
ret = append(ret, SEND_TAINTED_SCENARIOS)
}
if c.ShareManualDecisions != nil && *c.ShareManualDecisions {
ret = append(ret, SEND_MANUAL_SCENARIOS)
}
if c.ConsoleManagement != nil && *c.ConsoleManagement {
ret = append(ret, CONSOLE_MANAGEMENT)
}
if c.ShareContext != nil && *c.ShareContext {
ret = append(ret, SEND_CONTEXT)
}
return ret
}
func (c *ConsoleConfig) IsPAPIEnabled() bool {
if c == nil || c.ConsoleManagement == nil {
return false

View file

@ -38,7 +38,7 @@ teardown() {
@test "crowdsec (no api and no agent)" {
rune -0 wait-for \
--err "You must run at least the API Server or crowdsec" \
--err "you must run at least the API Server or crowdsec" \
"${CROWDSEC}" -no-api -no-cs
}

View file

@ -28,7 +28,7 @@ teardown() {
@test "lapi (.api.server.enable=false)" {
rune -0 config_set '.api.server.enable=false'
rune -1 "${CROWDSEC}" -no-cs
assert_stderr --partial "You must run at least the API Server or crowdsec"
assert_stderr --partial "you must run at least the API Server or crowdsec"
}
@test "lapi (no .api.server.listen_uri)" {