Fix docker flaky test (#1494)

This commit is contained in:
blotus 2022-04-29 12:16:49 +02:00 committed by GitHub
parent ddfe95e45d
commit 392708a804
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 9 deletions

View file

@ -462,10 +462,10 @@ func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) erro
for scanner.Scan() { for scanner.Scan() {
out <- scanner.Text() out <- scanner.Text()
} }
return nil return scanner.Err()
} }
func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event) error { func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error {
container.logger.Infof("start tail for container %s", container.Name) container.logger.Infof("start tail for container %s", container.Name)
dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions) dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions)
if err != nil { if err != nil {
@ -511,7 +511,7 @@ func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types
//This case is to handle temporarily losing the connection to the docker socket //This case is to handle temporarily losing the connection to the docker socket
//The only known case currently is when using docker-socket-proxy (and maybe a docker daemon restart) //The only known case currently is when using docker-socket-proxy (and maybe a docker daemon restart)
d.logger.Debugf("readerTomb dying for container %s, removing it from runningContainerState", container.Name) d.logger.Debugf("readerTomb dying for container %s, removing it from runningContainerState", container.Name)
delete(d.runningContainerState, container.ID) deleteChan <- container
//Also reset the Since to avoid re-reading logs //Also reset the Since to avoid re-reading logs
d.Config.Since = time.Now().UTC().Format(time.RFC3339) d.Config.Since = time.Now().UTC().Format(time.RFC3339)
d.containerLogsOptions.Since = d.Config.Since d.containerLogsOptions.Since = d.Config.Since
@ -529,7 +529,7 @@ func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan *
newContainer.t = &tomb.Tomb{} newContainer.t = &tomb.Tomb{}
newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name}) newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name})
newContainer.t.Go(func() error { newContainer.t.Go(func() error {
return d.TailDocker(newContainer, outChan) return d.TailDocker(newContainer, outChan, deleteChan)
}) })
d.runningContainerState[newContainer.ID] = newContainer d.runningContainerState[newContainer.ID] = newContainer
} }

View file

@ -2,6 +2,7 @@ package dockeracquisition
import ( import (
"context" "context"
"encoding/binary"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -22,6 +23,8 @@ import (
const testContainerName = "docker_test" const testContainerName = "docker_test"
var readLogs = false
func TestConfigure(t *testing.T) { func TestConfigure(t *testing.T) {
log.Infof("Test 'TestConfigure'") log.Infof("Test 'TestConfigure'")
@ -159,6 +162,7 @@ container_name_regexp:
}) })
} }
readLogs = false
dockerTomb := tomb.Tomb{} dockerTomb := tomb.Tomb{}
out := make(chan types.Event) out := make(chan types.Event)
dockerSource := DockerSource{} dockerSource := DockerSource{}
@ -188,13 +192,11 @@ container_name_regexp:
} }
} }
}) })
time.Sleep(10 * time.Second)
cstest.AssertErrorContains(t, err, ts.expectedErr) cstest.AssertErrorContains(t, err, ts.expectedErr)
if err := readerTomb.Wait(); err != nil { if err := readerTomb.Wait(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
//time.Sleep(4 * time.Second)
if ts.expectedLines != 0 { if ts.expectedLines != 0 {
assert.Equal(t, ts.expectedLines, actualLines) assert.Equal(t, ts.expectedLines, actualLines)
} }
@ -207,6 +209,9 @@ container_name_regexp:
} }
func (cli *mockDockerCli) ContainerList(ctx context.Context, options dockerTypes.ContainerListOptions) ([]dockerTypes.Container, error) { func (cli *mockDockerCli) ContainerList(ctx context.Context, options dockerTypes.ContainerListOptions) ([]dockerTypes.Container, error) {
if readLogs == true {
return []dockerTypes.Container{}, nil
}
containers := make([]dockerTypes.Container, 0) containers := make([]dockerTypes.Container, 0)
container := &dockerTypes.Container{ container := &dockerTypes.Container{
ID: "12456", ID: "12456",
@ -218,11 +223,17 @@ func (cli *mockDockerCli) ContainerList(ctx context.Context, options dockerTypes
} }
func (cli *mockDockerCli) ContainerLogs(ctx context.Context, container string, options dockerTypes.ContainerLogsOptions) (io.ReadCloser, error) { func (cli *mockDockerCli) ContainerLogs(ctx context.Context, container string, options dockerTypes.ContainerLogsOptions) (io.ReadCloser, error) {
startLineByte := "\x01\x00\x00\x00\x00\x00\x00\x1f" if readLogs == true {
data := []string{"docker", "test", "1234"} return io.NopCloser(strings.NewReader("")), nil
}
readLogs = true
data := []string{"docker\n", "test\n", "1234\n"}
ret := "" ret := ""
for _, line := range data { for _, line := range data {
ret += fmt.Sprintf("%s%s\n", startLineByte, line) startLineByte := make([]byte, 8)
binary.LittleEndian.PutUint32(startLineByte, 1) //stdout stream
binary.BigEndian.PutUint32(startLineByte[4:], uint32(len(line)))
ret += fmt.Sprintf("%s%s", startLineByte, line)
} }
r := io.NopCloser(strings.NewReader(ret)) // r type is io.ReadCloser r := io.NopCloser(strings.NewReader(ret)) // r type is io.ReadCloser
return r, nil return r, nil
@ -281,6 +292,7 @@ func TestOneShot(t *testing.T) {
}) })
} }
readLogs = false
dockerClient := &DockerSource{} dockerClient := &DockerSource{}
labels := make(map[string]string) labels := make(map[string]string)
labels["type"] = ts.logType labels["type"] = ts.logType