[server] Implement support for copying object

Neeraj Gupta 2024-04-19 11:26:14 +05:30
parent c124cde471
commit 816fa330de
6 changed files with 205 additions and 9 deletions


@@ -390,7 +390,13 @@ func main() {
 		timeout.WithHandler(healthCheckHandler.PingDBStats),
 		timeout.WithResponse(timeOutResponse),
 	))
-	fileCopyCtrl := &file_copy.FileCopyController{FileController: fileController, CollectionCtrl: collectionController, S3Config: s3Config}
+	fileCopyCtrl := &file_copy.FileCopyController{
+		FileController: fileController,
+		CollectionCtrl: collectionController,
+		S3Config:       s3Config,
+		ObjectRepo:     objectRepo,
+		FileRepo:       fileRepo,
+	}
 	fileHandler := &api.FileHandler{
 		Controller: fileController,

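The construction above injects the new ObjectRepo and FileRepo dependencies. The route registration for the copy endpoint is not part of this hunk; a hypothetical sketch of the wiring, mirroring the fileHandler pattern (handler type, field name, and route path are assumptions, not from this commit):

    // Hypothetical wiring, following the FileHandler pattern shown above (names assumed):
    fileCopyHandler := &api.FileCopyHandler{Controller: fileCopyCtrl}
    privateAPI.POST("/files/copy", fileCopyHandler.CopyFiles)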

@@ -103,11 +103,23 @@ type AddFilesRequest struct {
 	Files []CollectionFileItem `json:"files" binding:"required"`
 }
 
-// CopyFileSyncRequest is request object for creating copy of Files, and those copy to the destination collection
+// CopyFileSyncRequest is the request object for copying CollectionFileItems into the destination collection
 type CopyFileSyncRequest struct {
-	SrcCollectionID int64                `json:"srcCollectionID" binding:"required"`
-	DstCollection   int64                `json:"dstCollectionID" binding:"required"`
-	Files           []CollectionFileItem `json:"files" binding:"required"`
+	SrcCollectionID     int64                `json:"srcCollectionID" binding:"required"`
+	DstCollection       int64                `json:"dstCollectionID" binding:"required"`
+	CollectionFileItems []CollectionFileItem `json:"files" binding:"required"`
 }
+
+type CopyResponse struct {
+	OldToNewFileIDMap map[int64]int64 `json:"oldToNewFileIDMap"`
+}
+
+func (cfr CopyFileSyncRequest) FileIDs() []int64 {
+	fileIDs := make([]int64, 0, len(cfr.CollectionFileItems))
+	for _, file := range cfr.CollectionFileItems {
+		fileIDs = append(fileIDs, file.ID)
+	}
+	return fileIDs
+}
 
 // RemoveFilesRequest represents a request to remove files from a collection

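Putting the new request type to work: a minimal sketch of constructing a copy request in Go (the CollectionFileItem field names are inferred from how CopyFiles consumes them below; IDs and key material are placeholders):

    req := ente.CopyFileSyncRequest{
        SrcCollectionID: 120,
        DstCollection:   450,
        CollectionFileItems: []ente.CollectionFileItem{
            {ID: 9001, EncryptedKey: "<base64 key>", KeyDecryptionNonce: "<base64 nonce>"},
        },
    }
    ids := req.FileIDs() // -> []int64{9001}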

@@ -481,8 +481,8 @@ func (c *CollectionController) IsCopyAllowed(ctx *gin.Context, actorUserID int64
 		return stacktrace.Propagate(err, "failed to verify ownership of the dstCollection")
 	}
 	// verify that all fileIDs exist in the srcCollection
-	fileIDs := make([]int64, len(req.Files))
-	for idx, file := range req.Files {
+	fileIDs := make([]int64, len(req.CollectionFileItems))
+	for idx, file := range req.CollectionFileItems {
 		fileIDs[idx] = file.ID
 	}
 	if err := c.CollectionRepo.VerifyAllFileIDsExistsInCollection(ctx, req.SrcCollectionID, fileIDs); err != nil {
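VerifyAllFileIDsExistsInCollection itself is not shown in this commit; conceptually it must confirm that every requested file is a live member of the source collection. A sketch of the kind of query that implies, assuming a collection_files(collection_id, file_id, is_deleted) table (the schema details and error value here are assumptions, not the actual implementation):

    // Sketch only; not the actual repository implementation.
    var count int
    err := repo.DB.QueryRow(
        `SELECT COUNT(DISTINCT file_id) FROM collection_files
         WHERE collection_id = $1 AND is_deleted = FALSE AND file_id = ANY($2)`,
        collectionID, pq.Array(fileIDs)).Scan(&count)
    if err == nil && count != len(fileIDs) {
        err = fmt.Errorf("only %d of %d files are present in collection %d", count, len(fileIDs), collectionID)
    }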


@@ -1,25 +1,176 @@
 package file_copy
 
 import (
+	"fmt"
+	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/ente-io/museum/ente"
 	"github.com/ente-io/museum/pkg/controller"
+	"github.com/ente-io/museum/pkg/repo"
 	"github.com/ente-io/museum/pkg/utils/auth"
 	"github.com/ente-io/museum/pkg/utils/s3config"
+	enteTime "github.com/ente-io/museum/pkg/utils/time"
+	"github.com/gin-contrib/requestid"
 	"github.com/gin-gonic/gin"
+	"github.com/sirupsen/logrus"
+	"time"
 )
 
 type FileCopyController struct {
 	S3Config       *s3config.S3Config
 	FileController *controller.FileController
+	FileRepo       *repo.FileRepository
 	CollectionCtrl *controller.CollectionController
+	ObjectRepo     *repo.ObjectRepository
 }
 
-func (fc *FileCopyController) CopyFiles(c *gin.Context, req ente.CopyFileSyncRequest) (interface{}, error) {
+type copyS3ObjectReq struct {
+	SourceS3Object ente.S3ObjectKey
+	DestObjectKey  string
+}
+
+type fileCopyInternal struct {
+	SourceFile       ente.File
+	DestCollectionID int64
+	// The FileKey is encrypted with the destination collection's key
+	EncryptedFileKey      string
+	EncryptedFileKeyNonce string
+	FileCopyReq           *copyS3ObjectReq
+	ThumbCopyReq          *copyS3ObjectReq
+}
+
+// newFile builds the ente.File row for the copy, pointing the file and
+// thumbnail attributes at the freshly copied S3 objects.
+func (fci fileCopyInternal) newFile(ownerID int64) ente.File {
+	newFileAttributes := fci.SourceFile.File
+	newFileAttributes.ObjectKey = fci.FileCopyReq.DestObjectKey
+	newThumbAttributes := fci.SourceFile.Thumbnail
+	newThumbAttributes.ObjectKey = fci.ThumbCopyReq.DestObjectKey
+	return ente.File{
+		OwnerID:            ownerID,
+		CollectionID:       fci.DestCollectionID,
+		EncryptedKey:       fci.EncryptedFileKey,
+		KeyDecryptionNonce: fci.EncryptedFileKeyNonce,
+		File:               newFileAttributes,
+		Thumbnail:          newThumbAttributes,
+		Metadata:           fci.SourceFile.Metadata,
+		UpdationTime:       enteTime.Microseconds(),
+		IsDeleted:          false,
+	}
+}
+
+func (fc *FileCopyController) CopyFiles(c *gin.Context, req ente.CopyFileSyncRequest) (*ente.CopyResponse, error) {
 	userID := auth.GetUserID(c.Request.Header)
 	app := auth.GetApp(c)
+	logger := logrus.WithFields(logrus.Fields{"req_id": requestid.Get(c), "user_id": userID})
 	err := fc.CollectionCtrl.IsCopyAllowed(c, userID, req)
 	if err != nil {
 		return nil, err
 	}
-	return nil, ente.NewInternalError("yet to implement actual copy")
+	fileIDs := req.FileIDs()
+	fileToCollectionFileMap := make(map[int64]*ente.CollectionFileItem, len(req.CollectionFileItems))
+	for _, collectionFileItem := range req.CollectionFileItems {
+		collectionFileItem := collectionFileItem // take a copy: the map stores pointers, and the range variable is reused across iterations before Go 1.22
+		fileToCollectionFileMap[collectionFileItem.ID] = &collectionFileItem
+	}
+	s3ObjectsToCopy, err := fc.ObjectRepo.GetObjectsForFileIDs(fileIDs)
+	if err != nil {
+		return nil, err
+	}
+	// note: this assumes that video previews are not tracked inside the object_keys table
+	if len(s3ObjectsToCopy) != 2*len(fileIDs) {
+		return nil, ente.NewInternalError(fmt.Sprintf("expected %d objects, got %d", 2*len(fileIDs), len(s3ObjectsToCopy)))
+	}
+	// todo:(neeraj) if the total size is greater than 1GB, do an early check if the user can upload the files
+	var totalSize int64
+	for _, obj := range s3ObjectsToCopy {
+		totalSize += obj.FileSize
+	}
+	logger.WithField("totalSize", totalSize).Info("total size of files to copy")
+	// request the uploadUrls using the existing method. This ensures that orphan objects are automatically cleaned up
+	// todo:(neeraj) optimize this method by removing the need for getting a signed url for each object
+	uploadUrls, err := fc.FileController.GetUploadURLs(c, userID, len(s3ObjectsToCopy), app)
+	if err != nil {
+		return nil, err
+	}
+	existingFilesToCopy, err := fc.FileRepo.GetFileAttributesForCopy(fileIDs)
+	if err != nil {
+		return nil, err
+	}
+	if len(existingFilesToCopy) != len(fileIDs) {
+		return nil, ente.NewInternalError(fmt.Sprintf("expected %d files, got %d", len(fileIDs), len(existingFilesToCopy)))
+	}
+	fileOGS3Object := make(map[int64]*copyS3ObjectReq)
+	fileThumbS3Object := make(map[int64]*copyS3ObjectReq)
+	for i, s3Obj := range s3ObjectsToCopy {
+		if s3Obj.Type == ente.FILE {
+			fileOGS3Object[s3Obj.FileID] = &copyS3ObjectReq{
+				SourceS3Object: s3Obj,
+				DestObjectKey:  uploadUrls[i].ObjectKey,
+			}
+		} else if s3Obj.Type == ente.THUMBNAIL {
+			fileThumbS3Object[s3Obj.FileID] = &copyS3ObjectReq{
+				SourceS3Object: s3Obj,
+				DestObjectKey:  uploadUrls[i].ObjectKey,
+			}
+		} else {
+			return nil, ente.NewInternalError(fmt.Sprintf("unexpected object type %s", s3Obj.Type))
+		}
+	}
+	fileCopyList := make([]fileCopyInternal, 0, len(existingFilesToCopy))
+	for _, file := range existingFilesToCopy {
+		fileCopy := fileCopyInternal{
+			SourceFile:            file,
+			DestCollectionID:      req.DstCollection,
+			EncryptedFileKey:      fileToCollectionFileMap[file.ID].EncryptedKey,
+			EncryptedFileKeyNonce: fileToCollectionFileMap[file.ID].KeyDecryptionNonce,
+			FileCopyReq:           fileOGS3Object[file.ID],
+			ThumbCopyReq:          fileThumbS3Object[file.ID],
+		}
+		fileCopyList = append(fileCopyList, fileCopy)
+	}
+	oldToNewFileIDMap := make(map[int64]int64)
+	for _, fileCopy := range fileCopyList {
+		newFile, err := fc.createCopy(c, fileCopy, userID, app)
+		if err != nil {
+			return nil, err
+		}
+		oldToNewFileIDMap[fileCopy.SourceFile.ID] = newFile.ID
+	}
+	return &ente.CopyResponse{OldToNewFileIDMap: oldToNewFileIDMap}, nil
 }
+
+// createCopy copies the file and thumbnail S3 objects, then inserts the new
+// file row for the destination collection.
+func (fc *FileCopyController) createCopy(c *gin.Context, fcInternal fileCopyInternal, userID int64, app ente.App) (*ente.File, error) {
+	// using HotS3Client, copy the File and Thumbnail
+	s3Client := fc.S3Config.GetHotS3Client()
+	hotBucket := fc.S3Config.GetHotBucket()
+	err := copyS3Object(s3Client, hotBucket, fcInternal.FileCopyReq)
+	if err != nil {
+		return nil, err
+	}
+	err = copyS3Object(s3Client, hotBucket, fcInternal.ThumbCopyReq)
+	if err != nil {
+		return nil, err
+	}
+	file := fcInternal.newFile(userID)
+	newFile, err := fc.FileController.Create(c, userID, file, "", app)
+	if err != nil {
+		return nil, err
+	}
+	return &newFile, nil
+}
+
+// copyS3Object copies a single object within the hot bucket using S3 server-side copy.
+func copyS3Object(s3Client *s3.S3, bucket *string, req *copyS3ObjectReq) error {
+	copySource := fmt.Sprintf("%s/%s", *bucket, req.SourceS3Object.ObjectKey)
+	copyInput := &s3.CopyObjectInput{
+		Bucket:     bucket,
+		CopySource: &copySource,
+		Key:        &req.DestObjectKey,
+	}
+	start := time.Now()
+	_, err := s3Client.CopyObject(copyInput)
+	elapsed := time.Since(start)
+	if err != nil {
+		return fmt.Errorf("failed to copy (%s) from %s to %s: %w", req.SourceS3Object.Type, copySource, req.DestObjectKey, err)
+	}
+	logrus.WithField("duration", elapsed).WithField("size", req.SourceS3Object.FileSize).Infof("copied (%s) from %s to %s", req.SourceS3Object.Type, copySource, req.DestObjectKey)
+	return nil
+}
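A caveat on copyS3Object: S3's CopyObject API handles source objects up to 5 GiB in a single request; anything larger requires multipart UploadPartCopy. Since object sizes are already loaded for the quota logging above, a guard could fail fast (a sketch, not part of this commit):

    // Sketch: reject objects too large for a single server-side copy.
    const maxSingleCopyBytes = int64(5) * 1024 * 1024 * 1024 // 5 GiB CopyObject limit
    if req.SourceS3Object.FileSize > maxSingleCopyBytes {
        return fmt.Errorf("object %s (%d bytes) exceeds the single CopyObject limit; UploadPartCopy would be required",
            req.SourceS3Object.ObjectKey, req.SourceS3Object.FileSize)
    }

Note also that the two copies plus the DB insert in createCopy are not atomic; if a later step fails, the already-copied destination object is left behind, which is presumably why the destination keys come from GetUploadURLs, whose accounting (per the comment above) lets orphaned objects be cleaned up automatically.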

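On the crypto side, the server only ever handles EncryptedFileKey and EncryptedFileKeyNonce as opaque strings: the client re-encrypts each file's key under the destination collection's key before calling this endpoint. A client-side sketch of producing those values, assuming ente's usual libsodium secretbox (XSalsa20-Poly1305) construction (the helper name and encoding are illustrative):

    import (
        "crypto/rand"
        "encoding/base64"

        "golang.org/x/crypto/nacl/secretbox"
    )

    // encryptKeyForCollection is a hypothetical client-side helper.
    func encryptKeyForCollection(fileKey []byte, collectionKey *[32]byte) (string, string, error) {
        var nonce [24]byte
        if _, err := rand.Read(nonce[:]); err != nil {
            return "", "", err
        }
        sealed := secretbox.Seal(nil, fileKey, &nonce, collectionKey)
        return base64.StdEncoding.EncodeToString(sealed), base64.StdEncoding.EncodeToString(nonce[:]), nil
    }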

@@ -612,6 +612,24 @@ func (repo *FileRepository) GetFileAttributesFromObjectKey(objectKey string) (en
 	return file, nil
 }
 
+func (repo *FileRepository) GetFileAttributesForCopy(fileIDs []int64) ([]ente.File, error) {
+	result := make([]ente.File, 0)
+	rows, err := repo.DB.Query(`SELECT file_id, owner_id, file_decryption_header, thumbnail_decryption_header, metadata_decryption_header, encrypted_metadata, pub_magic_metadata FROM files WHERE file_id = ANY($1)`, pq.Array(fileIDs))
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var file ente.File
+		err := rows.Scan(&file.ID, &file.OwnerID, &file.File.DecryptionHeader, &file.Thumbnail.DecryptionHeader, &file.Metadata.DecryptionHeader, &file.Metadata.EncryptedData, &file.PubicMagicMetadata)
+		if err != nil {
+			return nil, stacktrace.Propagate(err, "")
+		}
+		result = append(result, file)
+	}
+	return result, nil
+}
+
 // GetUsage gets the Storage usage of a user
 // Deprecated: GetUsage is deprecated, use UsageRepository.GetUsage
 func (repo *FileRepository) GetUsage(userID int64) (int64, error) {

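The new repository methods here and in object_keys below lean on the same Postgres idiom: file_id = ANY($1) with pq.Array binds a Go slice as an array parameter via the github.com/lib/pq driver already used by these repositories. In isolation:

    ids := []int64{1, 2, 3}
    rows, err := db.Query(`SELECT file_id, o_type FROM object_keys WHERE file_id = ANY($1)`, pq.Array(ids))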

@@ -44,6 +44,15 @@ func (repo *ObjectRepository) MarkObjectReplicated(objectKey string, datacenter
 	return result.RowsAffected()
 }
 
+func (repo *ObjectRepository) GetObjectsForFileIDs(fileIDs []int64) ([]ente.S3ObjectKey, error) {
+	rows, err := repo.DB.Query(`SELECT file_id, o_type, object_key, size FROM object_keys
+		WHERE file_id = ANY($1) AND is_deleted=false`, pq.Array(fileIDs))
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	return convertRowsToObjectKeys(rows)
+}
+
 // GetObject returns the ente.S3ObjectKey key for a file id and type
 func (repo *ObjectRepository) GetObject(fileID int64, objType ente.ObjectType) (ente.S3ObjectKey, error) {
 	// todo: handling of deleted objects