From e41a334af489876291f9b2fa2fe38a74fef37ef7 Mon Sep 17 00:00:00 2001 From: alteredCoder Date: Wed, 13 Oct 2021 17:31:48 +0200 Subject: [PATCH] update --- pkg/apiclient/decisions_service.go | 4 +- pkg/apiserver/apic.go | 87 ++++++++++++++++++---- pkg/apiserver/apiserver.go | 18 +++++ pkg/apiserver/controllers/controller.go | 4 +- pkg/apiserver/controllers/v1/controller.go | 33 ++++---- pkg/apiserver/controllers/v1/decisions.go | 33 ++++++++ 6 files changed, 144 insertions(+), 35 deletions(-) diff --git a/pkg/apiclient/decisions_service.go b/pkg/apiclient/decisions_service.go index d32d5e168..3216a1074 100644 --- a/pkg/apiclient/decisions_service.go +++ b/pkg/apiclient/decisions_service.go @@ -127,7 +127,8 @@ func (s *DecisionsService) DeleteOne(ctx context.Context, decision_id string) (* return &deleteDecisionResponse, resp, nil } -func (s *DecisionsService) DeleteDecisions(ctx context.Context, decisionsID []string) (*SuccessReponse, *Response, error) { +// send to CAPI manually deleted decisions +func (s *DecisionsService) DeleteManualDecisions(ctx context.Context, decisionsID []string) (*SuccessReponse, *Response, error) { var successReponse SuccessReponse u := fmt.Sprintf("%s/decisions/delete", s.client.URLPrefix) @@ -135,7 +136,6 @@ func (s *DecisionsService) DeleteDecisions(ctx context.Context, decisionsID []st if err != nil { return nil, nil, err } - resp, err := s.client.Do(ctx, req, &successReponse) if err != nil { return nil, resp, err diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index 361ee271d..814a1fcb2 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -31,21 +31,22 @@ const ( ) type apic struct { - pullInterval time.Duration - pushInterval time.Duration - metricsInterval time.Duration - dbClient *database.Client - apiClient *apiclient.ApiClient - alertToPush chan []*models.Alert - mu sync.Mutex - pushTomb tomb.Tomb - pullTomb tomb.Tomb - metricsTomb tomb.Tomb - startup bool - credentials *csconfig.ApiCredentialsCfg - scenarioList []string - consoleConfig *csconfig.ConsoleConfig - decisionsToDelete chan models.Decision + pullInterval time.Duration + pushInterval time.Duration + metricsInterval time.Duration + dbClient *database.Client + apiClient *apiclient.ApiClient + alertToPush chan []*models.Alert + mu sync.Mutex + pushTomb tomb.Tomb + pullTomb tomb.Tomb + metricsTomb tomb.Tomb + deleteDecisionsTomb tomb.Tomb + startup bool + credentials *csconfig.ApiCredentialsCfg + scenarioList []string + consoleConfig *csconfig.ConsoleConfig + decisionsToDelete chan []string } func IsInSlice(a string, b []string) bool { @@ -109,7 +110,7 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con pushTomb: tomb.Tomb{}, metricsTomb: tomb.Tomb{}, scenarioList: make([]string, 0), - decisionsToDelete: make(chan models.Decision), + decisionsToDelete: make(chan []string), consoleConfig: consoleConfig, } @@ -159,6 +160,7 @@ func (a *apic) Push() error { case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others? a.pullTomb.Kill(nil) a.metricsTomb.Kill(nil) + a.deleteDecisionsTomb.Kill(nil) log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache)) if len(cache) == 0 { return nil @@ -221,6 +223,57 @@ func (a *apic) Push() error { } } +func (a *apic) DeleteDecisions() error { + defer types.CatchPanic("lapi/deleteDecisionsCAPI") + + log.Infof("start crowdsec api push (interval: %s)", PushInterval) + + for { + select { + case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others? + a.pullTomb.Kill(nil) + a.metricsTomb.Kill(nil) + a.pushTomb.Kill(nil) + return nil + case decisions := <-a.decisionsToDelete: + go a.SendDeletedDecisions(decisions) + } + } +} + +func (a *apic) SendDeletedDecisions(deletedDecisions []string) { + var send []string + + bulkSize := 50 + pageStart := 0 + pageEnd := bulkSize + + for { + + if pageEnd >= len(deletedDecisions) { + send = deletedDecisions[pageStart:] + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _, err := a.apiClient.Decisions.DeleteManualDecisions(ctx, send) + if err != nil { + log.Errorf("Error while sending final chunk to central API : %s", err) + return + } + break + } + send = deletedDecisions[pageStart:pageEnd] + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _, err := a.apiClient.Decisions.DeleteManualDecisions(ctx, send) + if err != nil { + //we log it here as well, because the return value of func might be discarded + log.Errorf("Error while sending chunk to central API : %s", err) + } + pageStart += bulkSize + pageEnd += bulkSize + } +} + func (a *apic) Send(cacheOrig *models.AddSignalsRequest) { /*we do have a problem with this : The apic.Push background routine reads from alertToPush chan. @@ -393,6 +446,7 @@ func (a *apic) Pull() error { case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others? a.metricsTomb.Kill(nil) a.pushTomb.Kill(nil) + a.deleteDecisionsTomb.Kill(nil) return nil } } @@ -447,6 +501,7 @@ func (a *apic) SendMetrics() error { case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others? a.pullTomb.Kill(nil) a.pushTomb.Kill(nil) + a.deleteDecisionsTomb.Kill(nil) return nil } } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 9d183e117..cb6ed546e 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -38,6 +38,7 @@ type APIServer struct { httpServer *http.Server apic *apic httpServerTomb tomb.Tomb + consoleConfig *csconfig.ConsoleConfig } // RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one. @@ -165,6 +166,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) { return }) router.Use(CustomRecoveryWithWriter()) + controller := &controllers.Controller{ DBClient: dbClient, Ectx: context.Background(), @@ -183,9 +185,15 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) { return &APIServer{}, err } controller.CAPIChan = apiClient.alertToPush + if *config.ConsoleConfig.ShareDecisions { + controller.DeleteDecisionChannel = apiClient.decisionsToDelete + } else { + controller.DeleteDecisionChannel = nil + } } else { apiClient = nil controller.CAPIChan = nil + controller.DeleteDecisionChannel = nil } return &APIServer{ @@ -198,6 +206,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) { router: router, apic: apiClient, httpServerTomb: tomb.Tomb{}, + consoleConfig: config.ConsoleConfig, }, nil } @@ -236,6 +245,15 @@ func (s *APIServer) Run() error { } return nil }) + if *s.apic.consoleConfig.ShareDecisions { + s.apic.deleteDecisionsTomb.Go(func() error { + if err := s.apic.DeleteDecisions(); err != nil { + log.Errorf("capi send deleted decisions: %s", err) + return err + } + return nil + }) + } } s.httpServerTomb.Go(func() error { diff --git a/pkg/apiserver/controllers/controller.go b/pkg/apiserver/controllers/controller.go index 638019ec8..f875b91df 100644 --- a/pkg/apiserver/controllers/controller.go +++ b/pkg/apiserver/controllers/controller.go @@ -23,7 +23,7 @@ type Controller struct { PluginChannel chan csplugin.ProfileAlert Log *log.Logger ConsoleConfig *csconfig.ConsoleConfig - DeleteDecisionChannel chan models.Decision + DeleteDecisionChannel chan []string } func (c *Controller) Init() error { @@ -54,7 +54,7 @@ func serveHealth() http.HandlerFunc { } func (c *Controller) NewV1() error { - handlerV1, err := v1.New(c.DBClient, c.Ectx, c.Profiles, c.CAPIChan, c.PluginChannel) + handlerV1, err := v1.New(c.DBClient, c.Ectx, c.Profiles, c.CAPIChan, c.PluginChannel, *c.ConsoleConfig, c.DeleteDecisionChannel) if err != nil { return err } diff --git a/pkg/apiserver/controllers/v1/controller.go b/pkg/apiserver/controllers/v1/controller.go index 5533f95a0..783a7dc12 100644 --- a/pkg/apiserver/controllers/v1/controller.go +++ b/pkg/apiserver/controllers/v1/controller.go @@ -11,25 +11,28 @@ import ( ) type Controller struct { - Ectx context.Context - DBClient *database.Client - APIKeyHeader string - Middlewares *middlewares.Middlewares - Profiles []*csconfig.ProfileCfg - CAPIChan chan []*models.Alert - PluginChannel chan csplugin.ProfileAlert - ConsoleConfig map[string]interface{} + Ectx context.Context + DBClient *database.Client + APIKeyHeader string + Middlewares *middlewares.Middlewares + Profiles []*csconfig.ProfileCfg + CAPIChan chan []*models.Alert + PluginChannel chan csplugin.ProfileAlert + DeleteDecisionsChannel chan []string + ConsoleConfig csconfig.ConsoleConfig } -func New(dbClient *database.Client, ctx context.Context, profiles []*csconfig.ProfileCfg, capiChan chan []*models.Alert, pluginChannel chan csplugin.ProfileAlert) (*Controller, error) { +func New(dbClient *database.Client, ctx context.Context, profiles []*csconfig.ProfileCfg, capiChan chan []*models.Alert, pluginChannel chan csplugin.ProfileAlert, consoleConfig csconfig.ConsoleConfig, deleteDecisionsChannel chan []string) (*Controller, error) { var err error v1 := &Controller{ - Ectx: ctx, - DBClient: dbClient, - APIKeyHeader: middlewares.APIKeyHeader, - Profiles: profiles, - CAPIChan: capiChan, - PluginChannel: pluginChannel, + Ectx: ctx, + DBClient: dbClient, + APIKeyHeader: middlewares.APIKeyHeader, + Profiles: profiles, + CAPIChan: capiChan, + PluginChannel: pluginChannel, + DeleteDecisionsChannel: deleteDecisionsChannel, + ConsoleConfig: consoleConfig, } v1.Middlewares, err = middlewares.NewMiddlewares(dbClient) if err != nil { diff --git a/pkg/apiserver/controllers/v1/decisions.go b/pkg/apiserver/controllers/v1/decisions.go index 17256f7bf..8caac4207 100644 --- a/pkg/apiserver/controllers/v1/decisions.go +++ b/pkg/apiserver/controllers/v1/decisions.go @@ -83,12 +83,34 @@ func (c *Controller) DeleteDecisionById(gctx *gin.Context) { NbDeleted: "1", } + if *c.ConsoleConfig.ShareDecisions { + if c.DeleteDecisionsChannel != nil { + select { + case c.DeleteDecisionsChannel <- []string{decisionIDStr}: + log.Debug("alert sent to delete decisions channel") + default: + log.Warning("Cannot send alert to delete decisions channel") + } + } + } + gctx.JSON(http.StatusOK, deleteDecisionResp) return } func (c *Controller) DeleteDecisions(gctx *gin.Context) { var err error + decisionsIDToDelete := make([]string, 0) + + if *c.ConsoleConfig.ShareDecisions { + decisionsToDelete, err := c.DBClient.QueryDecisionWithFilter(gctx.Request.URL.Query()) + if err != nil { + log.Errorf("unable to list decisions to delete to send to console: %s", err) + } + for _, decision := range decisionsToDelete { + decisionsIDToDelete = append(decisionsIDToDelete, strconv.Itoa(decision.ID)) + } + } nbDeleted, err := c.DBClient.SoftDeleteDecisionsWithFilter(gctx.Request.URL.Query()) if err != nil { @@ -99,6 +121,17 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) { NbDeleted: nbDeleted, } + if len(decisionsIDToDelete) > 0 { + if c.DeleteDecisionsChannel != nil { + select { + case c.DeleteDecisionsChannel <- decisionsIDToDelete: + log.Debug("alert sent to delete decisions channel") + default: + log.Warning("Cannot send alert to delete decisions channel") + } + } + } + gctx.JSON(http.StatusOK, deleteDecisionResp) return }