crowdsec/pkg/acquisition/modules/waf/waf.go
2023-07-04 17:36:56 +02:00

543 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package wafacquisition
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"github.com/antonmedv/expr"
"github.com/corazawaf/coraza/v3"
"github.com/corazawaf/coraza/v3/experimental"
corazatypes "github.com/corazawaf/coraza/v3/types"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/crowdsecurity/crowdsec/pkg/waf"
"github.com/crowdsecurity/go-cs-lib/pkg/trace"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"
)
const (
InBand = "inband"
OutOfBand = "outofband"
)
type WafRunner struct {
outChan chan types.Event
inChan chan waf.ParsedRequest
inBandWaf coraza.WAF
outOfBandWaf coraza.WAF
UUID string
RulesCollections []*waf.WafRulesCollection
}
type WafSourceConfig struct {
ListenAddr string `yaml:"listen_addr"`
ListenPort int `yaml:"listen_port"`
Path string `yaml:"path"`
WafRoutines int `yaml:"waf_routines"`
configuration.DataSourceCommonCfg `yaml:",inline"`
}
type WafSource struct {
config WafSourceConfig
logger *log.Entry
mux *http.ServeMux
server *http.Server
addr string
outChan chan types.Event
InChan chan waf.ParsedRequest
inBandWaf coraza.WAF
outOfBandWaf coraza.WAF
RulesCollections []*waf.WafRulesCollection
WafRunners []WafRunner
}
func (w *WafSource) GetMetrics() []prometheus.Collector {
return nil
}
func (w *WafSource) GetAggregMetrics() []prometheus.Collector {
return nil
}
func (w *WafSource) UnmarshalConfig(yamlConfig []byte) error {
wafConfig := WafSourceConfig{}
err := yaml.UnmarshalStrict(yamlConfig, &wafConfig)
if err != nil {
return errors.Wrap(err, "Cannot parse waf configuration")
}
w.config = wafConfig
if w.config.ListenAddr == "" {
return fmt.Errorf("listen_addr cannot be empty")
}
if w.config.ListenPort == 0 {
return fmt.Errorf("listen_port cannot be empty")
}
//FIXME: is that really needed ?
if w.config.Path == "" {
return fmt.Errorf("path cannot be empty")
}
if w.config.Path[0] != '/' {
w.config.Path = "/" + w.config.Path
}
if w.config.Mode == "" {
w.config.Mode = configuration.TAIL_MODE
}
return nil
}
func logError(error corazatypes.MatchedRule) {
msg := error.ErrorLog(0)
log.Infof("[logError][%s] %s", error.Rule().Severity(), msg)
}
func (w *WafSource) Configure(yamlConfig []byte, logger *log.Entry) error {
err := w.UnmarshalConfig(yamlConfig)
if err != nil {
return errors.Wrap(err, "Cannot parse waf configuration")
}
w.logger = logger
w.logger.Tracef("WAF configuration: %+v", w.config)
w.addr = fmt.Sprintf("%s:%d", w.config.ListenAddr, w.config.ListenPort)
w.mux = http.NewServeMux()
w.server = &http.Server{
Addr: w.addr,
Handler: w.mux,
}
ruleLoader := waf.NewWafRuleLoader()
rulesCollections, err := ruleLoader.LoadWafRules()
if err != nil {
return fmt.Errorf("cannot load WAF rules: %w", err)
}
w.RulesCollections = rulesCollections
var inBandRules string
var outOfBandRules string
//spew.Dump(rulesCollections)
for _, collection := range rulesCollections {
if !collection.OutOfBand {
inBandRules += collection.String() + "\n"
} else {
outOfBandRules += collection.String() + "\n"
}
}
w.logger.Infof("Loading %d in-band rules", len(strings.Split(inBandRules, "\n")))
//w.logger.Infof("Loading rules %+v", inBandRules)
fs := os.DirFS(ruleLoader.Datadir)
// always have at least one waf routine
if w.config.WafRoutines == 0 {
w.config.WafRoutines = 1
}
w.InChan = make(chan waf.ParsedRequest)
w.WafRunners = make([]WafRunner, w.config.WafRoutines)
for nbRoutine := 0; nbRoutine < w.config.WafRoutines; nbRoutine++ {
w.logger.Infof("Loading %d in-band rules", len(strings.Split(inBandRules, "\n")))
//in-band waf : kill on sight
inbandwaf, err := coraza.NewWAF(
coraza.NewWAFConfig().
//WithErrorCallback(logError).
WithDirectives(inBandRules).WithRootFS(fs),
)
if err != nil {
return errors.Wrap(err, "Cannot create WAF")
}
w.logger.Infof("Loading %d out-of-band rules", len(strings.Split(outOfBandRules, "\n")))
//out-of-band waf : log only
outofbandwaf, err := coraza.NewWAF(
coraza.NewWAFConfig().
//WithErrorCallback(logError).
WithDirectives(outOfBandRules).WithRootFS(fs),
)
if err != nil {
return errors.Wrap(err, "Cannot create WAF")
}
runner := WafRunner{
outOfBandWaf: outofbandwaf,
inBandWaf: inbandwaf,
inChan: w.InChan,
UUID: uuid.New().String(),
RulesCollections: rulesCollections,
}
w.WafRunners[nbRoutine] = runner
}
w.logger.Infof("Loading %d out-of-band rules", len(strings.Split(outOfBandRules, "\n")))
if err != nil {
return errors.Wrap(err, "Cannot create WAF")
}
//We don´t use the wrapper provided by coraza because we want to fully control what happens when a rule match to send the information in crowdsec
w.mux.HandleFunc(w.config.Path, w.wafHandler)
return nil
}
func (w *WafSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
return fmt.Errorf("WAF datasource does not support command line acquisition")
}
func (w *WafSource) GetMode() string {
return w.config.Mode
}
func (w *WafSource) GetName() string {
return "waf"
}
func (w *WafSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
return fmt.Errorf("WAF datasource does not support command line acquisition")
}
func (w *WafSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
w.outChan = out
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/waf/live")
w.logger.Infof("%d waf runner to start", len(w.WafRunners))
for _, runner := range w.WafRunners {
w.logger.Infof("Running waf runner: %s", runner.UUID)
runner.outChan = out
t.Go(func() error {
return runner.Run(t)
})
}
w.logger.Infof("Starting WAF server on %s:%d%s", w.config.ListenAddr, w.config.ListenPort, w.config.Path)
t.Go(func() error {
err := w.server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
return errors.Wrap(err, "WAF server failed")
}
return nil
})
<-t.Dying()
w.logger.Infof("Stopping WAF server on %s:%d%s", w.config.ListenAddr, w.config.ListenPort, w.config.Path)
w.server.Shutdown(context.TODO())
return nil
})
return nil
}
func (w *WafSource) CanRun() error {
return nil
}
func (w *WafSource) GetUuid() string {
return w.config.UniqueId
}
func (w *WafSource) Dump() interface{} {
return w
}
func processReqWithEngine(tx experimental.FullTransaction, r waf.ParsedRequest, wafType string) (*corazatypes.Interruption, experimental.FullTransaction, error) {
var in *corazatypes.Interruption
if tx.IsRuleEngineOff() {
log.Printf("engine is off")
return nil, nil, nil
}
defer func() {
tx.ProcessLogging()
tx.Close()
}()
//this method is not exported by coraza, so we have to do it ourselves.
//ideally, this would be dealt with by expr code, and we provide helpers to manipulate the transaction object?\
//var txx experimental.FullTransaction
//txx := experimental.ToFullInterface(tx)
//txx = tx.(experimental.FullTransaction)
//txx.RemoveRuleByID(1)
tx.ProcessConnection(r.ClientIP, 0, "", 0)
//tx.ProcessURI(r.URL.String(), r.Method, r.Proto) //FIXME: get it from the headers
tx.ProcessURI(r.URI, r.Method, r.Proto) //FIXME: get it from the headers
for k, vr := range r.Headers {
for _, v := range vr {
tx.AddRequestHeader(k, v)
}
}
if r.ClientHost != "" {
tx.AddRequestHeader("Host", r.ClientHost)
// This connector relies on the host header (now host field) to populate ServerName
tx.SetServerName(r.ClientHost)
}
if r.TransferEncoding != nil {
tx.AddRequestHeader("Transfer-Encoding", r.TransferEncoding[0])
}
in = tx.ProcessRequestHeaders()
//spew.Dump(in)
//spew.Dump(tx.MatchedRules())
for _, rule := range tx.MatchedRules() {
if rule.Message() == "" {
continue
}
}
//if we're inband, we should stop here, but for outofband go to the end
if in != nil && wafType == InBand {
return in, tx, nil
}
ct := r.Headers.Get("content-type")
if r.Body != nil && len(r.Body) != 0 {
it, _, err := tx.WriteRequestBody(r.Body)
if err != nil {
return nil, nil, errors.Wrap(err, "Cannot read request body")
}
if it != nil {
return it, nil, nil
}
// from https://github.com/corazawaf/coraza/blob/main/internal/corazawaf/transaction.go#L419
// urlencoded cannot end with CRLF
if ct != "application/x-www-form-urlencoded" {
it, _, err := tx.WriteRequestBody([]byte{'\r', '\n'})
if err != nil {
return nil, nil, fmt.Errorf("cannot write to request body to buffer: %s", err.Error())
}
if it != nil {
return it, nil, nil
}
}
}
in, err := tx.ProcessRequestBody()
if err != nil {
return nil, nil, errors.Wrap(err, "Cannot process request body")
}
if in != nil && wafType == InBand {
return in, tx, nil
}
return nil, tx, nil
}
func (r *WafRunner) Run(t *tomb.Tomb) error {
for {
select {
case <-t.Dying():
log.Infof("Waf Runner is dying")
return nil
case request := <-r.inChan:
inBoundTx := r.inBandWaf.NewTransactionWithID(request.UUID)
expTx := inBoundTx.(experimental.FullTransaction)
// we use this internal transaction for the expr helpers
tx := waf.NewTransaction(expTx)
//Run the pre_eval hooks
for _, rules := range r.RulesCollections {
if len(rules.CompiledPreEval) == 0 {
continue
}
for _, compiledHook := range rules.CompiledPreEval {
if compiledHook.Filter != nil {
res, err := expr.Run(compiledHook.Filter, map[string]interface{}{
"rules": rules,
"req": request,
})
if err != nil {
log.Errorf("unable to run PreEval filter: %s", err)
continue
}
switch t := res.(type) {
case bool:
if t == false {
log.Infof("filter didnt match")
continue
}
default:
log.Errorf("Filter must return a boolean, can't filter")
continue
}
}
// here means there is no filter or the filter matched
for _, applyExpr := range compiledHook.Apply {
_, err := expr.Run(applyExpr, map[string]interface{}{
"rules": rules,
"req": request,
"RemoveRuleByID": tx.RemoveRuleByIDWithError,
})
if err != nil {
log.Errorf("unable to apply filter: %s", err)
continue
}
}
}
}
in, expTx, err := processReqWithEngine(expTx, request, InBand)
request.Tx = expTx
response := waf.NewResponseRequest(expTx, in, request.UUID, err)
// run the on_match hooks
for _, rules := range r.RulesCollections {
if len(rules.CompiledOnMatch) == 0 {
continue
}
for _, compiledHook := range rules.CompiledOnMatch {
if compiledHook.Filter != nil {
res, err := expr.Run(compiledHook.Filter, map[string]interface{}{
"rules": rules,
"req": request,
})
if err != nil {
log.Errorf("unable to run PreEval filter: %s", err)
continue
}
switch t := res.(type) {
case bool:
if t == false {
continue
}
default:
log.Errorf("Filter must return a boolean, can't filter")
continue
}
}
// here means there is no filter or the filter matched
for _, applyExpr := range compiledHook.Apply {
_, err := expr.Run(applyExpr, map[string]interface{}{
"rules": rules,
"req": request,
"RemoveRuleByID": tx.RemoveRuleByIDWithError,
"SetRemediation": response.SetRemediation,
"SetRemediationByID": response.SetRemediationByID,
"CancelEvent": response.CancelEvent,
})
if err != nil {
log.Errorf("unable to apply filter: %s", err)
continue
}
}
}
}
// send back the result to the HTTP handler for the InBand part
request.ResponseChannel <- response
if in != nil && response.SendEvents {
// Generate the events for InBand channel
events, err := TxToEvents(request, InBand)
if err != nil {
log.Errorf("Cannot convert transaction to events : %s", err)
continue
}
for _, evt := range events {
r.outChan <- evt
}
}
// Process outBand
outBandTx := r.outOfBandWaf.NewTransactionWithID(request.UUID)
expTx = outBandTx.(experimental.FullTransaction)
in, expTx, err = processReqWithEngine(expTx, request, OutOfBand)
if err != nil { //things went south
log.Errorf("Error while processing request : %s", err)
continue
}
request.Tx = expTx
if expTx != nil && len(expTx.MatchedRules()) > 0 {
events, err := TxToEvents(request, OutOfBand)
log.Infof("Request triggered by WAF, %d events to send", len(events))
for _, evt := range events {
r.outChan <- evt
}
if err != nil {
log.Errorf("Cannot convert transaction to events : %s", err)
continue
}
}
}
}
}
type BodyResponse struct {
Action string `json:"action"`
}
func (w *WafSource) wafHandler(rw http.ResponseWriter, r *http.Request) {
// parse the request only once
parsedRequest, err := waf.NewParsedRequestFromRequest(r)
if err != nil {
log.Errorf("%s", err)
rw.WriteHeader(http.StatusForbidden)
return
}
w.InChan <- parsedRequest
message := <-parsedRequest.ResponseChannel
if message.Err != nil {
log.Errorf("Error while processing InBAND: %s", err)
rw.WriteHeader(http.StatusOK)
return
}
if message.Interruption != nil {
rw.WriteHeader(http.StatusForbidden)
body, err := json.Marshal(BodyResponse{Action: message.Interruption.Action})
if err != nil {
log.Errorf("unable to build response: %s", err)
} else {
rw.Write(body)
}
return
}
rw.WriteHeader(http.StatusOK)
body, err := json.Marshal(BodyResponse{Action: "allow"})
if err != nil {
log.Errorf("unable to build response: %s", err)
} else {
rw.Write(body)
}
return
}