This commit is contained in:
bui 2021-04-30 17:36:42 +02:00
parent 3b084ee8b8
commit 6e36f12d64
2 changed files with 23 additions and 29 deletions

View file

@ -72,15 +72,14 @@ cat mode will return once source has been exhausted.
// The interface each datasource must implement
type DataSource interface {
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
Configure([]byte, *log.Entry) error // Configure the datasource
ConfigureByDSN(string, string, *log.Entry) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
SupportedModes() []string // TO REMOVE : Returns the mode supported by the datasource
SupportedDSN() []string // Returns the list of supported URI schemes (file:// s3:// ...)
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
LiveAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
Configure([]byte, *log.Entry) error // Configure the datasource
ConfigureByDSN(string, string, *log.Entry) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
Dump() interface{}
}
@ -237,7 +236,7 @@ func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb
defer types.CatchPanic("crowdsec/acquis")
var err error
if subsrc.GetMode() == configuration.TAIL_MODE {
err = subsrc.LiveAcquisition(output, AcquisTomb)
err = subsrc.StreamingAcquisition(output, AcquisTomb)
} else {
err = subsrc.OneShotAcquisition(output, AcquisTomb)
}

View file

@ -39,27 +39,24 @@ func (f *MockSource) Configure(cfg []byte, logger *log.Entry) error {
}
return nil
}
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) SupportedModes() []string { return []string{"tail", "cat"} }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) LiveAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) GetName() string { return "mock" }
func (f *MockSource) ConfigureByDSN(string, string, *log.Entry) error {
return fmt.Errorf("not supported")
}
//func (f *MockSource) New() DataSource { return &MockSource{} }
//copy the mocksource, but this one can't run
type MockSourceCantRun struct {
MockSource
}
func (f *MockSourceCantRun) CanRun() error { return fmt.Errorf("can't run bro") }
func (f *MockSourceCantRun) New() DataSource { return &MockSourceCantRun{} }
func (f *MockSourceCantRun) GetName() string { return "mock_cant_run" }
//appendMockSource is only used to add mock source for tests
func appendMockSource() {
@ -327,8 +324,8 @@ func (f *MockCat) Configure(cfg []byte, logger *log.Entry) error {
}
return nil
}
func (f *MockCat) GetMode() string { return "cat" }
func (f *MockCat) SupportedModes() []string { return []string{"cat"} }
func (f *MockCat) GetName() string { return "mock_cat" }
func (f *MockCat) GetMode() string { return "cat" }
func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
for i := 0; i < 10; i++ {
evt := types.Event{}
@ -337,13 +334,12 @@ func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) erro
}
return nil
}
func (f *MockCat) LiveAcquisition(chan types.Event, *tomb.Tomb) error {
func (f *MockCat) StreamingAcquisition(chan types.Event, *tomb.Tomb) error {
return fmt.Errorf("can't run in tail")
}
func (f *MockCat) CanRun() error { return nil }
func (f *MockCat) GetMetrics() []prometheus.Collector { return nil }
func (f *MockCat) Dump() interface{} { return f }
func (f *MockCat) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockCat) ConfigureByDSN(string, string, *log.Entry) error { return fmt.Errorf("not supported") }
//----
@ -363,12 +359,12 @@ func (f *MockTail) Configure(cfg []byte, logger *log.Entry) error {
}
return nil
}
func (f *MockTail) GetMode() string { return "tail" }
func (f *MockTail) SupportedModes() []string { return []string{"tail"} }
func (f *MockTail) GetName() string { return "mock_tail" }
func (f *MockTail) GetMode() string { return "tail" }
func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) error {
return fmt.Errorf("can't run in cat mode")
}
func (f *MockTail) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTail) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
for i := 0; i < 10; i++ {
evt := types.Event{}
evt.Line.Src = "test"
@ -382,7 +378,6 @@ func (f *MockTail) LiveAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTail) CanRun() error { return nil }
func (f *MockTail) GetMetrics() []prometheus.Collector { return nil }
func (f *MockTail) Dump() interface{} { return f }
func (f *MockTail) SupportedDSN() []string { return []string{"mock://"} }
func (f *MockTail) ConfigureByDSN(string, string, *log.Entry) error {
return fmt.Errorf("not supported")
}