don't discard error, don't try to unmarshal empty string, don't return if it happen

This commit is contained in:
Thibault bui Koechlin 2020-05-22 11:23:47 +02:00
parent 552d757b42
commit 0076727c29
2 changed files with 23 additions and 5 deletions

View file

@ -289,19 +289,34 @@ func main() {
//start go-routines for parsing, buckets pour and ouputs.
for i := 0; i < nbParser; i++ {
parsersTomb.Go(func() error {
return runParse(inputLineChan, inputEventChan, *parserCTX, parserNodes)
err := runParse(inputLineChan, inputEventChan, *parserCTX, parserNodes)
if err != nil {
log.Errorf("runParse error : %s", err)
return err
}
return nil
})
}
for i := 0; i < nbParser; i++ {
bucketsTomb.Go(func() error {
return runPour(inputEventChan, holders, buckets)
err := runPour(inputEventChan, holders, buckets)
if err != nil {
log.Errorf("runPour error : %s", err)
return err
}
return nil
})
}
for i := 0; i < nbParser; i++ {
outputsTomb.Go(func() error {
return runOutput(inputEventChan, outputEventChan, holders, buckets, *postOverflowCTX, postOverflowNodes, outputProfiles, outputRunner)
err := runOutput(inputEventChan, outputEventChan, holders, buckets, *postOverflowCTX, postOverflowNodes, outputProfiles, outputRunner)
if err != nil {
log.Errorf("runPour error : %s", err)
return err
}
return nil
})
}

View file

@ -59,9 +59,12 @@ LOOP:
if cConfig.Profiling {
bucketStat.AddTime(time.Since(start))
}
if err := lastProcessedItem.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
return fmt.Errorf("failed to unmarshal item : %s", err)
if len(parsed.MarshaledTime) != 0 {
if err := lastProcessedItem.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil {
log.Debugf("failed to unmarshal time from event : %s", err)
}
}
}
}
log.Infof("Sending signal Bucketify")