2020-11-30 09:37:17 +00:00
package apiserver
import (
"context"
"fmt"
2022-10-28 11:55:59 +00:00
"math/rand"
2023-02-06 13:06:14 +00:00
"net/http"
2020-11-30 09:37:17 +00:00
"net/url"
2021-08-25 09:45:29 +00:00
"strconv"
2020-11-30 09:37:17 +00:00
"strings"
"sync"
"time"
2022-10-28 11:55:59 +00:00
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
2020-11-30 09:37:17 +00:00
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwversion"
"github.com/crowdsecurity/crowdsec/pkg/database"
2021-08-25 09:45:29 +00:00
"github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
2020-11-30 09:37:17 +00:00
"github.com/crowdsecurity/crowdsec/pkg/models"
2023-02-06 13:06:14 +00:00
"github.com/crowdsecurity/crowdsec/pkg/modelscapi"
2020-11-30 09:37:17 +00:00
"github.com/crowdsecurity/crowdsec/pkg/types"
)
2022-03-29 12:20:26 +00:00
// Default intervals of the CAPI background routines. Each *Delta value is the
// maximum random offset applied around the default when the first tick of the
// matching routine is computed.
var (
	pullIntervalDefault = 2 * time.Hour
	pullIntervalDelta   = time.Minute * 5

	pushIntervalDefault = 10 * time.Second
	pushIntervalDelta   = 7 * time.Second

	metricsIntervalDefault = 30 * time.Minute
	metricsIntervalDelta   = 15 * time.Minute
)
2023-01-31 13:47:44 +00:00
// SCOPE_CAPI_ALIAS_ALIAS is the scope written on alerts created for CAPI
// decisions. We don't use "CAPI" directly, to make it less confusing for the
// user.
var SCOPE_CAPI_ALIAS_ALIAS = "crowdsecurity/community-blocklist"
2022-03-29 12:20:26 +00:00
2020-11-30 09:37:17 +00:00
type apic struct {
2022-10-28 11:55:59 +00:00
// when changing the intervals in tests, always set *First too
// or they can be negative
pullInterval time . Duration
pullIntervalFirst time . Duration
pushInterval time . Duration
pushIntervalFirst time . Duration
metricsInterval time . Duration
metricsIntervalFirst time . Duration
dbClient * database . Client
apiClient * apiclient . ApiClient
2023-01-31 13:47:44 +00:00
AlertsAddChan chan [ ] * models . Alert
mu sync . Mutex
pushTomb tomb . Tomb
pullTomb tomb . Tomb
metricsTomb tomb . Tomb
startup bool
credentials * csconfig . ApiCredentialsCfg
scenarioList [ ] string
consoleConfig * csconfig . ConsoleConfig
2022-10-28 11:55:59 +00:00
}
// randomDuration returns a duration picked at random between d-delta and
// d+delta.
func randomDuration(d time.Duration, delta time.Duration) time.Duration {
	// factor lies in [-1.0, 1.0)
	factor := 2.0*rand.Float64() - 1.0
	offset := float64(delta) * factor

	return time.Duration(float64(d) + offset)
}
func ( a * apic ) FetchScenariosListFromDB ( ) ( [ ] string , error ) {
scenarios := make ( [ ] string , 0 )
machines , err := a . dbClient . ListMachines ( )
if err != nil {
return nil , errors . Wrap ( err , "while listing machines" )
}
//merge all scenarios together
for _ , v := range machines {
machineScenarios := strings . Split ( v . Scenarios , "," )
log . Debugf ( "%d scenarios for machine %d" , len ( machineScenarios ) , v . ID )
for _ , sv := range machineScenarios {
2022-03-29 12:20:26 +00:00
if ! types . InSlice ( sv , scenarios ) && sv != "" {
2020-11-30 09:37:17 +00:00
scenarios = append ( scenarios , sv )
}
}
}
log . Debugf ( "Returning list of scenarios : %+v" , scenarios )
return scenarios , nil
}
2023-01-31 13:47:44 +00:00
// decisionsToApiDecisions converts local decisions into the decision items
// attached to a signal. String fields are copied, so the returned items do
// not share their pointers with the input decisions.
func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
	converted := models.AddSignalsRequestItemDecisions{}

	for _, d := range decisions {
		id := d.ID
		item := &models.AddSignalsRequestItemDecisionsItem{
			Duration: types.StrPtr(*d.Duration),
			ID:       &id,
			Origin:   types.StrPtr(*d.Origin),
			Scenario: types.StrPtr(*d.Scenario),
			Scope:    types.StrPtr(*d.Scope),
			Type:     types.StrPtr(*d.Type),
			Until:    d.Until,
			Value:    types.StrPtr(*d.Value),
			UUID:     d.UUID,
		}

		// Simulated is optional on the decision: keep the zero value when unset.
		if d.Simulated != nil {
			item.Simulated = *d.Simulated
		}

		converted = append(converted, item)
	}

	return converted
}
2023-01-04 15:50:02 +00:00
func alertToSignal ( alert * models . Alert , scenarioTrust string , shareContext bool ) * models . AddSignalsRequestItem {
signal := & models . AddSignalsRequestItem {
2020-11-30 09:37:17 +00:00
Message : alert . Message ,
Scenario : alert . Scenario ,
ScenarioHash : alert . ScenarioHash ,
ScenarioVersion : alert . ScenarioVersion ,
2023-01-31 13:47:44 +00:00
Source : & models . AddSignalsRequestItemSource {
AsName : alert . Source . AsName ,
AsNumber : alert . Source . AsNumber ,
Cn : alert . Source . Cn ,
IP : alert . Source . IP ,
Latitude : alert . Source . Latitude ,
Longitude : alert . Source . Longitude ,
Range : alert . Source . Range ,
Scope : alert . Source . Scope ,
Value : alert . Source . Value ,
} ,
StartAt : alert . StartAt ,
StopAt : alert . StopAt ,
CreatedAt : alert . CreatedAt ,
MachineID : alert . MachineID ,
ScenarioTrust : scenarioTrust ,
Decisions : decisionsToApiDecisions ( alert . Decisions ) ,
UUID : alert . UUID ,
2020-11-30 09:37:17 +00:00
}
2023-01-04 15:50:02 +00:00
if shareContext {
signal . Context = make ( [ ] * models . AddSignalsRequestItemContextItems0 , 0 )
for _ , meta := range alert . Meta {
contextItem := models . AddSignalsRequestItemContextItems0 {
Key : meta . Key ,
Value : meta . Value ,
}
signal . Context = append ( signal . Context , & contextItem )
}
}
return signal
2020-11-30 09:37:17 +00:00
}
2022-01-13 15:46:16 +00:00
func NewAPIC ( config * csconfig . OnlineApiClientCfg , dbClient * database . Client , consoleConfig * csconfig . ConsoleConfig ) ( * apic , error ) {
2020-11-30 09:37:17 +00:00
var err error
ret := & apic {
2023-01-31 13:47:44 +00:00
AlertsAddChan : make ( chan [ ] * models . Alert ) ,
2022-10-28 11:55:59 +00:00
dbClient : dbClient ,
mu : sync . Mutex { } ,
startup : true ,
credentials : config . Credentials ,
pullTomb : tomb . Tomb { } ,
pushTomb : tomb . Tomb { } ,
metricsTomb : tomb . Tomb { } ,
scenarioList : make ( [ ] string , 0 ) ,
consoleConfig : consoleConfig ,
pullInterval : pullIntervalDefault ,
pullIntervalFirst : randomDuration ( pullIntervalDefault , pullIntervalDelta ) ,
pushInterval : pushIntervalDefault ,
pushIntervalFirst : randomDuration ( pushIntervalDefault , pushIntervalDelta ) ,
metricsInterval : metricsIntervalDefault ,
metricsIntervalFirst : randomDuration ( metricsIntervalDefault , metricsIntervalDelta ) ,
2020-11-30 09:37:17 +00:00
}
password := strfmt . Password ( config . Credentials . Password )
apiURL , err := url . Parse ( config . Credentials . URL )
if err != nil {
return nil , errors . Wrapf ( err , "while parsing '%s'" , config . Credentials . URL )
}
2023-01-31 13:47:44 +00:00
papiURL , err := url . Parse ( config . Credentials . PapiURL )
if err != nil {
return nil , errors . Wrapf ( err , "while parsing '%s'" , config . Credentials . PapiURL )
}
2020-11-30 09:37:17 +00:00
ret . scenarioList , err = ret . FetchScenariosListFromDB ( )
if err != nil {
return nil , errors . Wrap ( err , "while fetching scenarios from db" )
}
ret . apiClient , err = apiclient . NewClient ( & apiclient . Config {
MachineID : config . Credentials . Login ,
Password : password ,
UserAgent : fmt . Sprintf ( "crowdsec/%s" , cwversion . VersionStr ( ) ) ,
URL : apiURL ,
2023-01-31 13:47:44 +00:00
PapiURL : papiURL ,
2023-02-06 13:06:14 +00:00
VersionPrefix : "v3" ,
2020-11-30 09:37:17 +00:00
Scenarios : ret . scenarioList ,
UpdateScenario : ret . FetchScenariosListFromDB ,
} )
2023-01-31 13:47:44 +00:00
if err != nil {
return nil , errors . Wrap ( err , "while creating api client" )
}
scenarios , err := ret . FetchScenariosListFromDB ( )
if err != nil {
return ret , errors . Wrapf ( err , "get scenario in db: %s" , err )
}
if _ , err = ret . apiClient . Auth . AuthenticateWatcher ( context . Background ( ) , models . WatcherAuthRequest {
MachineID : & config . Credentials . Login ,
Password : & password ,
Scenarios : scenarios ,
} ) ; err != nil {
return ret , errors . Wrapf ( err , "authenticate watcher (%s)" , config . Credentials . Login )
}
2021-04-07 12:51:00 +00:00
return ret , err
2020-11-30 09:37:17 +00:00
}
2022-03-29 12:20:26 +00:00
// keep track of all alerts in cache and push it to CAPI every PushInterval.
2020-11-30 09:37:17 +00:00
func ( a * apic ) Push ( ) error {
defer types . CatchPanic ( "lapi/pushToAPIC" )
var cache models . AddSignalsRequest
2022-10-28 11:55:59 +00:00
ticker := time . NewTicker ( a . pushIntervalFirst )
log . Infof ( "Start push to CrowdSec Central API (interval: %s once, then %s)" , a . pushIntervalFirst . Round ( time . Second ) , a . pushInterval )
2020-11-30 09:37:17 +00:00
for {
select {
case <- a . pushTomb . Dying ( ) : // if one apic routine is dying, do we kill the others?
a . pullTomb . Kill ( nil )
a . metricsTomb . Kill ( nil )
log . Infof ( "push tomb is dying, sending cache (%d elements) before exiting" , len ( cache ) )
if len ( cache ) == 0 {
return nil
}
2020-11-30 16:46:02 +00:00
go a . Send ( & cache )
return nil
2020-11-30 09:37:17 +00:00
case <- ticker . C :
2022-10-28 11:55:59 +00:00
ticker . Reset ( a . pushInterval )
2020-11-30 09:37:17 +00:00
if len ( cache ) > 0 {
a . mu . Lock ( )
cacheCopy := cache
cache = make ( models . AddSignalsRequest , 0 )
a . mu . Unlock ( )
log . Infof ( "Signal push: %d signals to push" , len ( cacheCopy ) )
2020-11-30 16:46:02 +00:00
go a . Send ( & cacheCopy )
2020-11-30 09:37:17 +00:00
}
2023-01-31 13:47:44 +00:00
case alerts := <- a . AlertsAddChan :
2020-11-30 09:37:17 +00:00
var signals [ ] * models . AddSignalsRequestItem
for _ , alert := range alerts {
2022-03-29 12:20:26 +00:00
if ok := shouldShareAlert ( alert , a . consoleConfig ) ; ok {
2023-01-04 15:50:02 +00:00
signals = append ( signals , alertToSignal ( alert , getScenarioTrustOfAlert ( alert ) , * a . consoleConfig . ShareContext ) )
2022-01-13 15:46:16 +00:00
}
2020-11-30 09:37:17 +00:00
}
a . mu . Lock ( )
cache = append ( cache , signals ... )
a . mu . Unlock ( )
}
}
}
2022-03-29 12:20:26 +00:00
func getScenarioTrustOfAlert ( alert * models . Alert ) string {
scenarioTrust := "certified"
if alert . ScenarioHash == nil || * alert . ScenarioHash == "" {
scenarioTrust = "custom"
} else if alert . ScenarioVersion == nil || * alert . ScenarioVersion == "" || * alert . ScenarioVersion == "?" {
scenarioTrust = "tainted"
}
if len ( alert . Decisions ) > 0 {
2023-01-31 13:47:44 +00:00
if * alert . Decisions [ 0 ] . Origin == types . CscliOrigin {
2022-03-29 12:20:26 +00:00
scenarioTrust = "manual"
}
}
return scenarioTrust
}
// shouldShareAlert tells whether an alert may be sent to CAPI. Simulated
// alerts are never shared. Alerts whose scenario trust is "manual", "tainted"
// or "custom" are shared only when the matching console option is enabled.
func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig) bool {
	if *alert.Simulated {
		log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
		return false
	}

	trust := getScenarioTrustOfAlert(alert)

	if trust == "manual" && !*consoleConfig.ShareManualDecisions {
		log.Debugf("manual decision generated an alert, doesn't send it to CAPI because options is disabled")
		return false
	}

	if trust == "tainted" && !*consoleConfig.ShareTaintedScenarios {
		log.Debugf("tainted scenario generated an alert, doesn't send it to CAPI because options is disabled")
		return false
	}

	if trust == "custom" && !*consoleConfig.ShareCustomScenarios {
		log.Debugf("custom scenario generated an alert, doesn't send it to CAPI because options is disabled")
		return false
	}

	return true
}
2020-11-30 16:46:02 +00:00
func ( a * apic ) Send ( cacheOrig * models . AddSignalsRequest ) {
2020-11-30 09:37:17 +00:00
/ * we do have a problem with this :
The apic . Push background routine reads from alertToPush chan .
This chan is filled by Controller . CreateAlert
If the chan apic . Send hangs , the alertToPush chan will become full ,
with means that Controller . CreateAlert is going to hang , blocking API worker ( s ) .
So instead , we prefer to cancel write .
I don ' t know enough about gin to tell how much of an issue it can be .
* /
2020-11-30 16:46:02 +00:00
var cache [ ] * models . AddSignalsRequestItem = * cacheOrig
var send models . AddSignalsRequest
bulkSize := 50
pageStart := 0
pageEnd := bulkSize
for {
if pageEnd >= len ( cache ) {
send = cache [ pageStart : ]
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
_ , _ , err := a . apiClient . Signal . Add ( ctx , & send )
if err != nil {
2023-01-31 13:47:44 +00:00
log . Errorf ( "sending signal to central API: %s" , err )
2020-11-30 16:46:02 +00:00
return
}
break
}
send = cache [ pageStart : pageEnd ]
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
_ , _ , err := a . apiClient . Signal . Add ( ctx , & send )
if err != nil {
//we log it here as well, because the return value of func might be discarded
2023-01-31 13:47:44 +00:00
log . Errorf ( "sending signal to central API: %s" , err )
2020-11-30 16:46:02 +00:00
}
pageStart += bulkSize
pageEnd += bulkSize
}
2020-11-30 09:37:17 +00:00
}
2022-03-29 12:20:26 +00:00
func ( a * apic ) CAPIPullIsOld ( ) ( bool , error ) {
2021-08-25 09:45:29 +00:00
/*only pull community blocklist if it's older than 1h30 */
alerts := a . dbClient . Ent . Alert . Query ( )
alerts = alerts . Where ( alert . HasDecisionsWith ( decision . OriginEQ ( database . CapiMachineID ) ) )
2022-06-16 12:41:54 +00:00
alerts = alerts . Where ( alert . CreatedAtGTE ( time . Now ( ) . UTC ( ) . Add ( - time . Duration ( 1 * time . Hour + 30 * time . Minute ) ) ) ) //nolint:unconvert
2021-08-25 09:45:29 +00:00
count , err := alerts . Count ( a . dbClient . CTX )
if err != nil {
2022-03-29 12:20:26 +00:00
return false , errors . Wrap ( err , "while looking for CAPI alert" )
2021-08-25 09:45:29 +00:00
}
if count > 0 {
log . Printf ( "last CAPI pull is newer than 1h30, skip." )
2022-03-29 12:20:26 +00:00
return false , nil
2020-11-30 09:37:17 +00:00
}
2022-03-29 12:20:26 +00:00
return true , nil
}
2022-01-11 13:31:51 +00:00
2022-03-29 12:20:26 +00:00
func ( a * apic ) HandleDeletedDecisions ( deletedDecisions [ ] * models . Decision , delete_counters map [ string ] map [ string ] int ) ( int , error ) {
2020-11-30 09:37:17 +00:00
var filter map [ string ] [ ] string
2021-08-25 09:45:29 +00:00
var nbDeleted int
2022-03-29 12:20:26 +00:00
for _ , decision := range deletedDecisions {
2020-11-30 09:37:17 +00:00
if strings . ToLower ( * decision . Scope ) == "ip" {
filter = make ( map [ string ] [ ] string , 1 )
filter [ "value" ] = [ ] string { * decision . Value }
} else {
filter = make ( map [ string ] [ ] string , 3 )
filter [ "value" ] = [ ] string { * decision . Value }
filter [ "type" ] = [ ] string { * decision . Type }
2022-03-29 12:20:26 +00:00
filter [ "scopes" ] = [ ] string { * decision . Scope }
2020-11-30 09:37:17 +00:00
}
2022-03-29 12:20:26 +00:00
filter [ "origin" ] = [ ] string { * decision . Origin }
2020-11-30 09:37:17 +00:00
2023-01-31 13:47:44 +00:00
dbCliRet , _ , err := a . dbClient . SoftDeleteDecisionsWithFilter ( filter )
2020-11-30 09:37:17 +00:00
if err != nil {
2022-03-29 12:20:26 +00:00
return 0 , errors . Wrap ( err , "deleting decisions error" )
2020-11-30 09:37:17 +00:00
}
2021-08-25 09:45:29 +00:00
dbCliDel , err := strconv . Atoi ( dbCliRet )
if err != nil {
2022-03-29 12:20:26 +00:00
return 0 , errors . Wrapf ( err , "converting db ret %d" , dbCliDel )
2021-08-25 09:45:29 +00:00
}
2023-02-06 13:06:14 +00:00
updateCounterForDecision ( delete_counters , decision . Origin , decision . Scenario , dbCliDel )
2021-08-25 09:45:29 +00:00
nbDeleted += dbCliDel
2020-11-30 09:37:17 +00:00
}
2022-03-29 12:20:26 +00:00
return nbDeleted , nil
2023-02-06 13:06:14 +00:00
}
func ( a * apic ) HandleDeletedDecisionsV3 ( deletedDecisions [ ] * modelscapi . GetDecisionsStreamResponseDeletedItem , delete_counters map [ string ] map [ string ] int ) ( int , error ) {
var filter map [ string ] [ ] string
var nbDeleted int
for _ , decisions := range deletedDecisions {
scope := decisions . Scope
for _ , decision := range decisions . Decisions {
if strings . ToLower ( * scope ) == "ip" {
filter = make ( map [ string ] [ ] string , 1 )
filter [ "value" ] = [ ] string { decision }
} else {
filter = make ( map [ string ] [ ] string , 2 )
filter [ "value" ] = [ ] string { decision }
filter [ "scopes" ] = [ ] string { * scope }
}
filter [ "origin" ] = [ ] string { types . CAPIOrigin }
2020-11-30 09:37:17 +00:00
2023-02-06 13:06:14 +00:00
dbCliRet , _ , err := a . dbClient . SoftDeleteDecisionsWithFilter ( filter )
if err != nil {
return 0 , errors . Wrap ( err , "deleting decisions error" )
}
dbCliDel , err := strconv . Atoi ( dbCliRet )
if err != nil {
return 0 , errors . Wrapf ( err , "converting db ret %d" , dbCliDel )
}
updateCounterForDecision ( delete_counters , types . StrPtr ( types . CAPIOrigin ) , nil , dbCliDel )
nbDeleted += dbCliDel
}
}
return nbDeleted , nil
2022-03-29 12:20:26 +00:00
}
2022-01-11 13:31:51 +00:00
2022-03-29 12:20:26 +00:00
func createAlertsForDecisions ( decisions [ ] * models . Decision ) [ ] * models . Alert {
newAlerts := make ( [ ] * models . Alert , 0 )
for _ , decision := range decisions {
2022-01-11 13:31:51 +00:00
found := false
2022-03-29 12:20:26 +00:00
for _ , sub := range newAlerts {
2022-01-11 13:31:51 +00:00
if sub . Source . Scope == nil {
log . Warningf ( "nil scope in %+v" , sub )
continue
}
2023-01-31 13:47:44 +00:00
if * decision . Origin == types . CAPIOrigin {
if * sub . Source . Scope == types . CAPIOrigin {
2022-01-11 13:31:51 +00:00
found = true
break
}
2023-01-31 13:47:44 +00:00
} else if * decision . Origin == types . ListOrigin {
2022-01-11 13:31:51 +00:00
if * sub . Source . Scope == * decision . Origin {
if sub . Scenario == nil {
log . Warningf ( "nil scenario in %+v" , sub )
}
if * sub . Scenario == * decision . Scenario {
found = true
break
}
}
} else {
log . Warningf ( "unknown origin %s : %+v" , * decision . Origin , decision )
}
}
if ! found {
log . Debugf ( "Create entry for origin:%s scenario:%s" , * decision . Origin , * decision . Scenario )
2022-03-29 12:20:26 +00:00
newAlerts = append ( newAlerts , createAlertForDecision ( decision ) )
2022-01-11 13:31:51 +00:00
}
}
2022-03-29 12:20:26 +00:00
return newAlerts
}
2022-01-11 13:31:51 +00:00
2022-03-29 12:20:26 +00:00
func createAlertForDecision ( decision * models . Decision ) * models . Alert {
newAlert := & models . Alert { }
newAlert . Source = & models . Source { }
newAlert . Source . Scope = types . StrPtr ( "" )
2023-01-31 13:47:44 +00:00
if * decision . Origin == types . CAPIOrigin { //to make things more user friendly, we replace CAPI with community-blocklist
newAlert . Scenario = types . StrPtr ( types . CAPIOrigin )
newAlert . Source . Scope = types . StrPtr ( types . CAPIOrigin )
} else if * decision . Origin == types . ListOrigin {
2022-03-29 12:20:26 +00:00
newAlert . Scenario = types . StrPtr ( * decision . Scenario )
2023-01-31 13:47:44 +00:00
newAlert . Source . Scope = types . StrPtr ( types . ListOrigin )
2022-03-29 12:20:26 +00:00
} else {
log . Warningf ( "unknown origin %s" , * decision . Origin )
}
newAlert . Message = types . StrPtr ( "" )
newAlert . Source . Value = types . StrPtr ( "" )
newAlert . StartAt = types . StrPtr ( time . Now ( ) . UTC ( ) . Format ( time . RFC3339 ) )
newAlert . StopAt = types . StrPtr ( time . Now ( ) . UTC ( ) . Format ( time . RFC3339 ) )
newAlert . Capacity = types . Int32Ptr ( 0 )
newAlert . Simulated = types . BoolPtr ( false )
newAlert . EventsCount = types . Int32Ptr ( 0 )
newAlert . Leakspeed = types . StrPtr ( "" )
newAlert . ScenarioHash = types . StrPtr ( "" )
newAlert . ScenarioVersion = types . StrPtr ( "" )
newAlert . MachineID = database . CapiMachineID
return newAlert
}
// This function takes in list of parent alerts and decisions and then pairs them up.
func fillAlertsWithDecisions ( alerts [ ] * models . Alert , decisions [ ] * models . Decision , add_counters map [ string ] map [ string ] int ) [ ] * models . Alert {
for _ , decision := range decisions {
2022-01-11 13:31:51 +00:00
//count and create separate alerts for each list
2023-02-06 13:06:14 +00:00
updateCounterForDecision ( add_counters , decision . Origin , decision . Scenario , 1 )
2021-01-14 15:27:45 +00:00
2021-07-02 09:23:46 +00:00
/*CAPI might send lower case scopes, unify it.*/
switch strings . ToLower ( * decision . Scope ) {
case "ip" :
* decision . Scope = types . Ip
case "range" :
* decision . Scope = types . Range
}
2022-01-11 13:31:51 +00:00
found := false
//add the individual decisions to the right list
2022-03-29 12:20:26 +00:00
for idx , alert := range alerts {
2023-01-31 13:47:44 +00:00
if * decision . Origin == types . CAPIOrigin {
if * alert . Source . Scope == types . CAPIOrigin {
2022-03-29 12:20:26 +00:00
alerts [ idx ] . Decisions = append ( alerts [ idx ] . Decisions , decision )
2022-01-11 13:31:51 +00:00
found = true
break
}
2023-01-31 13:47:44 +00:00
} else if * decision . Origin == types . ListOrigin {
if * alert . Source . Scope == types . ListOrigin && * alert . Scenario == * decision . Scenario {
2022-03-29 12:20:26 +00:00
alerts [ idx ] . Decisions = append ( alerts [ idx ] . Decisions , decision )
2022-01-11 13:31:51 +00:00
found = true
break
}
} else {
log . Warningf ( "unknown origin %s" , * decision . Origin )
}
}
if ! found {
log . Warningf ( "Orphaned decision for %s - %s" , * decision . Origin , * decision . Scenario )
}
2020-11-30 09:37:17 +00:00
}
2022-03-29 12:20:26 +00:00
return alerts
}
2023-02-06 13:06:14 +00:00
// we receive a list of decisions and links for blocklist and we need to create a list of alerts :
2022-03-29 12:20:26 +00:00
// one alert for "community blocklist"
// one alert per list we're subscribed to
func ( a * apic ) PullTop ( ) error {
var err error
if lastPullIsOld , err := a . CAPIPullIsOld ( ) ; err != nil {
return err
} else if ! lastPullIsOld {
return nil
}
2022-11-08 09:44:25 +00:00
log . Infof ( "Starting community-blocklist update" )
2023-02-06 13:06:14 +00:00
data , _ , err := a . apiClient . Decisions . GetStreamV3 ( context . Background ( ) , apiclient . DecisionsStreamOpts { Startup : a . startup } )
2022-03-29 12:20:26 +00:00
if err != nil {
return errors . Wrap ( err , "get stream" )
}
a . startup = false
2022-04-19 09:25:27 +00:00
/*to count additions/deletions across lists*/
2022-03-29 12:20:26 +00:00
2022-10-26 08:48:17 +00:00
log . Debugf ( "Received %d new decisions" , len ( data . New ) )
log . Debugf ( "Received %d deleted decisions" , len ( data . Deleted ) )
2023-02-06 13:06:14 +00:00
if data . Links != nil {
log . Debugf ( "Received %d blocklists links" , len ( data . Links . Blocklists ) )
}
2022-10-26 08:48:17 +00:00
2022-03-29 12:20:26 +00:00
add_counters , delete_counters := makeAddAndDeleteCounters ( )
// process deleted decisions
2023-02-06 13:06:14 +00:00
if nbDeleted , err := a . HandleDeletedDecisionsV3 ( data . Deleted , delete_counters ) ; err != nil {
2022-03-29 12:20:26 +00:00
return err
} else {
log . Printf ( "capi/community-blocklist : %d explicit deletions" , nbDeleted )
}
if len ( data . New ) == 0 {
2022-04-01 13:31:33 +00:00
log . Infof ( "capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)" )
2022-03-29 12:20:26 +00:00
return nil
}
2023-02-06 13:06:14 +00:00
// create one alert for community blocklist using the first decision
decisions := a . apiClient . Decisions . GetDecisionsFromGroups ( data . New )
alert := createAlertForDecision ( decisions [ 0 ] )
alertsFromCapi := [ ] * models . Alert { alert }
alertsFromCapi = fillAlertsWithDecisions ( alertsFromCapi , decisions , add_counters )
err = a . SaveAlerts ( alertsFromCapi , add_counters , delete_counters )
if err != nil {
return errors . Wrap ( err , "while saving alerts" )
}
// update blocklists
if err := a . UpdateBlocklists ( data . Links , add_counters ) ; err != nil {
return errors . Wrap ( err , "while updating blocklists" )
}
return nil
}
2021-08-25 09:45:29 +00:00
2023-02-06 13:06:14 +00:00
func ( a * apic ) SaveAlerts ( alertsFromCapi [ ] * models . Alert , add_counters map [ string ] map [ string ] int , delete_counters map [ string ] map [ string ] int ) error {
2022-01-11 13:31:51 +00:00
for idx , alert := range alertsFromCapi {
2022-03-29 12:20:26 +00:00
alertsFromCapi [ idx ] = setAlertScenario ( add_counters , delete_counters , alert )
2022-01-11 13:31:51 +00:00
log . Debugf ( "%s has %d decisions" , * alertsFromCapi [ idx ] . Source . Scope , len ( alertsFromCapi [ idx ] . Decisions ) )
2022-10-26 08:48:17 +00:00
if a . dbClient . Type == "sqlite" && ( a . dbClient . WalMode == nil || ! * a . dbClient . WalMode ) {
log . Warningf ( "sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist" )
}
2022-01-11 13:31:51 +00:00
alertID , inserted , deleted , err := a . dbClient . UpdateCommunityBlocklist ( alertsFromCapi [ idx ] )
if err != nil {
return errors . Wrapf ( err , "while saving alert from %s" , * alertsFromCapi [ idx ] . Source . Scope )
}
log . Printf ( "%s : added %d entries, deleted %d entries (alert:%d)" , * alertsFromCapi [ idx ] . Source . Scope , inserted , deleted , alertID )
}
2023-02-06 13:06:14 +00:00
return nil
}
func ( a * apic ) UpdateBlocklists ( links * modelscapi . GetDecisionsStreamResponseLinks , add_counters map [ string ] map [ string ] int ) error {
if links == nil {
return nil
}
if links . Blocklists == nil {
return nil
}
// we must use a different http client than apiClient's because the transport of apiClient is jwtTransport or here we have signed apis that are incompatibles
// we can use the same baseUrl as the urls are absolute and the parse will take care of it
defaultClient , err := apiclient . NewDefaultClient ( a . apiClient . BaseURL , "" , "" , & http . Client { } )
if err != nil {
return errors . Wrap ( err , "while creating default client" )
}
for _ , blocklist := range links . Blocklists {
if blocklist . Scope == nil {
log . Warningf ( "blocklist has no scope" )
continue
}
if blocklist . Duration == nil {
log . Warningf ( "blocklist has no duration" )
continue
}
decisions , err := defaultClient . Decisions . GetDecisionsFromBlocklist ( context . Background ( ) , blocklist )
if err != nil {
return errors . Wrapf ( err , "while getting decisions from blocklist %s" , * blocklist . Name )
}
if len ( decisions ) == 0 {
log . Infof ( "blocklist %s has no decisions" , * blocklist . Name )
continue
}
alert := createAlertForDecision ( decisions [ 0 ] )
alertsFromCapi := [ ] * models . Alert { alert }
alertsFromCapi = fillAlertsWithDecisions ( alertsFromCapi , decisions , add_counters )
err = a . SaveAlerts ( alertsFromCapi , add_counters , nil )
if err != nil {
return errors . Wrapf ( err , "while saving alert from blocklist %s" , * blocklist . Name )
}
}
2020-11-30 09:37:17 +00:00
return nil
}
2022-03-29 12:20:26 +00:00
func setAlertScenario ( add_counters map [ string ] map [ string ] int , delete_counters map [ string ] map [ string ] int , alert * models . Alert ) * models . Alert {
2023-01-31 13:47:44 +00:00
if * alert . Source . Scope == types . CAPIOrigin {
* alert . Source . Scope = SCOPE_CAPI_ALIAS_ALIAS
alert . Scenario = types . StrPtr ( fmt . Sprintf ( "update : +%d/-%d IPs" , add_counters [ types . CAPIOrigin ] [ "all" ] , delete_counters [ types . CAPIOrigin ] [ "all" ] ) )
} else if * alert . Source . Scope == types . ListOrigin {
* alert . Source . Scope = fmt . Sprintf ( "%s:%s" , types . ListOrigin , * alert . Scenario )
alert . Scenario = types . StrPtr ( fmt . Sprintf ( "update : +%d/-%d IPs" , add_counters [ types . ListOrigin ] [ * alert . Scenario ] , delete_counters [ types . ListOrigin ] [ * alert . Scenario ] ) )
2022-03-29 12:20:26 +00:00
}
return alert
}
2020-11-30 09:37:17 +00:00
func ( a * apic ) Pull ( ) error {
defer types . CatchPanic ( "lapi/pullFromAPIC" )
2020-12-14 10:54:16 +00:00
toldOnce := false
2020-11-30 09:37:17 +00:00
for {
2022-03-29 12:20:26 +00:00
scenario , err := a . FetchScenariosListFromDB ( )
if err != nil {
log . Errorf ( "unable to fetch scenarios from db: %s" , err )
}
2020-11-30 09:37:17 +00:00
if len ( scenario ) > 0 {
break
}
2020-12-14 10:54:16 +00:00
if ! toldOnce {
2022-06-22 07:38:23 +00:00
log . Warning ( "scenario list is empty, will not pull yet" )
2020-12-14 10:54:16 +00:00
toldOnce = true
}
2020-11-30 09:37:17 +00:00
time . Sleep ( 1 * time . Second )
}
if err := a . PullTop ( ) ; err != nil {
log . Errorf ( "capi pull top: %s" , err )
}
2022-10-28 11:55:59 +00:00
log . Infof ( "Start pull from CrowdSec Central API (interval: %s once, then %s)" , a . pullIntervalFirst . Round ( time . Second ) , a . pullInterval )
ticker := time . NewTicker ( a . pullIntervalFirst )
2020-11-30 09:37:17 +00:00
for {
select {
case <- ticker . C :
2022-10-28 11:55:59 +00:00
ticker . Reset ( a . pullInterval )
2020-11-30 09:37:17 +00:00
if err := a . PullTop ( ) ; err != nil {
log . Errorf ( "capi pull top: %s" , err )
continue
}
case <- a . pullTomb . Dying ( ) : // if one apic routine is dying, do we kill the others?
a . metricsTomb . Kill ( nil )
a . pushTomb . Kill ( nil )
return nil
}
}
}
2021-11-02 11:16:33 +00:00
func ( a * apic ) GetMetrics ( ) ( * models . Metrics , error ) {
metric := & models . Metrics {
2022-03-29 12:20:26 +00:00
ApilVersion : types . StrPtr ( cwversion . VersionStr ( ) ) ,
2022-01-13 15:46:16 +00:00
Machines : make ( [ ] * models . MetricsAgentInfo , 0 ) ,
Bouncers : make ( [ ] * models . MetricsBouncerInfo , 0 ) ,
2021-11-02 11:16:33 +00:00
}
machines , err := a . dbClient . ListMachines ( )
if err != nil {
return metric , err
}
bouncers , err := a . dbClient . ListBouncers ( )
if err != nil {
return metric , err
}
2022-01-20 17:10:40 +00:00
var lastpush string
2021-11-02 11:16:33 +00:00
for _ , machine := range machines {
2022-01-20 17:10:40 +00:00
if machine . LastPush == nil {
lastpush = time . Time { } . String ( )
} else {
lastpush = machine . LastPush . String ( )
}
2022-01-13 15:46:16 +00:00
m := & models . MetricsAgentInfo {
Version : machine . Version ,
Name : machine . MachineId ,
LastUpdate : machine . UpdatedAt . String ( ) ,
2022-01-20 17:10:40 +00:00
LastPush : lastpush ,
2021-11-02 11:16:33 +00:00
}
metric . Machines = append ( metric . Machines , m )
}
for _ , bouncer := range bouncers {
2022-01-13 15:46:16 +00:00
m := & models . MetricsBouncerInfo {
Version : bouncer . Version ,
CustomName : bouncer . Name ,
Name : bouncer . Type ,
LastPull : bouncer . LastPull . String ( ) ,
2021-11-02 11:16:33 +00:00
}
metric . Bouncers = append ( metric . Bouncers , m )
}
return metric , nil
}
2022-09-30 14:01:42 +00:00
func ( a * apic ) SendMetrics ( stop chan ( bool ) ) {
2020-11-30 09:37:17 +00:00
defer types . CatchPanic ( "lapi/metricsToAPIC" )
2022-10-28 11:55:59 +00:00
ticker := time . NewTicker ( a . metricsIntervalFirst )
log . Infof ( "Start send metrics to CrowdSec Central API (interval: %s once, then %s)" , a . metricsIntervalFirst . Round ( time . Second ) , a . metricsInterval )
2020-11-30 09:37:17 +00:00
for {
2022-09-30 14:01:42 +00:00
metrics , err := a . GetMetrics ( )
if err != nil {
log . Errorf ( "unable to get metrics (%s), will retry" , err )
}
_ , _ , err = a . apiClient . Metrics . Add ( context . Background ( ) , metrics )
if err != nil {
log . Errorf ( "capi metrics: failed: %s" , err )
} else {
log . Infof ( "capi metrics: metrics sent successfully" )
}
2020-11-30 09:37:17 +00:00
select {
2022-09-30 14:01:42 +00:00
case <- stop :
return
2020-11-30 09:37:17 +00:00
case <- ticker . C :
2022-10-28 11:55:59 +00:00
ticker . Reset ( a . metricsInterval )
2020-11-30 09:37:17 +00:00
case <- a . metricsTomb . Dying ( ) : // if one apic routine is dying, do we kill the others?
a . pullTomb . Kill ( nil )
a . pushTomb . Kill ( nil )
2022-09-30 14:01:42 +00:00
return
2020-11-30 09:37:17 +00:00
}
}
}
// Shutdown asks the push, pull and metrics routines to stop by killing their
// tombs. It does not wait for the routines to return.
func (a *apic) Shutdown() {
	for _, t := range []*tomb.Tomb{&a.pushTomb, &a.pullTomb, &a.metricsTomb} {
		t.Kill(nil)
	}
}
2022-03-29 12:20:26 +00:00
func makeAddAndDeleteCounters ( ) ( map [ string ] map [ string ] int , map [ string ] map [ string ] int ) {
add_counters := make ( map [ string ] map [ string ] int )
2023-01-31 13:47:44 +00:00
add_counters [ types . CAPIOrigin ] = make ( map [ string ] int )
add_counters [ types . ListOrigin ] = make ( map [ string ] int )
2022-03-29 12:20:26 +00:00
delete_counters := make ( map [ string ] map [ string ] int )
2023-01-31 13:47:44 +00:00
delete_counters [ types . CAPIOrigin ] = make ( map [ string ] int )
delete_counters [ types . ListOrigin ] = make ( map [ string ] int )
2022-03-29 12:20:26 +00:00
return add_counters , delete_counters
}
2023-02-06 13:06:14 +00:00
func updateCounterForDecision ( counter map [ string ] map [ string ] int , origin * string , scenario * string , totalDecisions int ) {
if * origin == types . CAPIOrigin {
counter [ * origin ] [ "all" ] += totalDecisions
} else if * origin == types . ListOrigin {
counter [ * origin ] [ * scenario ] += totalDecisions
2022-10-06 09:48:06 +00:00
} else {
2023-02-06 13:06:14 +00:00
log . Warningf ( "Unknown origin %s" , * origin )
2022-03-29 12:20:26 +00:00
}
}