From 64c5fa7360ff84e9c4a5c72540eb7495b4734d77 Mon Sep 17 00:00:00 2001
From: "Thibault \"bui\" Koechlin"
+
diff --git a/cmd/crowdsec/acquisition.go b/cmd/crowdsec/acquisition.go deleted file mode 100644 index 246cf3945..000000000 --- a/cmd/crowdsec/acquisition.go +++ /dev/null @@ -1,34 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/crowdsecurity/crowdsec/pkg/acquisition" -) - -func loadAcquisition() (*acquisition.FileAcquisCtx, error) { - var acquisitionCTX *acquisition.FileAcquisCtx - var err error - /*Init the acqusition : from cli or from acquis.yaml file*/ - if cConfig.SingleFile != "" { - var input acquisition.FileCtx - input.Filename = cConfig.SingleFile - input.Mode = acquisition.CATMODE - input.Labels = make(map[string]string) - input.Labels["type"] = cConfig.SingleFileLabel - acquisitionCTX, err = acquisition.InitReaderFromFileCtx([]acquisition.FileCtx{input}) - } else { /* Init file reader if we tail */ - acquisitionCTX, err = acquisition.InitReader(cConfig.AcquisitionFile) - } - if err != nil { - return nil, fmt.Errorf("unable to start file acquisition, bailout %v", err) - } - if acquisitionCTX == nil { - return nil, fmt.Errorf("no inputs to process") - } - if cConfig.Profiling { - acquisitionCTX.Profiling = true - } - - return acquisitionCTX, nil -} diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index cc9bbf85d..d3350db6d 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -294,7 +294,7 @@ func main() { log.Warningf("Starting processing data") //Init the acqusition : from cli or from acquis.yaml file - acquisitionCTX, err = loadAcquisition() + acquisitionCTX, err = acquisition.LoadAcquisitionConfig(cConfig) if err != nil { log.Fatalf("Failed to start acquisition : %s", err) } diff --git a/pkg/acquisition/file_reader.go b/pkg/acquisition/file_reader.go index 116ab2fab..ecb41711f 100644 --- a/pkg/acquisition/file_reader.go +++ b/pkg/acquisition/file_reader.go @@ -10,6 +10,7 @@ import ( "os" "strings" + "github.com/crowdsecurity/crowdsec/pkg/csconfig" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/types" @@ -66,6 +67,33 @@ var ReaderHits = prometheus.NewCounterVec( []string{"source"}, ) +func LoadAcquisitionConfig(cConfig *csconfig.CrowdSec) (*FileAcquisCtx, error) { + var acquisitionCTX *FileAcquisCtx + var err error + /*Init the acqusition : from cli or from acquis.yaml file*/ + if cConfig.SingleFile != "" { + var input FileCtx + input.Filename = cConfig.SingleFile + input.Mode = CATMODE + input.Labels = make(map[string]string) + input.Labels["type"] = cConfig.SingleFileLabel + acquisitionCTX, err = InitReaderFromFileCtx([]FileCtx{input}) + } else { /* Init file reader if we tail */ + acquisitionCTX, err = InitReader(cConfig.AcquisitionFile) + } + if err != nil { + return nil, fmt.Errorf("unable to start file acquisition, bailout %v", err) + } + if acquisitionCTX == nil { + return nil, fmt.Errorf("no inputs to process") + } + if cConfig.Profiling { + acquisitionCTX.Profiling = true + } + + return acquisitionCTX, nil +} + func InitReader(cfg string) (*FileAcquisCtx, error) { var files []FileCtx diff --git a/pkg/parser/runtime.go b/pkg/parser/runtime.go index b255f21c0..385b23c93 100644 --- a/pkg/parser/runtime.go +++ b/pkg/parser/runtime.go @@ -227,6 +227,9 @@ func stageidx(stage string, stages []string) int { return -1 } +var ParseDump bool +var StageParseCache map[string]map[string]types.Event + func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error) { var event types.Event = xp @@ -250,7 +253,14 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N log.Tracef("INPUT '%s'", event.Line.Raw) } + if ParseDump { + StageParseCache = make(map[string]map[string]types.Event) + } + for _, stage := range ctx.Stages { + if ParseDump { + StageParseCache[stage] = make(map[string]types.Event) + } /* if the node is forward in stages, seek to its stage */ /* this is for example used by testing system to inject logs in post-syslog-parsing phase*/ if stageidx(event.Stage, ctx.Stages) > stageidx(stage, ctx.Stages) { @@ -267,14 +277,14 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N isStageOK := false for idx, node := range nodes { - clog := log.WithFields(log.Fields{ - "node-name": node.rn, - "stage": event.Stage, - }) //Only process current stage's nodes if event.Stage != node.Stage { continue } + clog := log.WithFields(log.Fields{ + "node-name": node.rn, + "stage": event.Stage, + }) clog.Tracef("Processing node %d/%d -> %s", idx, len(nodes), node.rn) if ctx.Profiling { node.Profiling = true @@ -286,6 +296,13 @@ func /*(u types.UnixParser)*/ Parse(ctx UnixParserCtx, xp types.Event, nodes []N clog.Tracef("node (%s) ret : %v", node.rn, ret) if ret { isStageOK = true + if ParseDump { + evtcopy := types.Event{} + if err := types.Clone(&event, &evtcopy); err != nil { + log.Fatalf("while cloning Event in parser : %s", err) + } + StageParseCache[stage][node.Name] = evtcopy + } } if ret && node.OnSuccess == "next_stage" { clog.Debugf("node successful, stop end stage %s", stage) diff --git a/pkg/types/utils.go b/pkg/types/utils.go index a5f8fded9..f7f119cfa 100644 --- a/pkg/types/utils.go +++ b/pkg/types/utils.go @@ -1,7 +1,9 @@ package types import ( + "bytes" "encoding/binary" + "encoding/gob" "fmt" "io" "net" @@ -93,3 +95,17 @@ func ConfigureLogger(clog *log.Logger) error { clog.SetLevel(logLevel) return nil } + +func Clone(a, b interface{}) error { + + buff := new(bytes.Buffer) + enc := gob.NewEncoder(buff) + dec := gob.NewDecoder(buff) + if err := enc.Encode(a); err != nil { + return fmt.Errorf("failed cloning %T", a) + } + if err := dec.Decode(b); err != nil { + return fmt.Errorf("failed cloning %T", b) + } + return nil +}