ente/server/pkg/repo/object_cleanup.go
2024-03-01 13:37:01 +05:30

128 lines
3.9 KiB
Go

package repo
import (
"context"
"database/sql"
"errors"
"github.com/ente-io/stacktrace"
log "github.com/sirupsen/logrus"
"github.com/ente-io/museum/ente"
"github.com/ente-io/museum/pkg/utils/time"
)
// ObjectCleanupRepository maintains state related to objects that might need to
// be cleaned up.
//
// In particular, all presigned urls start their life as a "temp object" that is
// liable to be cleaned up if not marked as a successful upload by the client.
type ObjectCleanupRepository struct {
DB *sql.DB
}
// AddTempObject persists a given object identifier and it's expirationTime
func (repo *ObjectCleanupRepository) AddTempObject(tempObject ente.TempObject, expirationTime int64) error {
var err error
if tempObject.IsMultipart {
_, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time,upload_id,is_multipart)
VALUES($1, $2, $3, $4)`, tempObject.ObjectKey, expirationTime, tempObject.UploadID, tempObject.IsMultipart)
} else {
_, err = repo.DB.Exec(`INSERT INTO temp_objects(object_key, expiration_time)
VALUES($1, $2)`, tempObject.ObjectKey, expirationTime)
}
return stacktrace.Propagate(err, "")
}
// RemoveTempObjectKey removes a TempObject identified by its key and datacenter
func (repo *ObjectCleanupRepository) RemoveTempObjectKey(ctx context.Context, tx *sql.Tx, objectKey string, dc string) error {
_, err := tx.ExecContext(ctx, `DELETE FROM temp_objects WHERE object_key = $1`, objectKey)
return stacktrace.Propagate(err, "")
}
// GetExpiredObjects returns the list of object keys that have expired
func (repo *ObjectCleanupRepository) GetAndLockExpiredObjects() (*sql.Tx, []ente.TempObject, error) {
tx, err := repo.DB.Begin()
if err != nil {
return nil, nil, stacktrace.Propagate(err, "")
}
rollback := func() {
rerr := tx.Rollback()
if rerr != nil {
log.Errorf("Ignoring error when rolling back transaction: %s", rerr)
}
}
commit := func() {
cerr := tx.Commit()
if cerr != nil {
log.Errorf("Ignoring error when committing transaction: %s", cerr)
}
}
rows, err := tx.Query(`
SELECT object_key, is_multipart, upload_id FROM temp_objects
WHERE expiration_time <= $1
LIMIT 1000
FOR UPDATE SKIP LOCKED
`, time.Microseconds())
if err != nil && errors.Is(err, sql.ErrNoRows) {
commit()
return nil, nil, err
}
if err != nil {
rollback()
return nil, nil, stacktrace.Propagate(err, "")
}
defer rows.Close()
tempObjects := make([]ente.TempObject, 0)
for rows.Next() {
var tempObject ente.TempObject
var uploadID sql.NullString
err := rows.Scan(&tempObject.ObjectKey, &tempObject.IsMultipart, &uploadID)
if err != nil {
rollback()
return nil, nil, stacktrace.Propagate(err, "")
}
if tempObject.IsMultipart {
tempObject.UploadID = uploadID.String
}
tempObjects = append(tempObjects, tempObject)
}
return tx, tempObjects, nil
}
// SetExpiryForTempObject sets the expiration_time for TempObject
func (repo *ObjectCleanupRepository) SetExpiryForTempObject(tx *sql.Tx, tempObject ente.TempObject, expirationTime int64) error {
if tempObject.IsMultipart {
_, err := tx.Exec(`
UPDATE temp_objects SET expiration_time = $1 WHERE object_key = $2 AND upload_id = $3
`, expirationTime, tempObject.ObjectKey, tempObject.UploadID)
return stacktrace.Propagate(err, "")
} else {
_, err := tx.Exec(`
UPDATE temp_objects SET expiration_time = $1 WHERE object_key = $2
`, expirationTime, tempObject.ObjectKey)
return stacktrace.Propagate(err, "")
}
}
// RemoveTempObject removes a given TempObject
func (repo *ObjectCleanupRepository) RemoveTempObject(tx *sql.Tx, tempObject ente.TempObject) error {
if tempObject.IsMultipart {
_, err := tx.Exec(`
DELETE FROM temp_objects WHERE object_key = $1 AND upload_id = $2
`, tempObject.ObjectKey, tempObject.UploadID)
return stacktrace.Propagate(err, "")
} else {
_, err := tx.Exec(`
DELETE FROM temp_objects WHERE object_key = $1
`, tempObject.ObjectKey)
return stacktrace.Propagate(err, "")
}
}