From b63e64ee9f164531ab9ba98ead10a76d21b87c1e Mon Sep 17 00:00:00 2001 From: "Thibault \"bui\" Koechlin" Date: Tue, 19 Mar 2024 10:29:16 +0100 Subject: [PATCH] Fix locking logic for HA + add list unsubscribe for PAPI (#2904) * add list unsubscribe operation for papi * fix the locking logic for HA --- pkg/apiserver/apic.go | 13 ++++++++----- pkg/apiserver/papi_cmd.go | 37 +++++++++++++++++++++++++++++++++---- pkg/database/lock.go | 26 +++++++++++++++++++------- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/pkg/apiserver/apic.go b/pkg/apiserver/apic.go index f57ae685e..2136edc8b 100644 --- a/pkg/apiserver/apic.go +++ b/pkg/apiserver/apic.go @@ -639,6 +639,14 @@ func (a *apic) PullTop(forcePull bool) error { return nil } + /*defer lock release*/ + defer func() { + log.Debug("Releasing lock for pullCAPI") + if err := a.dbClient.ReleasePullCAPILock(); err != nil { + log.Errorf("while releasing lock: %v", err) + } + }() + log.Infof("Starting community-blocklist update") data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup}) @@ -690,11 +698,6 @@ func (a *apic) PullTop(forcePull bool) error { return fmt.Errorf("while updating blocklists: %w", err) } - log.Debug("Releasing lock for pullCAPI") - if err := a.dbClient.ReleasePullCAPILock(); err != nil { - return fmt.Errorf("while releasing lock: %w", err) - } - return nil } diff --git a/pkg/apiserver/papi_cmd.go b/pkg/apiserver/papi_cmd.go index ba0203488..fb76223b9 100644 --- a/pkg/apiserver/papi_cmd.go +++ b/pkg/apiserver/papi_cmd.go @@ -37,6 +37,10 @@ type forcePull struct { Blocklist *blocklistLink `json:"blocklist,omitempty"` } +type listUnsubscribe struct { + Name string `json:"name"` +} + func DecisionCmd(message *Message, p *Papi, sync bool) error { switch message.Header.OperationCmd { case "delete": @@ -163,13 +167,38 @@ func AlertCmd(message *Message, p *Papi, sync bool) error { func ManagementCmd(message *Message, p *Papi, sync bool) error { if sync { - log.Infof("Ignoring management command from PAPI in sync mode") + p.Logger.Infof("Ignoring management command from PAPI in sync mode") return nil } switch message.Header.OperationCmd { + + case "blocklist_unsubscribe": + data, err := json.Marshal(message.Data) + if err != nil { + return err + } + unsubscribeMsg := listUnsubscribe{} + if err := json.Unmarshal(data, &unsubscribeMsg); err != nil { + return fmt.Errorf("message for '%s' contains bad data format: %s", message.Header.OperationType, err) + } + if unsubscribeMsg.Name == "" { + return fmt.Errorf("message for '%s' contains bad data format: missing blocklist name", message.Header.OperationType) + } + p.Logger.Infof("Received blocklist_unsubscribe command from PAPI, unsubscribing from blocklist %s", unsubscribeMsg.Name) + + filter := make(map[string][]string) + filter["origin"] = []string{types.ListOrigin} + filter["scenario"] = []string{unsubscribeMsg.Name} + + _, deletedDecisions, err := p.DBClient.SoftDeleteDecisionsWithFilter(filter) + if err != nil { + return fmt.Errorf("unable to delete decisions for list %s : %w", unsubscribeMsg.Name, err) + } + p.Logger.Infof("deleted %d decisions for list %s", len(deletedDecisions), unsubscribeMsg.Name) + case "reauth": - log.Infof("Received reauth command from PAPI, resetting token") + p.Logger.Infof("Received reauth command from PAPI, resetting token") p.apiClient.GetClient().Transport.(*apiclient.JWTTransport).ResetToken() case "force_pull": data, err := json.Marshal(message.Data) @@ -182,13 +211,13 @@ func ManagementCmd(message *Message, p *Papi, sync bool) error { } if forcePullMsg.Blocklist == nil { - log.Infof("Received force_pull command from PAPI, pulling community and 3rd-party blocklists") + p.Logger.Infof("Received force_pull command from PAPI, pulling community and 3rd-party blocklists") err = p.apic.PullTop(true) if err != nil { return fmt.Errorf("failed to force pull operation: %s", err) } } else { - log.Infof("Received force_pull command from PAPI, pulling blocklist %s", forcePullMsg.Blocklist.Name) + p.Logger.Infof("Received force_pull command from PAPI, pulling blocklist %s", forcePullMsg.Blocklist.Name) err = p.apic.PullBlocklist(&modelscapi.BlocklistLink{ Name: &forcePullMsg.Blocklist.Name, URL: &forcePullMsg.Blocklist.Url, diff --git a/pkg/database/lock.go b/pkg/database/lock.go index 339226e85..d25b71870 100644 --- a/pkg/database/lock.go +++ b/pkg/database/lock.go @@ -12,10 +12,12 @@ import ( ) const ( - CAPIPullLockTimeout = 120 + CAPIPullLockTimeout = 10 + CapiPullLockName = "pullCAPI" ) func (c *Client) AcquireLock(name string) error { + log.Debugf("acquiring lock %s", name) _, err := c.Ent.Lock.Create(). SetName(name). SetCreatedAt(types.UtcNow()). @@ -30,6 +32,7 @@ func (c *Client) AcquireLock(name string) error { } func (c *Client) ReleaseLock(name string) error { + log.Debugf("releasing lock %s", name) _, err := c.Ent.Lock.Delete().Where(lock.NameEQ(name)).Exec(c.CTX) if err != nil { return errors.Wrapf(DeleteFail, "delete lock: %s", err) @@ -38,11 +41,12 @@ func (c *Client) ReleaseLock(name string) error { } func (c *Client) ReleaseLockWithTimeout(name string, timeout int) error { - log.Debugf("(%s) releasing orphin locks", name) + log.Debugf("releasing lock %s with timeout of %d minutes", name, timeout) _, err := c.Ent.Lock.Delete().Where( lock.NameEQ(name), - lock.CreatedAtLT(time.Now().Add(-time.Duration(timeout)*time.Minute)), + lock.CreatedAtLT(time.Now().UTC().Add(-time.Duration(timeout)*time.Minute)), ).Exec(c.CTX) + if err != nil { return errors.Wrapf(DeleteFail, "delete lock: %s", err) } @@ -54,14 +58,22 @@ func (c *Client) IsLocked(err error) bool { } func (c *Client) AcquirePullCAPILock() error { - lockName := "pullCAPI" - err := c.ReleaseLockWithTimeout(lockName, CAPIPullLockTimeout) + + /*delete orphan "old" lock if present*/ + err := c.ReleaseLockWithTimeout(CapiPullLockName, CAPIPullLockTimeout) if err != nil { log.Errorf("unable to release pullCAPI lock: %s", err) } - return c.AcquireLock(lockName) + return c.AcquireLock(CapiPullLockName) } func (c *Client) ReleasePullCAPILock() error { - return c.ReleaseLockWithTimeout("pullCAPI", CAPIPullLockTimeout) + log.Debugf("deleting lock %s", CapiPullLockName) + _, err := c.Ent.Lock.Delete().Where( + lock.NameEQ(CapiPullLockName), + ).Exec(c.CTX) + if err != nil { + return errors.Wrapf(DeleteFail, "delete lock: %s", err) + } + return nil }