ente/server/pkg/repo/queue.go
2024-03-11 15:41:18 +05:30

153 lines
5.3 KiB
Go

package repo
import (
"context"
"database/sql"
"fmt"
"github.com/sirupsen/logrus"
"strconv"
"strings"
"github.com/ente-io/museum/pkg/utils/time"
"github.com/ente-io/stacktrace"
)
// QueueRepository defines methods to insert, delete items from queue
type QueueRepository struct {
DB *sql.DB
}
// itemDeletionDelayInMinMap tracks the delay (in min) after which an item is ready to be processed.
// -ve entry indicates that the item should be processed immediately, without any delay.
var itemDeletionDelayInMinMap = map[string]int64{
DropFileEncMedataQueue: -1 * 24 * 60, // -ve value to ensure attributes are immediately removed
DeleteObjectQueue: 45 * 24 * 60, // 45 days in minutes
DeleteEmbeddingsQueue: -1 * 24 * 60, // -ve value to ensure embeddings are immediately removed
TrashCollectionQueueV3: -1 * 24 * 60, // -ve value to ensure collections are immediately marked as trashed
TrashEmptyQueue: -1 * 24 * 60, // -ve value to ensure empty trash request are processed in next cron run
RemoveComplianceHoldQueue: -1 * 24 * 60, // -ve value to ensure compliance hold is removed in next cron run
}
const (
DropFileEncMedataQueue string = "dropFileEncMetata"
DeleteObjectQueue string = "deleteObject"
DeleteEmbeddingsQueue string = "deleteEmbedding"
OutdatedObjectsQueue string = "outdatedObject"
// Deprecated: Keeping it till we clean up items from the queue DB.
TrashCollectionQueue string = "trashCollection"
TrashCollectionQueueV3 string = "trashCollectionV3"
TrashEmptyQueue string = "trashEmpty"
RemoveComplianceHoldQueue string = "removeComplianceHold"
BatchSize int = 30000
)
type QueueItem struct {
Id int64
Item string
}
// InsertItem adds entry in the queue with given queueName and item. If entry already exists, it's no-op
func (repo *QueueRepository) InsertItem(ctx context.Context, queueName string, item string) error {
_, err := repo.DB.ExecContext(ctx, `INSERT INTO queue(queue_name, item) VALUES($1, $2)
ON CONFLICT (queue_name, item) DO NOTHING`, queueName, item)
if err != nil {
return stacktrace.Propagate(err, "")
}
return nil
}
func (repo *QueueRepository) UpdateItem(ctx context.Context, queueName string, queueID int64, item string) error {
rows, err := repo.DB.ExecContext(ctx, `UPDATE queue SET item = $1 WHERE queue_name = $2 AND queue_id = $3 AND is_deleted=false`, item, queueName, queueID)
if err != nil {
return stacktrace.Propagate(err, "")
}
count, err := rows.RowsAffected()
if err != nil {
return stacktrace.Propagate(err, "")
}
if count == 0 {
return fmt.Errorf("no item found with queueID: %d for queue %s", queueID, queueName)
}
return nil
}
func (repo *QueueRepository) RequeueItem(ctx context.Context, queueName string, queueID int64) error {
rows, err := repo.DB.ExecContext(ctx, `UPDATE queue SET is_deleted = false WHERE queue_name = $1 AND queue_id = $2`, queueName, queueID)
if err != nil {
return stacktrace.Propagate(err, "")
}
count, err := rows.RowsAffected()
if err != nil {
return stacktrace.Propagate(err, "")
}
if count == 0 {
return fmt.Errorf("no item found with queueID: %d for queue %s", queueID, queueName)
}
logrus.Infof("Re-queued %d item with queueID: %d for queue %s", count, queueID, queueName)
return nil
}
// AddItems adds a list of item against a specified queue
func (repo *QueueRepository) AddItems(ctx context.Context, tx *sql.Tx, queueName string, items []string) error {
if len(items) == 0 {
return nil
}
lb := 0
size := len(items)
for lb < size {
ub := lb + BatchSize
if ub > size {
ub = size
}
slicedList := items[lb:ub]
query := "INSERT INTO queue(queue_name, item) VALUES "
var inserts []string
var params []interface{}
for i, v := range slicedList {
inserts = append(inserts, `($`+strconv.Itoa(2*i+1)+`,$`+strconv.Itoa(2*i+2)+`)`)
params = append(params, queueName, v)
}
queryVals := strings.Join(inserts, ",")
query = query + queryVals
query = query + " ON CONFLICT (queue_name, item) DO NOTHING"
_, err := tx.ExecContext(ctx, query, params...)
if err != nil {
return stacktrace.Propagate(err, "")
}
lb += BatchSize
}
return nil
}
func (repo *QueueRepository) DeleteItem(queueName string, item string) error {
_, err := repo.DB.Exec(`UPDATE queue SET is_deleted = $1 WHERE queue_name = $2 AND item=$3`, true, queueName, item)
return stacktrace.Propagate(err, "")
}
// GetItemsReadyForDeletion method, for a given queue name, returns a list of QueueItem which are ready for deletion
func (repo *QueueRepository) GetItemsReadyForDeletion(queueName string, count int) ([]QueueItem, error) {
delayInMin, ok := itemDeletionDelayInMinMap[queueName]
if !ok {
return nil, stacktrace.Propagate(fmt.Errorf("missing delay for %s", queueName), "")
}
rows, err := repo.DB.Query(`SELECT queue_id, item FROM queue WHERE
queue_name=$1 and created_at <= $2 and is_deleted = false order by created_at ASC LIMIT $3`,
queueName, time.MicrosecondsBeforeMinutes(delayInMin), count)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
defer rows.Close()
items := make([]QueueItem, 0)
for rows.Next() {
var item QueueItem
err = rows.Scan(&item.Id, &item.Item)
if err != nil {
return items, stacktrace.Propagate(err, "")
}
items = append(items, item)
}
return items, stacktrace.Propagate(err, "")
}