Fix locking logic for HA + add list unsubscribe for PAPI (#2904)
* add list unsubscribe operation for papi * fix the locking logic for HA
This commit is contained in:
parent
6de62a1468
commit
b63e64ee9f
|
@ -639,6 +639,14 @@ func (a *apic) PullTop(forcePull bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*defer lock release*/
|
||||||
|
defer func() {
|
||||||
|
log.Debug("Releasing lock for pullCAPI")
|
||||||
|
if err := a.dbClient.ReleasePullCAPILock(); err != nil {
|
||||||
|
log.Errorf("while releasing lock: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
log.Infof("Starting community-blocklist update")
|
log.Infof("Starting community-blocklist update")
|
||||||
|
|
||||||
data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
|
data, _, err := a.apiClient.Decisions.GetStreamV3(context.Background(), apiclient.DecisionsStreamOpts{Startup: a.startup})
|
||||||
|
@ -690,11 +698,6 @@ func (a *apic) PullTop(forcePull bool) error {
|
||||||
return fmt.Errorf("while updating blocklists: %w", err)
|
return fmt.Errorf("while updating blocklists: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Releasing lock for pullCAPI")
|
|
||||||
if err := a.dbClient.ReleasePullCAPILock(); err != nil {
|
|
||||||
return fmt.Errorf("while releasing lock: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,10 @@ type forcePull struct {
|
||||||
Blocklist *blocklistLink `json:"blocklist,omitempty"`
|
Blocklist *blocklistLink `json:"blocklist,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type listUnsubscribe struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
func DecisionCmd(message *Message, p *Papi, sync bool) error {
|
func DecisionCmd(message *Message, p *Papi, sync bool) error {
|
||||||
switch message.Header.OperationCmd {
|
switch message.Header.OperationCmd {
|
||||||
case "delete":
|
case "delete":
|
||||||
|
@ -163,13 +167,38 @@ func AlertCmd(message *Message, p *Papi, sync bool) error {
|
||||||
|
|
||||||
func ManagementCmd(message *Message, p *Papi, sync bool) error {
|
func ManagementCmd(message *Message, p *Papi, sync bool) error {
|
||||||
if sync {
|
if sync {
|
||||||
log.Infof("Ignoring management command from PAPI in sync mode")
|
p.Logger.Infof("Ignoring management command from PAPI in sync mode")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
switch message.Header.OperationCmd {
|
switch message.Header.OperationCmd {
|
||||||
|
|
||||||
|
case "blocklist_unsubscribe":
|
||||||
|
data, err := json.Marshal(message.Data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
unsubscribeMsg := listUnsubscribe{}
|
||||||
|
if err := json.Unmarshal(data, &unsubscribeMsg); err != nil {
|
||||||
|
return fmt.Errorf("message for '%s' contains bad data format: %s", message.Header.OperationType, err)
|
||||||
|
}
|
||||||
|
if unsubscribeMsg.Name == "" {
|
||||||
|
return fmt.Errorf("message for '%s' contains bad data format: missing blocklist name", message.Header.OperationType)
|
||||||
|
}
|
||||||
|
p.Logger.Infof("Received blocklist_unsubscribe command from PAPI, unsubscribing from blocklist %s", unsubscribeMsg.Name)
|
||||||
|
|
||||||
|
filter := make(map[string][]string)
|
||||||
|
filter["origin"] = []string{types.ListOrigin}
|
||||||
|
filter["scenario"] = []string{unsubscribeMsg.Name}
|
||||||
|
|
||||||
|
_, deletedDecisions, err := p.DBClient.SoftDeleteDecisionsWithFilter(filter)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to delete decisions for list %s : %w", unsubscribeMsg.Name, err)
|
||||||
|
}
|
||||||
|
p.Logger.Infof("deleted %d decisions for list %s", len(deletedDecisions), unsubscribeMsg.Name)
|
||||||
|
|
||||||
case "reauth":
|
case "reauth":
|
||||||
log.Infof("Received reauth command from PAPI, resetting token")
|
p.Logger.Infof("Received reauth command from PAPI, resetting token")
|
||||||
p.apiClient.GetClient().Transport.(*apiclient.JWTTransport).ResetToken()
|
p.apiClient.GetClient().Transport.(*apiclient.JWTTransport).ResetToken()
|
||||||
case "force_pull":
|
case "force_pull":
|
||||||
data, err := json.Marshal(message.Data)
|
data, err := json.Marshal(message.Data)
|
||||||
|
@ -182,13 +211,13 @@ func ManagementCmd(message *Message, p *Papi, sync bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if forcePullMsg.Blocklist == nil {
|
if forcePullMsg.Blocklist == nil {
|
||||||
log.Infof("Received force_pull command from PAPI, pulling community and 3rd-party blocklists")
|
p.Logger.Infof("Received force_pull command from PAPI, pulling community and 3rd-party blocklists")
|
||||||
err = p.apic.PullTop(true)
|
err = p.apic.PullTop(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to force pull operation: %s", err)
|
return fmt.Errorf("failed to force pull operation: %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Infof("Received force_pull command from PAPI, pulling blocklist %s", forcePullMsg.Blocklist.Name)
|
p.Logger.Infof("Received force_pull command from PAPI, pulling blocklist %s", forcePullMsg.Blocklist.Name)
|
||||||
err = p.apic.PullBlocklist(&modelscapi.BlocklistLink{
|
err = p.apic.PullBlocklist(&modelscapi.BlocklistLink{
|
||||||
Name: &forcePullMsg.Blocklist.Name,
|
Name: &forcePullMsg.Blocklist.Name,
|
||||||
URL: &forcePullMsg.Blocklist.Url,
|
URL: &forcePullMsg.Blocklist.Url,
|
||||||
|
|
|
@ -12,10 +12,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
CAPIPullLockTimeout = 120
|
CAPIPullLockTimeout = 10
|
||||||
|
CapiPullLockName = "pullCAPI"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Client) AcquireLock(name string) error {
|
func (c *Client) AcquireLock(name string) error {
|
||||||
|
log.Debugf("acquiring lock %s", name)
|
||||||
_, err := c.Ent.Lock.Create().
|
_, err := c.Ent.Lock.Create().
|
||||||
SetName(name).
|
SetName(name).
|
||||||
SetCreatedAt(types.UtcNow()).
|
SetCreatedAt(types.UtcNow()).
|
||||||
|
@ -30,6 +32,7 @@ func (c *Client) AcquireLock(name string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) ReleaseLock(name string) error {
|
func (c *Client) ReleaseLock(name string) error {
|
||||||
|
log.Debugf("releasing lock %s", name)
|
||||||
_, err := c.Ent.Lock.Delete().Where(lock.NameEQ(name)).Exec(c.CTX)
|
_, err := c.Ent.Lock.Delete().Where(lock.NameEQ(name)).Exec(c.CTX)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(DeleteFail, "delete lock: %s", err)
|
return errors.Wrapf(DeleteFail, "delete lock: %s", err)
|
||||||
|
@ -38,11 +41,12 @@ func (c *Client) ReleaseLock(name string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) ReleaseLockWithTimeout(name string, timeout int) error {
|
func (c *Client) ReleaseLockWithTimeout(name string, timeout int) error {
|
||||||
log.Debugf("(%s) releasing orphin locks", name)
|
log.Debugf("releasing lock %s with timeout of %d minutes", name, timeout)
|
||||||
_, err := c.Ent.Lock.Delete().Where(
|
_, err := c.Ent.Lock.Delete().Where(
|
||||||
lock.NameEQ(name),
|
lock.NameEQ(name),
|
||||||
lock.CreatedAtLT(time.Now().Add(-time.Duration(timeout)*time.Minute)),
|
lock.CreatedAtLT(time.Now().UTC().Add(-time.Duration(timeout)*time.Minute)),
|
||||||
).Exec(c.CTX)
|
).Exec(c.CTX)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(DeleteFail, "delete lock: %s", err)
|
return errors.Wrapf(DeleteFail, "delete lock: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -54,14 +58,22 @@ func (c *Client) IsLocked(err error) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) AcquirePullCAPILock() error {
|
func (c *Client) AcquirePullCAPILock() error {
|
||||||
lockName := "pullCAPI"
|
|
||||||
err := c.ReleaseLockWithTimeout(lockName, CAPIPullLockTimeout)
|
/*delete orphan "old" lock if present*/
|
||||||
|
err := c.ReleaseLockWithTimeout(CapiPullLockName, CAPIPullLockTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to release pullCAPI lock: %s", err)
|
log.Errorf("unable to release pullCAPI lock: %s", err)
|
||||||
}
|
}
|
||||||
return c.AcquireLock(lockName)
|
return c.AcquireLock(CapiPullLockName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) ReleasePullCAPILock() error {
|
func (c *Client) ReleasePullCAPILock() error {
|
||||||
return c.ReleaseLockWithTimeout("pullCAPI", CAPIPullLockTimeout)
|
log.Debugf("deleting lock %s", CapiPullLockName)
|
||||||
|
_, err := c.Ent.Lock.Delete().Where(
|
||||||
|
lock.NameEQ(CapiPullLockName),
|
||||||
|
).Exec(c.CTX)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(DeleteFail, "delete lock: %s", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue