diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 101d8d0fc..4ae9ed951 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -462,10 +462,10 @@ func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) erro for scanner.Scan() { out <- scanner.Text() } - return nil + return scanner.Err() } -func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event) error { +func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types.Event, deleteChan chan *ContainerConfig) error { container.logger.Infof("start tail for container %s", container.Name) dockerReader, err := d.Client.ContainerLogs(context.Background(), container.ID, *d.containerLogsOptions) if err != nil { @@ -511,7 +511,7 @@ func (d *DockerSource) TailDocker(container *ContainerConfig, outChan chan types //This case is to handle temporarily losing the connection to the docker socket //The only known case currently is when using docker-socket-proxy (and maybe a docker daemon restart) d.logger.Debugf("readerTomb dying for container %s, removing it from runningContainerState", container.Name) - delete(d.runningContainerState, container.ID) + deleteChan <- container //Also reset the Since to avoid re-reading logs d.Config.Since = time.Now().UTC().Format(time.RFC3339) d.containerLogsOptions.Since = d.Config.Since @@ -529,7 +529,7 @@ func (d *DockerSource) DockerManager(in chan *ContainerConfig, deleteChan chan * newContainer.t = &tomb.Tomb{} newContainer.logger = d.logger.WithFields(log.Fields{"container_name": newContainer.Name}) newContainer.t.Go(func() error { - return d.TailDocker(newContainer, outChan) + return d.TailDocker(newContainer, outChan, deleteChan) }) d.runningContainerState[newContainer.ID] = newContainer } diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index e1bbf07e5..75ee234d0 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -2,6 +2,7 @@ package dockeracquisition import ( "context" + "encoding/binary" "fmt" "io" "os" @@ -22,6 +23,8 @@ import ( const testContainerName = "docker_test" +var readLogs = false + func TestConfigure(t *testing.T) { log.Infof("Test 'TestConfigure'") @@ -159,6 +162,7 @@ container_name_regexp: }) } + readLogs = false dockerTomb := tomb.Tomb{} out := make(chan types.Event) dockerSource := DockerSource{} @@ -188,13 +192,11 @@ container_name_regexp: } } }) - time.Sleep(10 * time.Second) cstest.AssertErrorContains(t, err, ts.expectedErr) if err := readerTomb.Wait(); err != nil { t.Fatal(err) } - //time.Sleep(4 * time.Second) if ts.expectedLines != 0 { assert.Equal(t, ts.expectedLines, actualLines) } @@ -207,6 +209,9 @@ container_name_regexp: } func (cli *mockDockerCli) ContainerList(ctx context.Context, options dockerTypes.ContainerListOptions) ([]dockerTypes.Container, error) { + if readLogs == true { + return []dockerTypes.Container{}, nil + } containers := make([]dockerTypes.Container, 0) container := &dockerTypes.Container{ ID: "12456", @@ -218,11 +223,17 @@ func (cli *mockDockerCli) ContainerList(ctx context.Context, options dockerTypes } func (cli *mockDockerCli) ContainerLogs(ctx context.Context, container string, options dockerTypes.ContainerLogsOptions) (io.ReadCloser, error) { - startLineByte := "\x01\x00\x00\x00\x00\x00\x00\x1f" - data := []string{"docker", "test", "1234"} + if readLogs == true { + return io.NopCloser(strings.NewReader("")), nil + } + readLogs = true + data := []string{"docker\n", "test\n", "1234\n"} ret := "" for _, line := range data { - ret += fmt.Sprintf("%s%s\n", startLineByte, line) + startLineByte := make([]byte, 8) + binary.LittleEndian.PutUint32(startLineByte, 1) //stdout stream + binary.BigEndian.PutUint32(startLineByte[4:], uint32(len(line))) + ret += fmt.Sprintf("%s%s", startLineByte, line) } r := io.NopCloser(strings.NewReader(ret)) // r type is io.ReadCloser return r, nil @@ -281,6 +292,7 @@ func TestOneShot(t *testing.T) { }) } + readLogs = false dockerClient := &DockerSource{} labels := make(map[string]string) labels["type"] = ts.logType