This commit is contained in:
alteredCoder 2021-10-13 17:31:48 +02:00
parent 9acb0e7e7e
commit e41a334af4
6 changed files with 144 additions and 35 deletions

View file

@ -127,7 +127,8 @@ func (s *DecisionsService) DeleteOne(ctx context.Context, decision_id string) (*
return &deleteDecisionResponse, resp, nil return &deleteDecisionResponse, resp, nil
} }
func (s *DecisionsService) DeleteDecisions(ctx context.Context, decisionsID []string) (*SuccessReponse, *Response, error) { // send to CAPI manually deleted decisions
func (s *DecisionsService) DeleteManualDecisions(ctx context.Context, decisionsID []string) (*SuccessReponse, *Response, error) {
var successReponse SuccessReponse var successReponse SuccessReponse
u := fmt.Sprintf("%s/decisions/delete", s.client.URLPrefix) u := fmt.Sprintf("%s/decisions/delete", s.client.URLPrefix)
@ -135,7 +136,6 @@ func (s *DecisionsService) DeleteDecisions(ctx context.Context, decisionsID []st
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
resp, err := s.client.Do(ctx, req, &successReponse) resp, err := s.client.Do(ctx, req, &successReponse)
if err != nil { if err != nil {
return nil, resp, err return nil, resp, err

View file

@ -31,21 +31,22 @@ const (
) )
type apic struct { type apic struct {
pullInterval time.Duration pullInterval time.Duration
pushInterval time.Duration pushInterval time.Duration
metricsInterval time.Duration metricsInterval time.Duration
dbClient *database.Client dbClient *database.Client
apiClient *apiclient.ApiClient apiClient *apiclient.ApiClient
alertToPush chan []*models.Alert alertToPush chan []*models.Alert
mu sync.Mutex mu sync.Mutex
pushTomb tomb.Tomb pushTomb tomb.Tomb
pullTomb tomb.Tomb pullTomb tomb.Tomb
metricsTomb tomb.Tomb metricsTomb tomb.Tomb
startup bool deleteDecisionsTomb tomb.Tomb
credentials *csconfig.ApiCredentialsCfg startup bool
scenarioList []string credentials *csconfig.ApiCredentialsCfg
consoleConfig *csconfig.ConsoleConfig scenarioList []string
decisionsToDelete chan models.Decision consoleConfig *csconfig.ConsoleConfig
decisionsToDelete chan []string
} }
func IsInSlice(a string, b []string) bool { func IsInSlice(a string, b []string) bool {
@ -109,7 +110,7 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con
pushTomb: tomb.Tomb{}, pushTomb: tomb.Tomb{},
metricsTomb: tomb.Tomb{}, metricsTomb: tomb.Tomb{},
scenarioList: make([]string, 0), scenarioList: make([]string, 0),
decisionsToDelete: make(chan models.Decision), decisionsToDelete: make(chan []string),
consoleConfig: consoleConfig, consoleConfig: consoleConfig,
} }
@ -159,6 +160,7 @@ func (a *apic) Push() error {
case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others? case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.pullTomb.Kill(nil) a.pullTomb.Kill(nil)
a.metricsTomb.Kill(nil) a.metricsTomb.Kill(nil)
a.deleteDecisionsTomb.Kill(nil)
log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache)) log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))
if len(cache) == 0 { if len(cache) == 0 {
return nil return nil
@ -221,6 +223,57 @@ func (a *apic) Push() error {
} }
} }
func (a *apic) DeleteDecisions() error {
defer types.CatchPanic("lapi/deleteDecisionsCAPI")
log.Infof("start crowdsec api push (interval: %s)", PushInterval)
for {
select {
case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.pullTomb.Kill(nil)
a.metricsTomb.Kill(nil)
a.pushTomb.Kill(nil)
return nil
case decisions := <-a.decisionsToDelete:
go a.SendDeletedDecisions(decisions)
}
}
}
func (a *apic) SendDeletedDecisions(deletedDecisions []string) {
var send []string
bulkSize := 50
pageStart := 0
pageEnd := bulkSize
for {
if pageEnd >= len(deletedDecisions) {
send = deletedDecisions[pageStart:]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, _, err := a.apiClient.Decisions.DeleteManualDecisions(ctx, send)
if err != nil {
log.Errorf("Error while sending final chunk to central API : %s", err)
return
}
break
}
send = deletedDecisions[pageStart:pageEnd]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, _, err := a.apiClient.Decisions.DeleteManualDecisions(ctx, send)
if err != nil {
//we log it here as well, because the return value of func might be discarded
log.Errorf("Error while sending chunk to central API : %s", err)
}
pageStart += bulkSize
pageEnd += bulkSize
}
}
func (a *apic) Send(cacheOrig *models.AddSignalsRequest) { func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
/*we do have a problem with this : /*we do have a problem with this :
The apic.Push background routine reads from alertToPush chan. The apic.Push background routine reads from alertToPush chan.
@ -393,6 +446,7 @@ func (a *apic) Pull() error {
case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others? case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.metricsTomb.Kill(nil) a.metricsTomb.Kill(nil)
a.pushTomb.Kill(nil) a.pushTomb.Kill(nil)
a.deleteDecisionsTomb.Kill(nil)
return nil return nil
} }
} }
@ -447,6 +501,7 @@ func (a *apic) SendMetrics() error {
case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others? case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others?
a.pullTomb.Kill(nil) a.pullTomb.Kill(nil)
a.pushTomb.Kill(nil) a.pushTomb.Kill(nil)
a.deleteDecisionsTomb.Kill(nil)
return nil return nil
} }
} }

View file

@ -38,6 +38,7 @@ type APIServer struct {
httpServer *http.Server httpServer *http.Server
apic *apic apic *apic
httpServerTomb tomb.Tomb httpServerTomb tomb.Tomb
consoleConfig *csconfig.ConsoleConfig
} }
// RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one. // RecoveryWithWriter returns a middleware for a given writer that recovers from any panics and writes a 500 if there was one.
@ -165,6 +166,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
return return
}) })
router.Use(CustomRecoveryWithWriter()) router.Use(CustomRecoveryWithWriter())
controller := &controllers.Controller{ controller := &controllers.Controller{
DBClient: dbClient, DBClient: dbClient,
Ectx: context.Background(), Ectx: context.Background(),
@ -183,9 +185,15 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
return &APIServer{}, err return &APIServer{}, err
} }
controller.CAPIChan = apiClient.alertToPush controller.CAPIChan = apiClient.alertToPush
if *config.ConsoleConfig.ShareDecisions {
controller.DeleteDecisionChannel = apiClient.decisionsToDelete
} else {
controller.DeleteDecisionChannel = nil
}
} else { } else {
apiClient = nil apiClient = nil
controller.CAPIChan = nil controller.CAPIChan = nil
controller.DeleteDecisionChannel = nil
} }
return &APIServer{ return &APIServer{
@ -198,6 +206,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
router: router, router: router,
apic: apiClient, apic: apiClient,
httpServerTomb: tomb.Tomb{}, httpServerTomb: tomb.Tomb{},
consoleConfig: config.ConsoleConfig,
}, nil }, nil
} }
@ -236,6 +245,15 @@ func (s *APIServer) Run() error {
} }
return nil return nil
}) })
if *s.apic.consoleConfig.ShareDecisions {
s.apic.deleteDecisionsTomb.Go(func() error {
if err := s.apic.DeleteDecisions(); err != nil {
log.Errorf("capi send deleted decisions: %s", err)
return err
}
return nil
})
}
} }
s.httpServerTomb.Go(func() error { s.httpServerTomb.Go(func() error {

View file

@ -23,7 +23,7 @@ type Controller struct {
PluginChannel chan csplugin.ProfileAlert PluginChannel chan csplugin.ProfileAlert
Log *log.Logger Log *log.Logger
ConsoleConfig *csconfig.ConsoleConfig ConsoleConfig *csconfig.ConsoleConfig
DeleteDecisionChannel chan models.Decision DeleteDecisionChannel chan []string
} }
func (c *Controller) Init() error { func (c *Controller) Init() error {
@ -54,7 +54,7 @@ func serveHealth() http.HandlerFunc {
} }
func (c *Controller) NewV1() error { func (c *Controller) NewV1() error {
handlerV1, err := v1.New(c.DBClient, c.Ectx, c.Profiles, c.CAPIChan, c.PluginChannel) handlerV1, err := v1.New(c.DBClient, c.Ectx, c.Profiles, c.CAPIChan, c.PluginChannel, *c.ConsoleConfig, c.DeleteDecisionChannel)
if err != nil { if err != nil {
return err return err
} }

View file

@ -11,25 +11,28 @@ import (
) )
type Controller struct { type Controller struct {
Ectx context.Context Ectx context.Context
DBClient *database.Client DBClient *database.Client
APIKeyHeader string APIKeyHeader string
Middlewares *middlewares.Middlewares Middlewares *middlewares.Middlewares
Profiles []*csconfig.ProfileCfg Profiles []*csconfig.ProfileCfg
CAPIChan chan []*models.Alert CAPIChan chan []*models.Alert
PluginChannel chan csplugin.ProfileAlert PluginChannel chan csplugin.ProfileAlert
ConsoleConfig map[string]interface{} DeleteDecisionsChannel chan []string
ConsoleConfig csconfig.ConsoleConfig
} }
func New(dbClient *database.Client, ctx context.Context, profiles []*csconfig.ProfileCfg, capiChan chan []*models.Alert, pluginChannel chan csplugin.ProfileAlert) (*Controller, error) { func New(dbClient *database.Client, ctx context.Context, profiles []*csconfig.ProfileCfg, capiChan chan []*models.Alert, pluginChannel chan csplugin.ProfileAlert, consoleConfig csconfig.ConsoleConfig, deleteDecisionsChannel chan []string) (*Controller, error) {
var err error var err error
v1 := &Controller{ v1 := &Controller{
Ectx: ctx, Ectx: ctx,
DBClient: dbClient, DBClient: dbClient,
APIKeyHeader: middlewares.APIKeyHeader, APIKeyHeader: middlewares.APIKeyHeader,
Profiles: profiles, Profiles: profiles,
CAPIChan: capiChan, CAPIChan: capiChan,
PluginChannel: pluginChannel, PluginChannel: pluginChannel,
DeleteDecisionsChannel: deleteDecisionsChannel,
ConsoleConfig: consoleConfig,
} }
v1.Middlewares, err = middlewares.NewMiddlewares(dbClient) v1.Middlewares, err = middlewares.NewMiddlewares(dbClient)
if err != nil { if err != nil {

View file

@ -83,12 +83,34 @@ func (c *Controller) DeleteDecisionById(gctx *gin.Context) {
NbDeleted: "1", NbDeleted: "1",
} }
if *c.ConsoleConfig.ShareDecisions {
if c.DeleteDecisionsChannel != nil {
select {
case c.DeleteDecisionsChannel <- []string{decisionIDStr}:
log.Debug("alert sent to delete decisions channel")
default:
log.Warning("Cannot send alert to delete decisions channel")
}
}
}
gctx.JSON(http.StatusOK, deleteDecisionResp) gctx.JSON(http.StatusOK, deleteDecisionResp)
return return
} }
func (c *Controller) DeleteDecisions(gctx *gin.Context) { func (c *Controller) DeleteDecisions(gctx *gin.Context) {
var err error var err error
decisionsIDToDelete := make([]string, 0)
if *c.ConsoleConfig.ShareDecisions {
decisionsToDelete, err := c.DBClient.QueryDecisionWithFilter(gctx.Request.URL.Query())
if err != nil {
log.Errorf("unable to list decisions to delete to send to console: %s", err)
}
for _, decision := range decisionsToDelete {
decisionsIDToDelete = append(decisionsIDToDelete, strconv.Itoa(decision.ID))
}
}
nbDeleted, err := c.DBClient.SoftDeleteDecisionsWithFilter(gctx.Request.URL.Query()) nbDeleted, err := c.DBClient.SoftDeleteDecisionsWithFilter(gctx.Request.URL.Query())
if err != nil { if err != nil {
@ -99,6 +121,17 @@ func (c *Controller) DeleteDecisions(gctx *gin.Context) {
NbDeleted: nbDeleted, NbDeleted: nbDeleted,
} }
if len(decisionsIDToDelete) > 0 {
if c.DeleteDecisionsChannel != nil {
select {
case c.DeleteDecisionsChannel <- decisionsIDToDelete:
log.Debug("alert sent to delete decisions channel")
default:
log.Warning("Cannot send alert to delete decisions channel")
}
}
}
gctx.JSON(http.StatusOK, deleteDecisionResp) gctx.JSON(http.StatusOK, deleteDecisionResp)
return return
} }