From 85ab9c68a27f44c23c25e60dcf60c3f0b90fa745 Mon Sep 17 00:00:00 2001 From: blotus Date: Fri, 3 Mar 2023 13:46:28 +0100 Subject: [PATCH] Add `cscli papi status` and `cscli papi sync` (#2091) --- cmd/crowdsec-cli/capi.go | 7 +- cmd/crowdsec-cli/main.go | 20 +++-- cmd/crowdsec-cli/papi.go | 136 ++++++++++++++++++++++++++++++++++ pkg/apiserver/papi.go | 138 ++++++++++++++++++++++++++++------- pkg/longpollclient/client.go | 58 +++++++++++++-- pkg/types/constants.go | 6 +- 6 files changed, 321 insertions(+), 44 deletions(-) create mode 100644 cmd/crowdsec-cli/papi.go diff --git a/cmd/crowdsec-cli/capi.go b/cmd/crowdsec-cli/capi.go index 621305248..90ae4697e 100644 --- a/cmd/crowdsec-cli/capi.go +++ b/cmd/crowdsec-cli/capi.go @@ -10,6 +10,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/cwhub" "github.com/crowdsecurity/crowdsec/pkg/cwversion" + "github.com/crowdsecurity/crowdsec/pkg/fflag" "github.com/crowdsecurity/crowdsec/pkg/models" "github.com/crowdsecurity/crowdsec/pkg/types" "github.com/go-openapi/strfmt" @@ -19,7 +20,7 @@ import ( "gopkg.in/yaml.v2" ) -const CAPIBaseURL string = "https://api.crowdsec.net/" +const CAPIBaseURL string = "https://api.dev.crowdsec.net/" const CAPIURLPrefix = "v3" func NewCapiCmd() *cobra.Command { @@ -92,7 +93,9 @@ func NewCapiRegisterCmd() *cobra.Command { Login: capiUser, Password: password.String(), URL: types.CAPIBaseURL, - PapiURL: types.PAPIBaseURL, + } + if fflag.PapiClient.IsEnabled() { + apiCfg.PapiURL = types.PAPIBaseURL } apiConfigDump, err := yaml.Marshal(apiCfg) if err != nil { diff --git a/cmd/crowdsec-cli/main.go b/cmd/crowdsec-cli/main.go index ed4df0c1b..0da586fd8 100644 --- a/cmd/crowdsec-cli/main.go +++ b/cmd/crowdsec-cli/main.go @@ -159,16 +159,16 @@ It is meant to allow you to manage bans, parsers/scenarios/etc, api and generall } cc.Init(&cc.Config{ - RootCmd: rootCmd, - Headings: cc.Yellow, - Commands: cc.Green + cc.Bold, + RootCmd: rootCmd, + Headings: cc.Yellow, + Commands: cc.Green + cc.Bold, CmdShortDescr: cc.Cyan, - Example: cc.Italic, - ExecName: cc.Bold, - Aliases: cc.Bold + cc.Italic, + Example: cc.Italic, + ExecName: cc.Bold, + Aliases: cc.Bold + cc.Italic, FlagsDataType: cc.White, - Flags: cc.Green, - FlagsDescr: cc.Cyan, + Flags: cc.Green, + FlagsDescr: cc.Cyan, }) rootCmd.SetOut(color.Output) @@ -247,6 +247,10 @@ It is meant to allow you to manage bans, parsers/scenarios/etc, api and generall rootCmd.AddCommand(NewSetupCmd()) } + if fflag.PapiClient.IsEnabled() { + rootCmd.AddCommand(NewPapiCmd()) + } + if err := rootCmd.Execute(); err != nil { if bincoverTesting != "" { log.Debug("coverage report is enabled") diff --git a/cmd/crowdsec-cli/papi.go b/cmd/crowdsec-cli/papi.go new file mode 100644 index 000000000..ccd9cebbd --- /dev/null +++ b/cmd/crowdsec-cli/papi.go @@ -0,0 +1,136 @@ +package main + +import ( + "time" + + "github.com/crowdsecurity/crowdsec/pkg/apiserver" + "github.com/crowdsecurity/crowdsec/pkg/database" + "github.com/crowdsecurity/crowdsec/pkg/types" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "gopkg.in/tomb.v2" +) + +func NewPapiCmd() *cobra.Command { + var cmdLapi = &cobra.Command{ + Use: "papi [action]", + Short: "Manage interaction with Polling API (PAPI)", + Args: cobra.MinimumNArgs(1), + DisableAutoGenTag: true, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if err := csConfig.LoadAPIServer(); err != nil || csConfig.DisableAPI { + return errors.Wrap(err, "Local API is disabled, please run this command on the local API machine") + } + if csConfig.API.Server.OnlineClient == nil { + log.Fatalf("no configuration for Central API in '%s'", *csConfig.FilePath) + } + if csConfig.API.Server.OnlineClient.Credentials.PapiURL == "" { + log.Fatalf("no PAPI URL in configuration") + } + return nil + }, + } + + cmdLapi.AddCommand(NewPapiStatusCmd()) + cmdLapi.AddCommand(NewPapiSyncCmd()) + + return cmdLapi +} + +func NewPapiStatusCmd() *cobra.Command { + cmdCapiStatus := &cobra.Command{ + Use: "status", + Short: "Get status of the Polling API", + Args: cobra.MinimumNArgs(0), + DisableAutoGenTag: true, + Run: func(cmd *cobra.Command, args []string) { + var err error + dbClient, err = database.NewClient(csConfig.DbConfig) + if err != nil { + log.Fatalf("unable to initialize database client : %s", err) + } + + apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig) + + if err != nil { + log.Fatalf("unable to initialize API client : %s", err) + } + + papi, err := apiserver.NewPAPI(apic, dbClient, csConfig.API.Server.ConsoleConfig, log.GetLevel()) + + if err != nil { + log.Fatalf("unable to initialize PAPI client : %s", err) + } + + perms, err := papi.GetPermissions() + + if err != nil { + log.Fatalf("unable to get PAPI permissions: %s", err) + } + var lastTimestampStr *string + lastTimestampStr, err = dbClient.GetConfigItem(apiserver.PapiPullKey) + if err != nil { + lastTimestampStr = types.StrPtr("never") + } + log.Infof("You can successfully interact with Polling API (PAPI)") + log.Infof("Console plan: %s", perms.Plan) + log.Infof("Last order received: %s", *lastTimestampStr) + + log.Infof("PAPI subscriptions:") + for _, sub := range perms.Categories { + log.Infof(" - %s", sub) + } + }, + } + + return cmdCapiStatus +} + +func NewPapiSyncCmd() *cobra.Command { + cmdCapiSync := &cobra.Command{ + Use: "sync", + Short: "Sync with the Polling API, pulling all non-expired orders for the instance", + Args: cobra.MinimumNArgs(0), + DisableAutoGenTag: true, + Run: func(cmd *cobra.Command, args []string) { + var err error + t := tomb.Tomb{} + dbClient, err = database.NewClient(csConfig.DbConfig) + if err != nil { + log.Fatalf("unable to initialize database client : %s", err) + } + + apic, err := apiserver.NewAPIC(csConfig.API.Server.OnlineClient, dbClient, csConfig.API.Server.ConsoleConfig) + + if err != nil { + log.Fatalf("unable to initialize API client : %s", err) + } + + t.Go(apic.Push) + + papi, err := apiserver.NewPAPI(apic, dbClient, csConfig.API.Server.ConsoleConfig, log.GetLevel()) + + if err != nil { + log.Fatalf("unable to initialize PAPI client : %s", err) + } + t.Go(papi.SyncDecisions) + + err = papi.PullOnce(time.Time{}) + + if err != nil { + log.Fatalf("unable to sync decisions: %s", err) + } + + log.Infof("Sending acknowledgements to CAPI") + + apic.Shutdown() + papi.Shutdown() + t.Wait() + time.Sleep(5 * time.Second) //FIXME: the push done by apic.Push is run inside a sub goroutine, sleep to make sure it's done + + }, + } + + return cmdCapiSync +} diff --git a/pkg/apiserver/papi.go b/pkg/apiserver/papi.go index 4b3ec14fb..fe8c531fe 100644 --- a/pkg/apiserver/papi.go +++ b/pkg/apiserver/papi.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "sync" "time" @@ -72,6 +73,16 @@ type Papi struct { Logger *log.Entry } +type PapiPermCheckError struct { + Error string `json:"error"` +} + +type PapiPermCheckSuccess struct { + Status string `json:"status"` + Plan string `json:"plan"` + Categories []string `json:"categories"` +} + func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, logLevel log.Level) (*Papi, error) { logger := logrus.New() @@ -80,8 +91,10 @@ func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.Cons } logger.SetLevel(logLevel) + papiUrl := *apic.apiClient.PapiURL + papiUrl.Path = fmt.Sprintf("%s%s", types.PAPIVersion, types.PAPIPollUrl) longPollClient, err := longpollclient.NewLongPollClient(longpollclient.LongPollClientConfig{ - Url: *apic.apiClient.PapiURL, + Url: papiUrl, Logger: logger, HttpClient: apic.apiClient.GetClient(), }) @@ -112,6 +125,94 @@ func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.Cons return papi, nil } +func (p *Papi) handleEvent(event longpollclient.Event) error { + logger := p.Logger.WithField("request-id", event.RequestId) + logger.Debugf("message received: %+v", event.Data) + message := &Message{} + if err := json.Unmarshal([]byte(event.Data), message); err != nil { + return fmt.Errorf("polling papi message format is not compatible: %+v: %s", event.Data, err) + } + if message.Header == nil { + return fmt.Errorf("no header in message, skipping") + } + if message.Header.Source == nil { + return fmt.Errorf("no source user in header message, skipping") + } + + if operationFunc, ok := operationMap[message.Header.OperationType]; ok { + logger.Debugf("Calling operation '%s'", message.Header.OperationType) + err := operationFunc(message, p) + if err != nil { + return fmt.Errorf("'%s %s failed: %s", message.Header.OperationType, message.Header.OperationCmd, err) + } + } else { + return fmt.Errorf("operation '%s' unknown, continue", message.Header.OperationType) + } + return nil +} + +func (p *Papi) GetPermissions() (PapiPermCheckSuccess, error) { + httpClient := p.apiClient.GetClient() + papiCheckUrl := fmt.Sprintf("%s%s%s", p.URL, types.PAPIVersion, types.PAPIPermissionsUrl) + req, err := http.NewRequest(http.MethodGet, papiCheckUrl, nil) + if err != nil { + return PapiPermCheckSuccess{}, fmt.Errorf("failed to create request : %s", err) + } + resp, err := httpClient.Do(req) + if err != nil { + log.Fatalf("failed to get response : %s", err) + } + + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + errResp := PapiPermCheckError{} + err = json.NewDecoder(resp.Body).Decode(&errResp) + if err != nil { + return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err) + } + return PapiPermCheckSuccess{}, fmt.Errorf("unable to query PAPI : %s (%d)", errResp.Error, resp.StatusCode) + } + respBody := PapiPermCheckSuccess{} + err = json.NewDecoder(resp.Body).Decode(&respBody) + if err != nil { + return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response : %s", err) + } + return respBody, nil +} + +func reverse(s []longpollclient.Event) []longpollclient.Event { + a := make([]longpollclient.Event, len(s)) + copy(a, s) + + for i := len(a)/2 - 1; i >= 0; i-- { + opp := len(a) - 1 - i + a[i], a[opp] = a[opp], a[i] + } + + return a +} + +func (p *Papi) PullOnce(since time.Time) error { + events, err := p.Client.PullOnce(since) + if err != nil { + return err + } + + reversedEvents := reverse(events) //PAPI sends events in the reverse order, which is not an issue when pulling them in real time, but here we need the correct order + eventsCount := len(events) + p.Logger.Infof("received %d events", eventsCount) + for i, event := range reversedEvents { + if err := p.handleEvent(event); err != nil { + p.Logger.WithField("request-id", event.RequestId).Errorf("failed to handle event: %s", err) + } + p.Logger.Debugf("handled event %d/%d", i, eventsCount) + } + p.Logger.Debugf("finished handling events") + //Don't update the timestamp in DB, as a "real" LAPI might be running + //Worst case, crowdsec will receive a few duplicated events and will discard them + return nil +} + // PullPAPI is the long polling client for real-time decisions from PAPI func (p *Papi) Pull() error { @@ -149,33 +250,10 @@ func (p *Papi) Pull() error { if err != nil { return errors.Wrap(err, "failed to marshal last timestamp") } - logger.Debugf("message received: %+v", event.Data) - message := &Message{} - if err := json.Unmarshal([]byte(event.Data), message); err != nil { - logger.Errorf("polling papi message format is not compatible: %+v: %s", event.Data, err) - // do we want to continue or exit ? - continue - } - if message.Header == nil { - logger.Errorf("no header in message, skipping") - continue - } - - if message.Header.Source == nil { - logger.Errorf("no source user in header message, skipping") - continue - } - - if operationFunc, ok := operationMap[message.Header.OperationType]; ok { - logger.Debugf("Calling operation '%s'", message.Header.OperationType) - err := operationFunc(message, p) - if err != nil { - logger.Errorf("'%s %s failed: %s", message.Header.OperationType, message.Header.OperationCmd, err) - continue - } - } else { - logger.Errorf("operation '%s' unknown, continue", message.Header.OperationType) + err = p.handleEvent(event) + if err != nil { + logger.Errorf("failed to handle event: %s", err) continue } @@ -261,3 +339,9 @@ func (p *Papi) SendDeletedDecisions(cacheOrig *models.DecisionsDeleteRequest) { pageEnd += bulkSize } } + +func (p *Papi) Shutdown() { + p.Logger.Infof("Shutting down PAPI") + p.syncTomb.Kill(nil) + p.Client.Stop() +} diff --git a/pkg/longpollclient/client.go b/pkg/longpollclient/client.go index 57a68c4b1..587826452 100644 --- a/pkg/longpollclient/client.go +++ b/pkg/longpollclient/client.go @@ -19,6 +19,7 @@ type LongPollClient struct { url url.URL logger *log.Entry since int64 + timeout string httpClient *http.Client } @@ -48,13 +49,11 @@ var errUnauthorized = fmt.Errorf("user is not authorized to use PAPI") const timeoutMessage = "no events before timeout" -func (c *LongPollClient) doQuery() error { - +func (c *LongPollClient) doQuery() (*http.Response, error) { logger := c.logger.WithField("method", "doQuery") - query := c.url.Query() query.Set("since_time", fmt.Sprintf("%d", c.since)) - query.Set("timeout", "45") + query.Set("timeout", c.timeout) c.url.RawQuery = query.Encode() logger.Debugf("Query parameters: %s", c.url.RawQuery) @@ -62,15 +61,29 @@ func (c *LongPollClient) doQuery() error { req, err := http.NewRequest(http.MethodGet, c.url.String(), nil) if err != nil { logger.Errorf("failed to create request: %s", err) - return err + return nil, err } req.Header.Set("Accept", "application/json") resp, err := c.httpClient.Do(req) if err != nil { logger.Errorf("failed to execute request: %s", err) + return nil, err + } + return resp, nil +} + +func (c *LongPollClient) poll() error { + + logger := c.logger.WithField("method", "poll") + + resp, err := c.doQuery() + + if err != nil { return err } + defer resp.Body.Close() + requestId := resp.Header.Get("X-Amzn-Trace-Id") logger = logger.WithField("request-id", requestId) if resp.StatusCode != http.StatusOK { @@ -142,7 +155,7 @@ func (c *LongPollClient) pollEvents() error { return nil default: c.logger.Debug("Polling PAPI") - err := c.doQuery() + err := c.poll() if err != nil { c.logger.Errorf("failed to poll: %s", err) if err == errUnauthorized { @@ -160,6 +173,7 @@ func (c *LongPollClient) Start(since time.Time) chan Event { c.logger.Infof("starting polling client") c.c = make(chan Event) c.since = since.Unix() * 1000 + c.timeout = "45" c.t.Go(c.pollEvents) return c.c } @@ -169,6 +183,38 @@ func (c *LongPollClient) Stop() error { return nil } +func (c *LongPollClient) PullOnce(since time.Time) ([]Event, error) { + c.logger.Debug("Pulling PAPI once") + c.since = since.Unix() * 1000 + c.timeout = "1" + resp, err := c.doQuery() + if err != nil { + return nil, err + } + defer resp.Body.Close() + decoder := json.NewDecoder(resp.Body) + var pollResp pollResponse + err = decoder.Decode(&pollResp) + if err != nil { + if err == io.EOF { + c.logger.Debugf("server closed connection") + return nil, nil + } + return nil, fmt.Errorf("error decoding poll response: %v", err) + } + + c.logger.Tracef("got response: %+v", pollResp) + + if len(pollResp.ErrorMessage) > 0 { + if pollResp.ErrorMessage == timeoutMessage { + c.logger.Debugf("got timeout message") + return nil, nil + } + return nil, fmt.Errorf("longpoll API error message: %s", pollResp.ErrorMessage) + } + return pollResp.Events, nil +} + func NewLongPollClient(config LongPollClientConfig) (*LongPollClient, error) { var logger *log.Entry if config.Url == (url.URL{}) { diff --git a/pkg/types/constants.go b/pkg/types/constants.go index 4e0d445c4..a2cebd696 100644 --- a/pkg/types/constants.go +++ b/pkg/types/constants.go @@ -4,7 +4,11 @@ const ApiKeyAuthType = "api-key" const TlsAuthType = "tls" const PasswordAuthType = "password" -const PAPIBaseURL = "https://papi.api.crowdsec.net/v1/decisions/stream/poll" +const PAPIBaseURL = "https://papi.api.crowdsec.net/" +const PAPIVersion = "v1" +const PAPIPollUrl = "/decisions/stream/poll" +const PAPIPermissionsUrl = "/permissions" + const CAPIBaseURL = "https://api.crowdsec.net/" const CscliOrigin = "cscli"