diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go index 1cabcc035..d922909a4 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch_test.go @@ -757,48 +757,25 @@ func TestOneShotAcquisition(t *testing.T) { if tc.setup != nil { tc.setup(t, &cw) } - out := make(chan types.Event) + out := make(chan types.Event, 100) tmb := tomb.Tomb{} var rcvdEvts []types.Event dbgLogger.Infof("running StreamingAcquisition") - actmb := tomb.Tomb{} - actmb.Go(func() error { - err := cw.OneShotAcquisition(out, &actmb) - dbgLogger.Infof("acquis done") - cstest.RequireErrorContains(t, err, tc.expectedStartErr) - return nil - }) - + err = cw.OneShotAcquisition(out, &tmb) + dbgLogger.Infof("acquis done") + cstest.RequireErrorContains(t, err, tc.expectedStartErr) + close(out) // let's empty output chan - tmb.Go(func() error { - for { - select { - case in := <-out: - log.Debugf("received event %+v", in) - rcvdEvts = append(rcvdEvts, in) - case <-tmb.Dying(): - log.Debugf("pumper died") - return nil - } - } - }) + for evt := range out { + rcvdEvts = append(rcvdEvts, evt) + } if tc.run != nil { tc.run(t, &cw) } else { dbgLogger.Warning("no code to run") } - - time.Sleep(5 * time.Second) - dbgLogger.Infof("killing collector") - tmb.Kill(nil) - <-tmb.Dead() - dbgLogger.Infof("killing datasource") - actmb.Kill(nil) - dbgLogger.Infof("waiting datasource death") - <-actmb.Dead() - // check results if tc.expectedResLen != -1 { if tc.expectedResLen != len(rcvdEvts) { t.Fatalf("%s : expected %d results got %d -> %v", tc.name, tc.expectedResLen, len(rcvdEvts), rcvdEvts) diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index 8928c1ae2..d019da314 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -307,29 +307,14 @@ func TestOneShot(t *testing.T) { t.Fatalf("unable to configure dsn '%s': %s", ts.dsn, err) } dockerClient.Client = new(mockDockerCli) - out := make(chan types.Event) - actualLines := 0 - if ts.expectedLines != 0 { - go func() { - READLOOP: - for { - select { - case <-out: - actualLines++ - case <-time.After(1 * time.Second): - break READLOOP - } - } - }() - } + out := make(chan types.Event, 100) tomb := tomb.Tomb{} err := dockerClient.OneShotAcquisition(out, &tomb) cstest.AssertErrorContains(t, err, ts.expectedErr) // else we do the check before actualLines is incremented ... - time.Sleep(1 * time.Second) if ts.expectedLines != 0 { - assert.Equal(t, ts.expectedLines, actualLines) + assert.Equal(t, ts.expectedLines, len(out)) } } diff --git a/pkg/acquisition/modules/journalctl/journalctl_test.go b/pkg/acquisition/modules/journalctl/journalctl_test.go index 5511a01e3..04280943a 100644 --- a/pkg/acquisition/modules/journalctl/journalctl_test.go +++ b/pkg/acquisition/modules/journalctl/journalctl_test.go @@ -151,27 +151,12 @@ journalctl_filter: }) } tomb := tomb.Tomb{} - out := make(chan types.Event) + out := make(chan types.Event, 100) j := JournalCtlSource{} err := j.Configure([]byte(ts.config), subLogger) if err != nil { t.Fatalf("Unexpected error : %s", err) } - actualLines := 0 - if ts.expectedLines != 0 { - go func() { - READLOOP: - for { - select { - case <-out: - actualLines++ - case <-time.After(1 * time.Second): - break READLOOP - } - } - }() - } - err = j.OneShotAcquisition(out, &tomb) cstest.AssertErrorContains(t, err, ts.expectedErr) if err != nil { @@ -179,7 +164,7 @@ journalctl_filter: } if ts.expectedLines != 0 { - assert.Equal(t, ts.expectedLines, actualLines) + assert.Equal(t, ts.expectedLines, len(out)) } if ts.expectedOutput != "" {