diff --git a/pkg/acquisition/modules/syslog/internal/syslogserver.go b/pkg/acquisition/modules/syslog/internal/syslogserver.go index 0a08dcfdd..4fc31f203 100644 --- a/pkg/acquisition/modules/syslog/internal/syslogserver.go +++ b/pkg/acquisition/modules/syslog/internal/syslogserver.go @@ -12,6 +12,7 @@ type SyslogServer struct { proto string listenAddr string port int + channel chan string tcpListener *net.TCPListener udpConn *net.UDPConn parsingTombs []*tomb.Tomb @@ -54,3 +55,7 @@ func (s *SyslogServer) Listen(listenAddr string, port int) error { } return nil } + +func (s *SyslogServer) SetChannel(c chan string) { + s.channel = c +} diff --git a/pkg/acquisition/modules/syslog/syslog.go b/pkg/acquisition/modules/syslog/syslog.go index 2fee51e64..cf9330006 100644 --- a/pkg/acquisition/modules/syslog/syslog.go +++ b/pkg/acquisition/modules/syslog/syslog.go @@ -6,6 +6,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" syslogserver "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/syslog/internal" "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/davecgh/go-spew/spew" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -82,11 +83,17 @@ func (s *SyslogSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) //channel := make(syslog.LogPartsChannel) //handler := syslog.NewChannelHandler(channel) + c := make(chan string) s.server = &syslogserver.SyslogServer{} err := s.server.SetProtocol(s.config.Proto) + s.server.SetChannel(c) if err != nil { return errors.Wrap(err, "could not set syslog server protocol") } + err = s.server.Listen(s.config.Addr, s.config.Port) + if err != nil { + return errors.Wrap(err, "could not start syslog server") + } //s.server.SetHandler(handler) //err := s.server.ListenUDP(fmt.Sprintf("%s:%d", s.config.Addr, s.config.Port)) /*if err != nil { @@ -95,25 +102,25 @@ func (s *SyslogSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) err = s.server.Boot() if err != nil { return errors.Wrap(err, "could not start syslog server") - } + }*/ t.Go(func() error { defer types.CatchPanic("crowdsec/acquis/syslog/live") - return s.handleSyslogMsg(out, t, channel) - })*/ + return s.handleSyslogMsg(out, t, c) + }) return nil } -func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb) error { +func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c chan string) error { for { select { case <-t.Dying(): s.logger.Info("Syslog datasource is dying") - /*case logParts := <-channel: - var line string - spew.Dump(logParts) + case syslogLine := <-c: + //var line string + spew.Dump(syslogLine) //rebuild the syslog line from the part //TODO: handle the RFC format and cases such as missing PID, or PID embedded in the app_name - if logParts["content"] == nil { + /*if logParts["content"] == nil { line = fmt.Sprintf("%s %s %s[%s]: %s", logParts["timestamp"], logParts["hostname"], logParts["app_name"], logParts["proc_id"], logParts["message"]) } else { @@ -127,6 +134,7 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb) error l.Src = logParts["client"].(string) l.Process = true out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: leaky.LIVE}*/ + } } }