optim
This commit is contained in:
parent
fa172bed56
commit
40f65de7b9
|
@ -38,6 +38,7 @@ type WafSource struct {
|
||||||
addr string
|
addr string
|
||||||
outChan chan types.Event
|
outChan chan types.Event
|
||||||
OutOfBandChan chan ParsedRequest
|
OutOfBandChan chan ParsedRequest
|
||||||
|
InBandChan chan ParsedRequest
|
||||||
|
|
||||||
inBandWaf coraza.WAF
|
inBandWaf coraza.WAF
|
||||||
outOfBandWaf coraza.WAF
|
outOfBandWaf coraza.WAF
|
||||||
|
@ -190,6 +191,7 @@ func (w *WafSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error
|
||||||
func (w *WafSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
func (w *WafSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
||||||
w.outChan = out
|
w.outChan = out
|
||||||
w.OutOfBandChan = make(chan ParsedRequest)
|
w.OutOfBandChan = make(chan ParsedRequest)
|
||||||
|
w.InBandChan = make(chan ParsedRequest)
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
defer trace.CatchPanic("crowdsec/acquis/waf/live")
|
defer trace.CatchPanic("crowdsec/acquis/waf/live")
|
||||||
|
|
||||||
|
@ -201,6 +203,14 @@ func (w *WafSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) err
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// start InBand GoRoutine
|
||||||
|
t.Go(func() error {
|
||||||
|
if err := w.ProcessInBand(t); err != nil {
|
||||||
|
return errors.Wrap(err, "Processing in-band routine failed: %s")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
w.logger.Infof("Starting WAF server on %s:%d%s", w.config.ListenAddr, w.config.ListenPort, w.config.Path)
|
w.logger.Infof("Starting WAF server on %s:%d%s", w.config.ListenAddr, w.config.ListenPort, w.config.Path)
|
||||||
t.Go(func() error {
|
t.Go(func() error {
|
||||||
err := w.server.ListenAndServe()
|
err := w.server.ListenAndServe()
|
||||||
|
@ -239,6 +249,7 @@ type ParsedRequest struct {
|
||||||
Body []byte
|
Body []byte
|
||||||
TransferEncoding []string
|
TransferEncoding []string
|
||||||
UUID string
|
UUID string
|
||||||
|
Tx corazatypes.Transaction
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewParsedRequestFromRequest(r *http.Request) (ParsedRequest, error) {
|
func NewParsedRequestFromRequest(r *http.Request) (ParsedRequest, error) {
|
||||||
|
@ -349,14 +360,14 @@ func processReqWithEngine(waf coraza.WAF, r ParsedRequest, uuid string, wafType
|
||||||
return nil, tx, nil
|
return nil, tx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WafSource) TxToEvents(tx corazatypes.Transaction, r ParsedRequest, kind string) ([]types.Event, error) {
|
func (w *WafSource) TxToEvents(r ParsedRequest, kind string) ([]types.Event, error) {
|
||||||
evts := []types.Event{}
|
evts := []types.Event{}
|
||||||
if tx == nil {
|
if r.Tx == nil {
|
||||||
return nil, fmt.Errorf("tx is nil")
|
return nil, fmt.Errorf("tx is nil")
|
||||||
}
|
}
|
||||||
for idx, rule := range tx.MatchedRules() {
|
for idx, rule := range r.Tx.MatchedRules() {
|
||||||
log.Printf("rule %d", idx)
|
log.Printf("rule %d", idx)
|
||||||
evt, err := w.RuleMatchToEvent(rule, tx, r, kind)
|
evt, err := w.RuleMatchToEvent(rule, r.Tx, r, kind)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Cannot convert rule match to event")
|
return nil, errors.Wrap(err, "Cannot convert rule match to event")
|
||||||
}
|
}
|
||||||
|
@ -430,8 +441,9 @@ func (w *WafSource) ProcessOutBand(t *tomb.Tomb) error {
|
||||||
log.Errorf("Error while processing request : %s", err)
|
log.Errorf("Error while processing request : %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
r.Tx = tx2
|
||||||
if tx2 != nil && len(tx2.MatchedRules()) > 0 {
|
if tx2 != nil && len(tx2.MatchedRules()) > 0 {
|
||||||
events, err := w.TxToEvents(tx2, r, OutOfBand)
|
events, err := w.TxToEvents(r, OutOfBand)
|
||||||
log.Infof("Request triggered by WAF, %d events to send", len(events))
|
log.Infof("Request triggered by WAF, %d events to send", len(events))
|
||||||
for _, evt := range events {
|
for _, evt := range events {
|
||||||
w.outChan <- evt
|
w.outChan <- evt
|
||||||
|
@ -446,6 +458,27 @@ func (w *WafSource) ProcessOutBand(t *tomb.Tomb) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WafSource) ProcessInBand(t *tomb.Tomb) error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.Dying():
|
||||||
|
log.Infof("OutOfBand function is dying")
|
||||||
|
return nil
|
||||||
|
case r := <-w.InBandChan:
|
||||||
|
events, err := w.TxToEvents(r, InBand)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot convert transaction to events : %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Request blocked by WAF, %d events to send", len(events))
|
||||||
|
for _, evt := range events {
|
||||||
|
w.outChan <- evt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (w *WafSource) wafHandler(rw http.ResponseWriter, r *http.Request) {
|
func (w *WafSource) wafHandler(rw http.ResponseWriter, r *http.Request) {
|
||||||
//let's gen a transaction id to keep consistance accross in-band and out-of-band
|
//let's gen a transaction id to keep consistance accross in-band and out-of-band
|
||||||
uuid := uuid.New().String()
|
uuid := uuid.New().String()
|
||||||
|
@ -468,23 +501,17 @@ func (w *WafSource) wafHandler(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
if in != nil {
|
if in != nil {
|
||||||
rw.WriteHeader(http.StatusForbidden)
|
rw.WriteHeader(http.StatusForbidden)
|
||||||
events, err := w.TxToEvents(tx, parsedRequest, InBand)
|
go func() {
|
||||||
log.Infof("Request blocked by WAF, %d events to send", len(events))
|
parsedRequest.Tx = tx
|
||||||
for _, evt := range events {
|
w.InBandChan <- parsedRequest
|
||||||
w.outChan <- evt
|
}()
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Cannot convert transaction to events : %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// we finished the inband, we can return 200
|
// we finished the inband, we can return 200
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
// now we can process out of band asynchronously
|
|
||||||
|
//now we can process out of band asynchronously
|
||||||
go func() {
|
go func() {
|
||||||
w.OutOfBandChan <- parsedRequest
|
w.OutOfBandChan <- parsedRequest
|
||||||
}()
|
}()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue