crowdsec/pkg/acquisition/file_reader_test.go
Thibault "bui" Koechlin 742435f178
Acquisition extra tests (#188)
* acquisition testing
2020-08-20 13:55:52 +02:00

316 lines
6 KiB
Go

package acquisition
import (
"fmt"
"os"
"testing"
"time"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"gopkg.in/tomb.v2"
)
func TestLoadAcquisitionConfig(t *testing.T) {
testFilePath := "./tests/test.log"
tests := []struct {
csConfig *csconfig.CrowdSec
result *FileAcquisCtx
err string
}{
{
csConfig: &csconfig.CrowdSec{
SingleFile: testFilePath,
SingleFileLabel: "my_test_log",
Profiling: false,
},
result: &FileAcquisCtx{
Files: []FileCtx{
{
Type: "file",
Mode: "cat",
Filename: testFilePath,
Filenames: []string{},
Labels: map[string]string{
"type": "my_test_log",
},
Profiling: false,
},
},
Profiling: false,
},
err: "",
},
{
csConfig: &csconfig.CrowdSec{
SingleFile: testFilePath,
SingleFileLabel: "my_test_log",
Profiling: true,
},
result: &FileAcquisCtx{
Files: []FileCtx{
{
Type: "file",
Mode: "cat",
Filename: testFilePath,
Filenames: []string{},
Labels: map[string]string{
"type": "my_test_log",
},
Profiling: false,
},
},
Profiling: true,
},
err: "",
},
}
for _, test := range tests {
result, err := LoadAcquisitionConfig(test.csConfig)
assert.Equal(t, test.result, result)
if test.err == "" && err == nil {
continue
}
assert.EqualError(t, err, test.err)
}
}
func TestAcquisStartReadingTailKilled(t *testing.T) {
acquisFilePath := "./tests/acquis_test_log.yaml"
csConfig := &csconfig.CrowdSec{
AcquisitionFile: acquisFilePath,
Profiling: false,
}
fCTX, err := LoadAcquisitionConfig(csConfig)
if err != nil {
t.Fatalf(err.Error())
}
outputChan := make(chan types.Event)
acquisTomb := tomb.Tomb{}
AcquisStartReading(fCTX, outputChan, &acquisTomb)
if !acquisTomb.Alive() {
t.Fatal("acquisition tomb is not alive")
}
time.Sleep(500 * time.Millisecond)
filename := "./tests/test.log"
f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 5; i++ {
_, err := f.WriteString(fmt.Sprintf("ratata%d\n", i))
if err != nil {
t.Fatal(err)
}
}
f.Close()
time.Sleep(500 * time.Millisecond)
reads := 0
L:
for {
select {
case <-outputChan:
reads++
if reads == 2 {
acquisTomb.Kill(nil)
time.Sleep(100 * time.Millisecond)
}
case <-time.After(1 * time.Second):
break L
}
}
log.Printf("-> %d", reads)
if reads != 2 {
t.Fatal()
}
f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
t.Fatal(err)
}
_, err = f.WriteString("one log line\n")
if err != nil {
t.Fatal(err)
}
f.Close()
}
func TestAcquisStartReadingTail(t *testing.T) {
acquisFilePath := "./tests/acquis_test_log.yaml"
filename := "./tests/test.log"
csConfig := &csconfig.CrowdSec{
AcquisitionFile: acquisFilePath,
Profiling: false,
}
fCTX, err := LoadAcquisitionConfig(csConfig)
if err != nil {
t.Fatalf(err.Error())
}
outputChan := make(chan types.Event)
acquisTomb := tomb.Tomb{}
AcquisStartReading(fCTX, outputChan, &acquisTomb)
if !acquisTomb.Alive() {
t.Fatal("acquisition tomb is not alive")
}
time.Sleep(500 * time.Millisecond)
f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 5; i++ {
_, err := f.WriteString(fmt.Sprintf("ratata%d\n", i))
if err != nil {
t.Fatal(err)
}
}
f.Close()
time.Sleep(500 * time.Millisecond)
reads := 0
L:
for {
select {
case <-outputChan:
reads++
//log.Printf("evt %+v", evt)
case <-time.After(1 * time.Second):
break L
}
}
log.Printf("-> %d", reads)
if reads != 5 {
t.Fatal()
}
acquisTomb.Kill(nil)
if err := acquisTomb.Wait(); err != nil {
t.Fatalf("Acquisition returned error : %s", err)
}
f, err = os.OpenFile(filename, os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
t.Fatal(err)
}
_, err = f.WriteString("one log line\n")
if err != nil {
t.Fatal(err)
}
f.Close()
}
func TestAcquisStartReadingCat(t *testing.T) {
testFilePath := "./tests/test.log"
f, err := os.OpenFile(testFilePath, os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 5; i++ {
_, err := f.WriteString(fmt.Sprintf("ratata%d\n", i))
if err != nil {
t.Fatal(err)
}
}
f.Close()
csConfig := &csconfig.CrowdSec{
SingleFile: testFilePath,
SingleFileLabel: "my_test_log",
Profiling: false,
}
fCTX, err := LoadAcquisitionConfig(csConfig)
if err != nil {
t.Fatalf(err.Error())
}
outputChan := make(chan types.Event)
acquisTomb := tomb.Tomb{}
AcquisStartReading(fCTX, outputChan, &acquisTomb)
if !acquisTomb.Alive() {
t.Fatal("acquisition tomb is not alive")
}
time.Sleep(500 * time.Millisecond)
reads := 0
L:
for {
select {
case <-outputChan:
reads++
case <-time.After(1 * time.Second):
break L
}
}
log.Printf("-> %d", reads)
if reads != 5 {
t.Fatal()
}
acquisTomb.Kill(nil)
if err := acquisTomb.Wait(); err != nil {
t.Fatalf("Acquisition returned error : %s", err)
}
f, err = os.OpenFile(testFilePath, os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
t.Fatal(err)
}
_, err = f.WriteString("one log line\n")
if err != nil {
t.Fatal(err)
}
f.Close()
}
func TestAcquisStartReadingGzCat(t *testing.T) {
testFilePath := "./tests/test.log.gz"
csConfig := &csconfig.CrowdSec{
SingleFile: testFilePath,
SingleFileLabel: "my_test_log",
Profiling: false,
}
fCTX, err := LoadAcquisitionConfig(csConfig)
if err != nil {
t.Fatalf(err.Error())
}
outputChan := make(chan types.Event)
acquisTomb := tomb.Tomb{}
AcquisStartReading(fCTX, outputChan, &acquisTomb)
if !acquisTomb.Alive() {
t.Fatal("acquisition tomb is not alive")
}
time.Sleep(500 * time.Millisecond)
reads := 0
L:
for {
select {
case <-outputChan:
reads++
case <-time.After(1 * time.Second):
break L
}
}
log.Printf("-> %d", reads)
if reads != 1 {
t.Fatal()
}
}