diff --git a/go.mod b/go.mod index fcd078bbf..95ffeb75d 100644 --- a/go.mod +++ b/go.mod @@ -103,6 +103,7 @@ require ( github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/validator/v10 v10.10.0 // indirect github.com/go-stack/stack v1.8.0 // indirect + github.com/gofrs/uuid v4.0.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.2.0 // indirect github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect @@ -121,6 +122,7 @@ require ( github.com/jackc/pgproto3/v2 v2.2.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgtype v1.9.1 // indirect + github.com/jcuga/golongpoll v1.3.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/go.sum b/go.sum index ce01315ad..96e7a8b8e 100644 --- a/go.sum +++ b/go.sum @@ -480,6 +480,8 @@ github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv github.com/jackc/puddle v1.2.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jarcoal/httpmock v1.1.0 h1:F47ChZj1Y2zFsCXxNkBPwNNKnAyOATcdQibk0qEdVCE= github.com/jarcoal/httpmock v1.1.0/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik= +github.com/jcuga/golongpoll v1.3.0 h1:00lQC7C1a/4YcGnWcdWi1YzJYfat1Hal2+Cnlvgyado= +github.com/jcuga/golongpoll v1.3.0/go.mod h1:1ijFh83w68ylU44F+xSEyrXChP/7NnoAvgCVHWMggWA= github.com/jhump/protoreflect v1.6.0 h1:h5jfMVslIg6l29nsMs0D8Wj17RDVdNYti0vDN/PZZoE= github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= diff --git a/pkg/apiclient/client.go b/pkg/apiclient/client.go index 0f758f1e3..ee8c0f519 100644 --- a/pkg/apiclient/client.go +++ b/pkg/apiclient/client.go @@ -12,6 +12,8 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/models" "github.com/pkg/errors" + + longpollClient "github.com/jcuga/golongpoll/client" ) var ( @@ -70,6 +72,37 @@ func NewClient(config *Config) (*ApiClient, error) { return c, nil } +func NewLongpollClient(config *Config) (*longpollClient.Client, error) { + t := &JWTTransport{ + MachineID: &config.MachineID, + Password: &config.Password, + Scenarios: config.Scenarios, + URL: config.URL, + UserAgent: config.UserAgent, + VersionPrefix: config.VersionPrefix, + UpdateScenario: config.UpdateScenario, + } + tlsconfig := tls.Config{InsecureSkipVerify: InsecureSkipVerify} + if Cert != nil { + tlsconfig.RootCAs = CaCertPool + tlsconfig.Certificates = []tls.Certificate{*Cert} + } + http.DefaultTransport.(*http.Transport).TLSClientConfig = &tlsconfig + + // TODO manage OnFailure + // subUrl, _ := url.Parse(config.URL.String() + "decisions/stream/poll") + // hardcode server as long as we're in test mode + subUrl, _ := url.Parse("http://127.0.0.1:8101/" + "decisions/stream/poll") + c, err := longpollClient.NewClient(longpollClient.ClientOptions{ + SubscribeUrl: *subUrl, + Category: config.MachineID, + HttpClient: t.Client(), + PollTimeoutSeconds: 110, + }) + + return c, err +} + func NewDefaultClient(URL *url.URL, prefix string, userAgent string, client *http.Client) (*ApiClient, error) { if client == nil { client = &http.Client{} diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index 1adc51afe..053991f0f 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -2,6 +2,7 @@ package apiserver import ( "context" + "encoding/json" "fmt" "net/url" "strconv" @@ -21,6 +22,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + longpollClient "github.com/jcuga/golongpoll/client" "gopkg.in/tomb.v2" ) @@ -28,6 +30,7 @@ var ( PullInterval = time.Hour * 2 PushInterval = time.Second * 30 MetricsInterval = time.Minute * 30 + PollTimeout = time.Second * 110 ) var SCOPE_CAPI string = "CAPI" @@ -40,10 +43,12 @@ type apic struct { metricsInterval time.Duration dbClient *database.Client apiClient *apiclient.ApiClient + longpollClient *longpollClient.Client alertToPush chan []*models.Alert mu sync.Mutex pushTomb tomb.Tomb pullTomb tomb.Tomb + longpollTomb tomb.Tomb metricsTomb tomb.Tomb startup bool credentials *csconfig.ApiCredentialsCfg @@ -95,6 +100,7 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con startup: true, credentials: config.Credentials, pullTomb: tomb.Tomb{}, + longpollTomb: tomb.Tomb{}, pushTomb: tomb.Tomb{}, metricsTomb: tomb.Tomb{}, scenarioList: make([]string, 0), @@ -122,7 +128,23 @@ func NewAPIC(config *csconfig.OnlineApiClientCfg, dbClient *database.Client, con Scenarios: ret.scenarioList, UpdateScenario: ret.FetchScenariosListFromDB, }) - return ret, err + if err != nil { + return nil, errors.Wrap(err, "creating api client") + } + // TODO configure polling url + ret.longpollClient, err = apiclient.NewLongpollClient(&apiclient.Config{ + MachineID: config.Credentials.Login, + Password: password, + UserAgent: fmt.Sprintf("crowdsec/%s", cwversion.VersionStr()), + URL: apiURL, + VersionPrefix: "v2", + Scenarios: ret.scenarioList, + UpdateScenario: ret.FetchScenariosListFromDB, + }) + if err != nil { + return nil, errors.Wrap(err, "creating polling client") + } + return ret, nil } // keep track of all alerts in cache and push it to CAPI every PushInterval. @@ -137,6 +159,7 @@ func (a *apic) Push() error { select { case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others? a.pullTomb.Kill(nil) + a.longpollTomb.Kill(nil) a.metricsTomb.Kill(nil) log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache)) if len(cache) == 0 { @@ -400,25 +423,7 @@ func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decisio return alerts } -//we receive only one list of decisions, that we need to break-up : -// one alert for "community blocklist" -// one alert per list we're subscribed to -func (a *apic) PullTop() error { - var err error - - if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil { - return err - } else if !lastPullIsOld { - return nil - } - - data, _, err := a.apiClient.Decisions.GetStream(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup}) - if err != nil { - return errors.Wrap(err, "get stream") - } - a.startup = false - /*to count additions/deletions across lists*/ - +func (a *apic) HandleDecisionsStream(data *models.DecisionsStreamResponse) error { add_counters, delete_counters := makeAddAndDeleteCounters() // process deleted decisions if nbDeleted, err := a.HandleDeletedDecisions(data.Deleted, delete_counters); err != nil { @@ -450,6 +455,28 @@ func (a *apic) PullTop() error { return nil } +//we receive only one list of decisions, that we need to break-up : +// one alert for "community blocklist" +// one alert per list we're subscribed to +func (a *apic) PullTop() error { + var err error + + if lastPullIsOld, err := a.CAPIPullIsOld(); err != nil { + return err + } else if !lastPullIsOld { + return nil + } + + data, _, err := a.apiClient.Decisions.GetStream(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup}) + if err != nil { + return errors.Wrap(err, "get stream") + } + a.startup = false + /*to count additions/deletions across lists*/ + + return a.HandleDecisionsStream(data) +} + func setAlertScenario(add_counters map[string]map[string]int, delete_counters map[string]map[string]int, alert *models.Alert) *models.Alert { if *alert.Source.Scope == SCOPE_CAPI { *alert.Source.Scope = SCOPE_CAPI_ALIAS @@ -494,6 +521,61 @@ func (a *apic) Pull() error { case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others? a.metricsTomb.Kill(nil) a.pushTomb.Kill(nil) + a.longpollTomb.Kill(nil) + return nil + } + } +} + +func (a *apic) LongPoll() error { + defer types.CatchPanic("lapi/longpollFromAPIC") + log.Infof("Start longpoll from CrowdSec Central API (timeout: %s)", PollTimeout) + + toldOnce := false + for { + scenario, err := a.FetchScenariosListFromDB() + if err != nil { + log.Errorf("unable to fetch scenarios from db: %s", err) + } + if len(scenario) > 0 { + break + } + if !toldOnce { + log.Warning("scenario list is empty, will not pull yet") + toldOnce = true + } + time.Sleep(1 * time.Second) + } + // TODO start since the last event timestamp + pollChannel := a.longpollClient.Start(time.Now()) + for { + select { + case event := <-pollChannel: + // get the decision + decision := models.Decision{} + stringData, err := json.Marshal(event.Data) + if err != nil { + log.Errorf("marshal error: %s", err) + continue + } + json.Unmarshal(stringData, &decision) + + // do something with the decision + // TODO decision should have a type new/deleted + data := models.DecisionsStreamResponse{Deleted: make([]*models.Decision, 0, 1), New: make([]*models.Decision, 0, 1)} + data.New = append(data.New, &decision) + + err = a.HandleDecisionsStream(&data) + if err != nil { + log.Errorf("handle stream error: %s", err) + continue + } + + // TODO store last event timestamp + case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others? + a.metricsTomb.Kill(nil) + a.pushTomb.Kill(nil) + a.pullTomb.Kill(nil) return nil } } @@ -570,6 +652,7 @@ func (a *apic) SendMetrics() error { } case <-a.metricsTomb.Dying(): // if one apic routine is dying, do we kill the others? a.pullTomb.Kill(nil) + a.longpollTomb.Kill(nil) a.pushTomb.Kill(nil) return nil } @@ -579,6 +662,7 @@ func (a *apic) SendMetrics() error { func (a *apic) Shutdown() { a.pushTomb.Kill(nil) a.pullTomb.Kill(nil) + a.longpollTomb.Kill(nil) a.metricsTomb.Kill(nil) } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index d67c0bdc4..c90c85b16 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -311,6 +311,13 @@ func (s *APIServer) Run(apiReady chan bool) error { } return nil }) + s.apic.longpollTomb.Go(func() error { + if err := s.apic.LongPoll(); err != nil { + log.Errorf("capi longpoll: %s", err) + return err + } + return nil + }) s.apic.metricsTomb.Go(func() error { if err := s.apic.SendMetrics(); err != nil { log.Errorf("capi metrics: %s", err)