optimize the flush function by deleting alerts based on their id (#1054)

This commit is contained in:
blotus 2021-11-17 10:15:38 +01:00 committed by GitHub
parent 0652e9ed08
commit dd03d07355
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -850,34 +850,35 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
deletedByAge = nbDeleted
}
if MaxItems > 0 {
if totalAlerts > MaxItems {
nbToDelete := totalAlerts - MaxItems
batchSize := 500
if batchSize > nbToDelete {
batchSize = nbToDelete
}
deleted := 0
for deleted < nbToDelete {
c.Log.Debugf("FlushAlerts (before query with filter) to delete: %d", nbToDelete)
alerts, err := c.QueryAlertWithFilter(map[string][]string{
"sort": {"ASC"},
"limit": {strconv.Itoa(batchSize)},
}) // we want to delete older alerts if we reach the max number of items
//We get the highest id for the alerts
//We substract MaxItems to avoid deleting alerts that are not old enough
//This gives us the oldest alert that we want to keep
//We then delete all the alerts with an id lower than this one
//We can do this because the id is auto-increment, and the database won't reuse the same id twice
lastAlert, err := c.QueryAlertWithFilter(map[string][]string{
"sort": {"DESC"},
"limit": {"1"},
})
c.Log.Debugf("FlushAlerts (last alert): %+v", lastAlert)
if err != nil {
c.Log.Errorf("FlushAlerts: could not get last alert: %s", err)
return errors.Wrap(err, "could not get last alert")
}
if len(lastAlert) != 0 {
maxid := lastAlert[0].ID - MaxItems
c.Log.Debugf("FlushAlerts (max id): %d", maxid)
if maxid > 0 {
//This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
if err != nil {
c.Log.Warningf("FlushAlerts (max items query) : %s", err)
return errors.Wrap(err, "unable to get all alerts")
}
deletedAlerts, err := c.DeleteAlertGraphBatch(alerts)
if err != nil {
c.Log.Warningf("FlushAlerts (max items query) : %s", err)
return errors.Wrap(err, "unable to delete alerts")
}
deleted += deletedAlerts
if nbToDelete-deleted < batchSize {
batchSize = nbToDelete - deleted
c.Log.Errorf("FlushAlerts: Could not delete alerts : %s", err)
return errors.Wrap(err, "could not delete alerts")
}
}
deletedByNbItem = deleted
}
}
if deletedByNbItem > 0 {