diff --git a/pkg/apiserver/controllers/v1/decisions.go b/pkg/apiserver/controllers/v1/decisions.go index 6f3f947ed..24aac198b 100644 --- a/pkg/apiserver/controllers/v1/decisions.go +++ b/pkg/apiserver/controllers/v1/decisions.go @@ -1,30 +1,24 @@ package v1 import ( + "encoding/json" + "fmt" "net/http" "strconv" "time" "github.com/crowdsecurity/crowdsec/pkg/database/ent" + "github.com/crowdsecurity/crowdsec/pkg/fflag" "github.com/crowdsecurity/crowdsec/pkg/models" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" ) -//Format decisions for the bouncers, and deduplicate them by keeping only the longest one -func FormatDecisions(decisions []*ent.Decision, dedup bool) ([]*models.Decision, error) { +// Format decisions for the bouncers +func FormatDecisions(decisions []*ent.Decision) []*models.Decision { var results []*models.Decision - seen := make(map[string]struct{}, 0) - for _, dbDecision := range decisions { - if dedup { - key := dbDecision.Value + dbDecision.Scope + dbDecision.Type - if _, ok := seen[key]; ok { - continue - } - seen[key] = struct{}{} - } duration := dbDecision.Until.Sub(time.Now().UTC()).String() decision := models.Decision{ ID: int64(dbDecision.ID), @@ -38,7 +32,7 @@ func FormatDecisions(decisions []*ent.Decision, dedup bool) ([]*models.Decision, } results = append(results, &decision) } - return results, nil + return results } func (c *Controller) GetDecision(gctx *gin.Context) { @@ -58,11 +52,7 @@ func (c *Controller) GetDecision(gctx *gin.Context) { return } - results, err = FormatDecisions(data, false) - if err != nil { - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return - } + results = FormatDecisions(data) /*let's follow a naive logic : when a bouncer queries /decisions, if the answer is empty, we assume there is no decision for this ip/user/..., but if it's non-empty, it means that there is one or more decisions for this target*/ if len(results) > 0 { @@ -100,7 +90,7 @@ func (c *Controller) DeleteDecisionById(gctx *gin.Context) { return } //transform deleted decisions to be sendable to capi - deletedDecisions, err := FormatDecisions(deletedFromDB, false) + deletedDecisions := FormatDecisions(deletedFromDB) if err != nil { log.Warningf("failed to format decisions: %v", err) } @@ -124,7 +114,7 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) { return } //transform deleted decisions to be sendable to capi - deletedDecisions, err := FormatDecisions(deletedFromDB, false) + deletedDecisions := FormatDecisions(deletedFromDB) if err != nil { log.Warningf("failed to format decisions: %v", err) } @@ -139,72 +129,190 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) { gctx.JSON(http.StatusOK, deleteDecisionResp) } -func (c *Controller) StreamDecision(gctx *gin.Context) { +func writeStartupDecisions(gctx *gin.Context, filters map[string][]string, dbFunc func(map[string][]string) ([]*ent.Decision, error)) error { + // respBuffer := bytes.NewBuffer([]byte{}) + limit := 30000 //FIXME : make it configurable + needComma := false + lastId := 0 + + limitStr := fmt.Sprintf("%d", limit) + filters["limit"] = []string{limitStr} + for { + if lastId > 0 { + lastIdStr := fmt.Sprintf("%d", lastId) + filters["id_gt"] = []string{lastIdStr} + } + + data, err := dbFunc(filters) + if err != nil { + return err + } + if len(data) > 0 { + lastId = data[len(data)-1].ID + results := FormatDecisions(data) + for _, decision := range results { + decisionJSON, _ := json.Marshal(decision) + if needComma { + //respBuffer.Write([]byte(",")) + gctx.Writer.Write([]byte(",")) + } else { + needComma = true + } + //respBuffer.Write(decisionJSON) + //_, err := gctx.Writer.Write(respBuffer.Bytes()) + _, err := gctx.Writer.Write(decisionJSON) + if err != nil { + gctx.Writer.Flush() + return err + } + //respBuffer.Reset() + } + } + log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) + if len(data) < limit { + gctx.Writer.Flush() + break + } + } + return nil +} + +func writeDeltaDecisions(gctx *gin.Context, filters map[string][]string, lastPull time.Time, dbFunc func(time.Time, map[string][]string) ([]*ent.Decision, error)) error { + //respBuffer := bytes.NewBuffer([]byte{}) + limit := 30000 //FIXME : make it configurable + needComma := false + lastId := 0 + + limitStr := fmt.Sprintf("%d", limit) + filters["limit"] = []string{limitStr} + for { + if lastId > 0 { + lastIdStr := fmt.Sprintf("%d", lastId) + filters["id_gt"] = []string{lastIdStr} + } + + data, err := dbFunc(lastPull, filters) + if err != nil { + return err + } + if len(data) > 0 { + lastId = data[len(data)-1].ID + results := FormatDecisions(data) + for _, decision := range results { + decisionJSON, _ := json.Marshal(decision) + if needComma { + //respBuffer.Write([]byte(",")) + gctx.Writer.Write([]byte(",")) + } else { + needComma = true + } + //respBuffer.Write(decisionJSON) + //_, err := gctx.Writer.Write(respBuffer.Bytes()) + _, err := gctx.Writer.Write(decisionJSON) + if err != nil { + gctx.Writer.Flush() + return err + } + //respBuffer.Reset() + } + } + log.Debugf("startup: %d decisions returned (limit: %d, lastid: %d)", len(data), limit, lastId) + if len(data) < limit { + gctx.Writer.Flush() + break + } + } + return nil +} + +func (c *Controller) StreamDecisionChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error { + var err error + + gctx.Writer.Header().Set("Content-Type", "application/json") + gctx.Writer.Header().Set("Transfer-Encoding", "chunked") + gctx.Writer.WriteHeader(http.StatusOK) + gctx.Writer.Write([]byte(`{"new": [`)) //No need to check for errors, the doc says it always returns nil + + // if the blocker just start, return all decisions + if val, ok := gctx.Request.URL.Query()["startup"]; ok && val[0] == "true" { + //Active decisions + + err := writeStartupDecisions(gctx, filters, c.DBClient.QueryAllDecisionsWithFilters) + + if err != nil { + log.Errorf("failed sending new decisions for startup: %v", err) + gctx.Writer.Write([]byte(`], "deleted": []}`)) + gctx.Writer.Flush() + return err + } + + gctx.Writer.Write([]byte(`], "deleted": [`)) + //Expired decisions + err = writeStartupDecisions(gctx, filters, c.DBClient.QueryExpiredDecisionsWithFilters) + if err != nil { + log.Errorf("failed sending expired decisions for startup: %v", err) + gctx.Writer.Write([]byte(`]}`)) + gctx.Writer.Flush() + return err + } + + gctx.Writer.Write([]byte(`]}`)) + gctx.Writer.Flush() + } else { + err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryNewDecisionsSinceWithFilters) + if err != nil { + log.Errorf("failed sending new decisions for delta: %v", err) + gctx.Writer.Write([]byte(`], "deleted": []}`)) + gctx.Writer.Flush() + return err + } + + gctx.Writer.Write([]byte(`], "deleted": [`)) + + err = writeDeltaDecisions(gctx, filters, bouncerInfo.LastPull, c.DBClient.QueryExpiredDecisionsSinceWithFilters) + + if err != nil { + log.Errorf("failed sending expired decisions for delta: %v", err) + gctx.Writer.Write([]byte(`]}`)) + gctx.Writer.Flush() + return err + } + + gctx.Writer.Write([]byte(`]}`)) + gctx.Writer.Flush() + } + return nil +} + +func (c *Controller) StreamDecisionNonChunked(gctx *gin.Context, bouncerInfo *ent.Bouncer, streamStartTime time.Time, filters map[string][]string) error { var data []*ent.Decision var err error ret := make(map[string][]*models.Decision, 0) ret["new"] = []*models.Decision{} ret["deleted"] = []*models.Decision{} - streamStartTime := time.Now().UTC() - bouncerInfo, err := getBouncerFromContext(gctx) - if err != nil { - gctx.JSON(http.StatusUnauthorized, gin.H{"message": "not allowed"}) - return - } - - filters := gctx.Request.URL.Query() - if _, ok := filters["scopes"]; !ok { - filters["scopes"] = []string{"ip,range"} - } - - dedup := true - if v, ok := filters["dedup"]; ok && v[0] == "false" { - dedup = false - } - - // if the blocker just start, return all decisions if val, ok := gctx.Request.URL.Query()["startup"]; ok { if val[0] == "true" { data, err = c.DBClient.QueryAllDecisionsWithFilters(filters) if err != nil { log.Errorf("failed querying decisions: %v", err) gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return + return err } //data = KeepLongestDecision(data) - ret["new"], err = FormatDecisions(data, dedup) - if err != nil { - log.Errorf("unable to format expired decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return - } + ret["new"] = FormatDecisions(data) // getting expired decisions data, err = c.DBClient.QueryExpiredDecisionsWithFilters(filters) if err != nil { log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err) gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return - } - ret["deleted"], err = FormatDecisions(data, dedup) - if err != nil { - log.Errorf("unable to format expired decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return + return err } + ret["deleted"] = FormatDecisions(data) - if err := c.DBClient.UpdateBouncerLastPull(streamStartTime, bouncerInfo.ID); err != nil { - log.Errorf("unable to update bouncer '%s' pull: %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return - } - if gctx.Request.Method == http.MethodHead { - gctx.String(http.StatusOK, "") - return - } gctx.JSON(http.StatusOK, ret) - return + return nil } } @@ -213,35 +321,55 @@ func (c *Controller) StreamDecision(gctx *gin.Context) { if err != nil { log.Errorf("unable to query new decision for '%s' : %v", bouncerInfo.Name, err) gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return + return err } //data = KeepLongestDecision(data) - ret["new"], err = FormatDecisions(data, dedup) - if err != nil { - log.Errorf("unable to format new decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return - } + ret["new"] = FormatDecisions(data) // getting expired decisions data, err = c.DBClient.QueryExpiredDecisionsSinceWithFilters(bouncerInfo.LastPull.Add((-2 * time.Second)), filters) // do we want to give exactly lastPull time ? if err != nil { log.Errorf("unable to query expired decision for '%s' : %v", bouncerInfo.Name, err) gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return + return err } - ret["deleted"], err = FormatDecisions(data, dedup) - if err != nil { - log.Errorf("unable to format expired decision for '%s' : %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return - } - - if err := c.DBClient.UpdateBouncerLastPull(streamStartTime, bouncerInfo.ID); err != nil { - log.Errorf("unable to update bouncer '%s' pull: %v", bouncerInfo.Name, err) - gctx.JSON(http.StatusInternalServerError, gin.H{"message": err.Error()}) - return - } - + ret["deleted"] = FormatDecisions(data) gctx.JSON(http.StatusOK, ret) + return nil +} + +func (c *Controller) StreamDecision(gctx *gin.Context) { + var err error + + streamStartTime := time.Now().UTC() + bouncerInfo, err := getBouncerFromContext(gctx) + if err != nil { + gctx.JSON(http.StatusUnauthorized, gin.H{"message": "not allowed"}) + return + } + + if gctx.Request.Method == http.MethodHead { + //For HEAD, just return as the bouncer won't get a body anyway, so no need to query the db + //We also don't update the last pull time, as it would mess with the delta sent on the next request (if done without startup=true) + gctx.String(http.StatusOK, "") + return + } + + filters := gctx.Request.URL.Query() + if _, ok := filters["scopes"]; !ok { + filters["scopes"] = []string{"ip,range"} + } + + if fflag.ChunkedDecisionsStream.IsEnabled() { + err = c.StreamDecisionChunked(gctx, bouncerInfo, streamStartTime, filters) + } else { + err = c.StreamDecisionNonChunked(gctx, bouncerInfo, streamStartTime, filters) + } + + if err == nil { + //Only update the last pull time if no error occurred when sending the decisions to avoid missing decisions + if err := c.DBClient.UpdateBouncerLastPull(streamStartTime, bouncerInfo.ID); err != nil { + log.Errorf("unable to update bouncer '%s' pull: %v", bouncerInfo.Name, err) + } + } } diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go index 9445bb1ce..7ad1a1206 100644 --- a/pkg/database/decisions.go +++ b/pkg/database/decisions.go @@ -86,6 +86,24 @@ func BuildDecisionRequestWithFilter(query *ent.DecisionQuery, filter map[string] if err != nil { return nil, errors.Wrapf(InvalidIPOrRange, "unable to convert '%s' to int: %s", value[0], err) } + case "limit": + limit, err := strconv.Atoi(value[0]) + if err != nil { + return nil, errors.Wrapf(InvalidFilter, "invalid limit value : %s", err) + } + query = query.Limit(limit) + case "offset": + offset, err := strconv.Atoi(value[0]) + if err != nil { + return nil, errors.Wrapf(InvalidFilter, "invalid offset value : %s", err) + } + query = query.Offset(offset) + case "id_gt": + id, err := strconv.Atoi(value[0]) + if err != nil { + return nil, errors.Wrapf(InvalidFilter, "invalid id_gt value : %s", err) + } + query = query.Where(decision.IDGT(id)) } } query, err = applyStartIpEndIpFilter(query, contains, ip_sz, start_ip, start_sfx, end_ip, end_sfx) @@ -110,6 +128,8 @@ func (c *Client) QueryAllDecisionsWithFilters(filters map[string][]string) ([]*e return []*ent.Decision{}, errors.Wrap(QueryFail, "get all decisions with filters") } + query = query.Order(ent.Asc(decision.FieldID)) + data, err := query.All(c.CTX) if err != nil { c.Log.Warningf("QueryAllDecisionsWithFilters : %s", err) @@ -129,6 +149,8 @@ func (c *Client) QueryExpiredDecisionsWithFilters(filters map[string][]string) ( query, err := BuildDecisionRequestWithFilter(query, filters) + query = query.Order(ent.Asc(decision.FieldID)) + if err != nil { c.Log.Warningf("QueryExpiredDecisionsWithFilters : %s", err) return []*ent.Decision{}, errors.Wrap(QueryFail, "get expired decisions with filters") @@ -238,6 +260,8 @@ func (c *Client) QueryExpiredDecisionsSinceWithFilters(since time.Time, filters return []*ent.Decision{}, errors.Wrap(QueryFail, "expired decisions with filters") } + query = query.Order(ent.Asc(decision.FieldID)) + data, err := query.All(c.CTX) if err != nil { c.Log.Warningf("QueryExpiredDecisionsSinceWithFilters : %s", err) @@ -261,6 +285,9 @@ func (c *Client) QueryNewDecisionsSinceWithFilters(since time.Time, filters map[ c.Log.Warningf("QueryNewDecisionsSinceWithFilters : %s", err) return []*ent.Decision{}, errors.Wrapf(QueryFail, "new decisions since '%s'", since.String()) } + + query = query.Order(ent.Asc(decision.FieldID)) + data, err := query.All(c.CTX) if err != nil { c.Log.Warningf("QueryNewDecisionsSinceWithFilters : %s", err) diff --git a/pkg/fflag/crowdsec.go b/pkg/fflag/crowdsec.go index 16a50d33b..9f4db57b0 100644 --- a/pkg/fflag/crowdsec.go +++ b/pkg/fflag/crowdsec.go @@ -4,6 +4,7 @@ var Crowdsec = FeatureRegister{EnvPrefix: "CROWDSEC_FEATURE_"} var CscliSetup = &Feature{Name: "cscli_setup", Description: "Enable cscli setup command (service detection)"} var DisableHttpRetryBackoff = &Feature{Name: "disable_http_retry_backoff", Description: "Disable http retry backoff"} +var ChunkedDecisionsStream = &Feature{Name: "chunked_decisions_stream", Description: "Enable chunked decisions stream"} var PapiClient = &Feature{Name: "papi_client", Description: "Enable Polling API client"} func RegisterAllFeatures() error { @@ -15,6 +16,10 @@ func RegisterAllFeatures() error { if err != nil { return err } + err = Crowdsec.RegisterFeature(ChunkedDecisionsStream) + if err != nil { + return err + } err = Crowdsec.RegisterFeature(PapiClient) if err != nil { return err