// ente/server/pkg/repo/object.go
// 2024-03-01 13:37:01 +05:30
//
// 205 lines
// 7.1 KiB
// Go

package repo
import (
"context"
"database/sql"
"errors"
"math/rand"
"strconv"
"github.com/ente-io/museum/ente"
"github.com/ente-io/stacktrace"
"github.com/lib/pq"
)
// ObjectRepository provides access to the object_keys table, which records the
// storage objects belonging to each file together with the list of
// datacenters each object has been replicated to.
type ObjectRepository struct {
	// DB is the handle used by methods that do not take a caller-supplied *sql.Tx.
	DB *sql.DB
	// QueueRepo is used to enqueue follow-up work (compliance-hold removal,
	// object deletion, embedding deletion) when objects are marked deleted.
	QueueRepo *QueueRepository
}
// GetObjectsMissingInDC returns up to limit non-deleted object keys whose
// datacenters array does not contain dc.
//
// When random is true, the fetched batch is shuffled in memory. The query has
// no ORDER BY, so the shuffle only randomizes the order within the batch the
// database returned. It does not sample uniformly from all candidate rows.
//
// On error the returned slice is nil.
func (repo *ObjectRepository) GetObjectsMissingInDC(dc string, limit int, random bool) ([]ente.S3ObjectKey, error) {
	rows, err := repo.DB.Query(`SELECT file_id, o_type, object_key, size FROM object_keys
WHERE is_deleted = false AND NOT($1 = ANY(datacenters)) limit $2`, dc, limit)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	files, err := convertRowsToObjectKeys(rows)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	if random {
		// rand.Shuffle is a no-op for 0 or 1 elements and len is nil-safe,
		// so no emptiness guard is needed.
		rand.Shuffle(len(files), func(i, j int) { files[i], files[j] = files[j], files[i] })
	}
	return files, nil
}
// MarkObjectReplicated appends datacenter (cast to s3region) to the datacenters
// array of every object_keys row whose object_key equals objectKey. It returns
// the number of rows updated.
//
// The update does not check whether datacenter is already in the array.
func (repo *ObjectRepository) MarkObjectReplicated(objectKey string, datacenter string) (int64, error) {
	const appendDatacenter = `UPDATE object_keys SET datacenters = datacenters || $1::s3region WHERE object_key = $2`
	res, execErr := repo.DB.Exec(appendDatacenter, datacenter, objectKey)
	if execErr != nil {
		return 0, stacktrace.Propagate(execErr, "")
	}
	return res.RowsAffected()
}
// GetObject returns the ente.S3ObjectKey of the non-deleted object of type
// objType that belongs to fileID.
//
// The returned value always has FileID set to fileID. When the lookup fails
// (including when no matching row exists), the error is non-nil and the other
// fields should not be relied upon.
func (repo *ObjectRepository) GetObject(fileID int64, objType ente.ObjectType) (ente.S3ObjectKey, error) {
	// todo: handling of deleted objects
	key := ente.S3ObjectKey{FileID: fileID}
	scanErr := repo.DB.
		QueryRow(`SELECT object_key, size, o_type FROM object_keys WHERE file_id = $1 AND o_type = $2 AND is_deleted=false`,
			fileID, objType).
		Scan(&key.ObjectKey, &key.FileSize, &key.Type)
	return key, stacktrace.Propagate(scanErr, "")
}
// GetAllFileObjectsByObjectKey finds every file that has an object_keys row
// for objectKey and returns all non-deleted object keys of those files.
//
// The is_deleted filter applies only to the returned rows. The subquery that
// selects the files matches objectKey regardless of its is_deleted flag.
func (repo *ObjectRepository) GetAllFileObjectsByObjectKey(objectKey string) ([]ente.S3ObjectKey, error) {
	const siblingObjects = `SELECT file_id, o_type, object_key, size from object_keys where file_id in
(select file_id from object_keys where object_key= $1)
and is_deleted=false`
	rows, queryErr := repo.DB.Query(siblingObjects, objectKey)
	if queryErr == nil {
		return convertRowsToObjectKeys(rows)
	}
	return nil, stacktrace.Propagate(queryErr, "")
}
// GetDataCentersForObject returns the entries of the datacenters array of the
// object_keys row(s) for objectKey, in the order the database yields them.
//
// The array is expanded into one result row per element with
// to_jsonb/jsonb_array_elements_text. If several rows share objectKey, their
// entries are concatenated. An unknown objectKey yields an empty, non-nil
// slice. On error the returned slice is nil.
func (repo *ObjectRepository) GetDataCentersForObject(objectKey string) ([]string, error) {
	rows, err := repo.DB.Query(`select jsonb_array_elements_text(to_jsonb(datacenters)) from object_keys where object_key = $1`, objectKey)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	defer rows.Close()
	datacenters := make([]string, 0)
	for rows.Next() {
		var dc string
		if err := rows.Scan(&dc); err != nil {
			return nil, stacktrace.Propagate(err, "")
		}
		datacenters = append(datacenters, dc)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails. Without this check a failure mid-stream would be
	// reported as a successful, truncated list.
	if err := rows.Err(); err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	return datacenters, nil
}
// RemoveDataCenterFromObject removes every occurrence of datacenter from the
// datacenters array of the object_keys row(s) whose object_key is objectKey.
func (repo *ObjectRepository) RemoveDataCenterFromObject(objectKey string, datacenter string) error {
	const dropDatacenter = `UPDATE object_keys SET datacenters = array_remove(datacenters, $1) WHERE object_key = $2`
	if _, execErr := repo.DB.Exec(dropDatacenter, datacenter, objectKey); execErr != nil {
		return stacktrace.Propagate(execErr, "")
	}
	return nil
}
// RemoveObjectsForKey hard-deletes the object_keys rows for objectKey, but only
// those already flagged is_deleted. Rows that are still live are left intact.
func (repo *ObjectRepository) RemoveObjectsForKey(objectKey string) error {
	const purgeDeleted = `DELETE FROM object_keys WHERE object_key = $1 AND is_deleted = TRUE`
	if _, execErr := repo.DB.Exec(purgeDeleted, objectKey); execErr != nil {
		return stacktrace.Propagate(execErr, "")
	}
	return nil
}
// MarkObjectsAsDeletedForFileIDs soft-deletes the object keys of the given
// files inside tx and schedules the actual cleanup. The actual deletion happens
// later when the queues are processed.
//
// Within tx it, in order:
//  1. selects and locks (FOR UPDATE) the non-deleted object_keys rows of fileIDs;
//  2. enqueues their object keys on RemoveComplianceHoldQueue, then on
//     DeleteObjectQueue;
//  3. enqueues every file ID, formatted in base 10, on DeleteEmbeddingsQueue;
//  4. sets is_deleted = TRUE on all object_keys rows of fileIDs.
//
// It returns the object keys selected in step 1. Any failure aborts with the
// error and a nil slice. Rolling back tx is left to the caller.
func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context, tx *sql.Tx, fileIDs []int64) ([]ente.S3ObjectKey, error) {
	ids := pq.Array(fileIDs)

	lockedRows, err := tx.QueryContext(ctx, `SELECT file_id, o_type, object_key, size FROM object_keys
WHERE file_id = ANY($1) AND is_deleted=false FOR UPDATE`, ids)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	objects, err := convertRowsToObjectKeys(lockedRows)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}

	// Left nil when there are no objects, mirroring append-from-nil.
	var objectKeys []string
	for i := range objects {
		objectKeys = append(objectKeys, objects[i].ObjectKey)
	}
	if err = repo.QueueRepo.AddItems(ctx, tx, RemoveComplianceHoldQueue, objectKeys); err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	if err = repo.QueueRepo.AddItems(ctx, tx, DeleteObjectQueue, objectKeys); err != nil {
		return nil, stacktrace.Propagate(err, "")
	}

	var embeddingIDs []string
	for _, id := range fileIDs {
		embeddingIDs = append(embeddingIDs, strconv.FormatInt(id, 10))
	}
	if err = repo.QueueRepo.AddItems(ctx, tx, DeleteEmbeddingsQueue, embeddingIDs); err != nil {
		return nil, stacktrace.Propagate(err, "")
	}

	if _, err = tx.ExecContext(ctx, `UPDATE object_keys SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs)); err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	return objects, nil
}
// convertRowsToObjectKeys scans rows whose columns are, in order,
// (file_id, o_type, object_key, size) into ente.S3ObjectKey values. It always
// closes rows.
//
// It returns an empty, non-nil slice when there are no rows. On a scan error
// or an iteration error reported by rows.Err, it returns nil and the error.
func convertRowsToObjectKeys(rows *sql.Rows) ([]ente.S3ObjectKey, error) {
	defer rows.Close()
	fileObjectKeys := make([]ente.S3ObjectKey, 0)
	for rows.Next() {
		var fileObjectKey ente.S3ObjectKey
		if err := rows.Scan(&fileObjectKey.FileID, &fileObjectKey.Type, &fileObjectKey.ObjectKey, &fileObjectKey.FileSize); err != nil {
			return nil, stacktrace.Propagate(err, "")
		}
		fileObjectKeys = append(fileObjectKeys, fileObjectKey)
	}
	// rows.Next returns false both at the end of the results and on a driver
	// or network error. Only rows.Err tells the two apart, so without it a
	// truncated result would be returned as success.
	if err := rows.Err(); err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	return fileObjectKeys, nil
}
// DoesObjectExist reports, using tx, whether object_keys has at least one row
// for objectKey. Rows count regardless of their is_deleted flag.
func (repo *ObjectRepository) DoesObjectExist(tx *sql.Tx, objectKey string) (bool, error) {
	const existsQuery = `SELECT EXISTS (SELECT 1 FROM object_keys WHERE object_key = $1)`
	found := false
	scanErr := tx.QueryRow(existsQuery, objectKey).Scan(&found)
	return found, stacktrace.Propagate(scanErr, "")
}
// DoesObjectOrTempObjectExist reports whether objectKey has an entry in either
// the object_keys table or the temp_objects table. The object_keys check
// ignores the is_deleted flag.
func (repo *ObjectRepository) DoesObjectOrTempObjectExist(objectKey string) (bool, error) {
	const existsQuery = `SELECT (EXISTS (SELECT 1 FROM object_keys WHERE object_key = $1) OR
EXISTS (SELECT 1 FROM temp_objects WHERE object_key = $1))`
	found := false
	scanErr := repo.DB.QueryRow(existsQuery, objectKey).Scan(&found)
	return found, stacktrace.Propagate(scanErr, "")
}
// GetObjectState returns various bits of information about an object that are
// useful in pre-flight checks during replication:
//   - IsFileDeleted: the object_keys row's is_deleted flag;
//   - IsUserDeleted: true when the owning user's encrypted_email is NULL;
//   - Size: the object's size from object_keys.
//
// Unknown objects (i.e. objectKeys for which there are no entries) are
// considered as deleted. Both flags are set and no error is returned. Because
// the query inner-joins files and users, an object whose file or owner row is
// missing is reported the same way. If several rows match, QueryRow uses only
// the first one the database returns.
//
// On any other error the zero ente.ObjectState is returned with the error.
func (repo *ObjectRepository) GetObjectState(tx *sql.Tx, objectKey string) (ente.ObjectState, error) {
	row := tx.QueryRow(`
SELECT ok.is_deleted, u.encrypted_email IS NULL AS is_user_deleted, ok.size
FROM object_keys ok
JOIN files f ON ok.file_id = f.file_id
JOIN users u ON f.owner_id = u.user_id
where ok.object_key = $1
`, objectKey)
	var state ente.ObjectState
	err := row.Scan(&state.IsFileDeleted, &state.IsUserDeleted, &state.Size)
	switch {
	case err == nil:
		return state, nil
	case errors.Is(err, sql.ErrNoRows):
		return ente.ObjectState{IsFileDeleted: true, IsUserDeleted: true}, nil
	default:
		return ente.ObjectState{}, stacktrace.Propagate(err, "Failed to fetch object state")
	}
}