diff --git a/server/cmd/museum/main.go b/server/cmd/museum/main.go
index aa50d8e6c..8be76120d 100644
--- a/server/cmd/museum/main.go
+++ b/server/cmd/museum/main.go
@@ -390,7 +390,13 @@ func main() {
 		timeout.WithHandler(healthCheckHandler.PingDBStats),
 		timeout.WithResponse(timeOutResponse),
 	))
-	fileCopyCtrl := &file_copy.FileCopyController{FileController: fileController, CollectionCtrl: collectionController, S3Config: s3Config}
+	fileCopyCtrl := &file_copy.FileCopyController{
+		FileController: fileController,
+		CollectionCtrl: collectionController,
+		S3Config:       s3Config,
+		ObjectRepo:     objectRepo,
+		FileRepo:       fileRepo,
+	}
 
 	fileHandler := &api.FileHandler{
 		Controller: fileController,
diff --git a/server/ente/collection.go b/server/ente/collection.go
index 61c46860a..d280f348a 100644
--- a/server/ente/collection.go
+++ b/server/ente/collection.go
@@ -103,11 +103,23 @@ type AddFilesRequest struct {
 	Files []CollectionFileItem `json:"files" binding:"required"`
 }
 
-// CopyFileSyncRequest is request object for creating copy of Files, and those copy to the destination collection
+// CopyFileSyncRequest is the request object for copying the given files from the source collection to the destination collection
 type CopyFileSyncRequest struct {
-	SrcCollectionID int64                `json:"srcCollectionID" binding:"required"`
-	DstCollection   int64                `json:"dstCollectionID" binding:"required"`
-	Files           []CollectionFileItem `json:"files" binding:"required"`
+	SrcCollectionID     int64                `json:"srcCollectionID" binding:"required"`
+	DstCollection       int64                `json:"dstCollectionID" binding:"required"`
+	CollectionFileItems []CollectionFileItem `json:"files" binding:"required"`
+}
+
+type CopyResponse struct {
+	OldToNewFileIDMap map[int64]int64 `json:"oldToNewFileIDMap"`
+}
+
+func (cfr CopyFileSyncRequest) FileIDs() []int64 {
+	fileIDs := make([]int64, 0, len(cfr.CollectionFileItems))
+	for _, file := range cfr.CollectionFileItems {
+		fileIDs = append(fileIDs, file.ID)
+	}
+	return fileIDs
 }
 
 // RemoveFilesRequest represents a request to remove files from a collection
diff --git a/server/pkg/controller/collection.go b/server/pkg/controller/collection.go
index cdfbb4080..911afc6d7 100644
--- a/server/pkg/controller/collection.go
+++ b/server/pkg/controller/collection.go
@@ -481,8 +481,8 @@ func (c *CollectionController) IsCopyAllowed(ctx *gin.Context, actorUserID int64
 		return stacktrace.Propagate(err, "failed to ownership of the dstCollection access")
 	}
 	// verify that all FileIDs exists in the srcCollection
-	fileIDs := make([]int64, len(req.Files))
-	for idx, file := range req.Files {
+	fileIDs := make([]int64, len(req.CollectionFileItems))
+	for idx, file := range req.CollectionFileItems {
 		fileIDs[idx] = file.ID
 	}
 	if err := c.CollectionRepo.VerifyAllFileIDsExistsInCollection(ctx, req.SrcCollectionID, fileIDs); err != nil {
diff --git a/server/pkg/controller/file_copy/file_copy.go b/server/pkg/controller/file_copy/file_copy.go
index 6f581c586..488ad9ef6 100644
--- a/server/pkg/controller/file_copy/file_copy.go
+++ b/server/pkg/controller/file_copy/file_copy.go
@@ -1,25 +1,176 @@
 package file_copy
 
 import (
+	"fmt"
+	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/ente-io/museum/ente"
 	"github.com/ente-io/museum/pkg/controller"
+	"github.com/ente-io/museum/pkg/repo"
 	"github.com/ente-io/museum/pkg/utils/auth"
 	"github.com/ente-io/museum/pkg/utils/s3config"
+	enteTime "github.com/ente-io/museum/pkg/utils/time"
+	"github.com/gin-contrib/requestid"
 	"github.com/gin-gonic/gin"
+	"github.com/sirupsen/logrus"
+	"time"
 )
 
 type FileCopyController struct {
 	S3Config       *s3config.S3Config
 	FileController *controller.FileController
+	FileRepo       *repo.FileRepository
 	CollectionCtrl *controller.CollectionController
+	ObjectRepo     *repo.ObjectRepository
 }
 
-func (fc *FileCopyController) CopyFiles(c *gin.Context, req ente.CopyFileSyncRequest) (interface{}, error) {
+type copyS3ObjectReq struct {
+	SourceS3Object ente.S3ObjectKey
+	DestObjectKey  string
+}
+
+type fileCopyInternal struct {
+	SourceFile       ente.File
+	DestCollectionID int64
+	// The file key is encrypted with the destination collection's key
+	EncryptedFileKey      string
+	EncryptedFileKeyNonce string
+	FileCopyReq           *copyS3ObjectReq
+	ThumbCopyReq          *copyS3ObjectReq
+}
+
+func (fci fileCopyInternal) newFile(ownerID int64) ente.File {
+	newFileAttributes := fci.SourceFile.File
+	newFileAttributes.ObjectKey = fci.FileCopyReq.DestObjectKey
+	newThumbAttributes := fci.SourceFile.Thumbnail
+	newThumbAttributes.ObjectKey = fci.ThumbCopyReq.DestObjectKey
+	return ente.File{
+		OwnerID:            ownerID,
+		CollectionID:       fci.DestCollectionID,
+		EncryptedKey:       fci.EncryptedFileKey,
+		KeyDecryptionNonce: fci.EncryptedFileKeyNonce,
+		File:               newFileAttributes,
+		Thumbnail:          newThumbAttributes,
+		Metadata:           fci.SourceFile.Metadata,
+		UpdationTime:       enteTime.Microseconds(),
+		IsDeleted:          false,
+	}
+}
+
+func (fc *FileCopyController) CopyFiles(c *gin.Context, req ente.CopyFileSyncRequest) (*ente.CopyResponse, error) {
 	userID := auth.GetUserID(c.Request.Header)
+	app := auth.GetApp(c)
+	logger := logrus.WithFields(logrus.Fields{"req_id": requestid.Get(c), "user_id": userID})
 	err := fc.CollectionCtrl.IsCopyAllowed(c, userID, req)
 	if err != nil {
 		return nil, err
 	}
-	return nil, ente.NewInternalError("yet to implement actual copy")
+	fileIDs := req.FileIDs()
+	fileToCollectionFileMap := make(map[int64]*ente.CollectionFileItem, len(req.CollectionFileItems))
+	for i, collectionFileItem := range req.CollectionFileItems {
+		fileToCollectionFileMap[collectionFileItem.ID] = &req.CollectionFileItems[i] // address of the slice element, not of the loop variable
+	}
+	s3ObjectsToCopy, err := fc.ObjectRepo.GetObjectsForFileIDs(fileIDs)
+	if err != nil {
+		return nil, err
+	}
+	// note: this assumes that preview files for videos are not tracked inside the object_keys table
+	if len(s3ObjectsToCopy) != 2*len(fileIDs) {
+		return nil, ente.NewInternalError(fmt.Sprintf("expected %d objects, got %d", 2*len(fileIDs), len(s3ObjectsToCopy)))
+	}
+	// todo:(neeraj) if the total size is greater than 1GB, do an early check if the user can upload the files
+	var totalSize int64
+	for _, obj := range s3ObjectsToCopy {
+		totalSize += obj.FileSize
+	}
+	logger.WithField("totalSize", totalSize).Info("total size of files to copy")
+	// request the upload URLs via the existing method, so that orphan objects are automatically cleaned up
+	// todo:(neeraj) optimize this method by removing the need for getting a signed url for each object
+	uploadUrls, err := fc.FileController.GetUploadURLs(c, userID, len(s3ObjectsToCopy), app)
+	if err != nil {
+		return nil, err
+	}
+	existingFilesToCopy, err := fc.FileRepo.GetFileAttributesForCopy(fileIDs)
+	if err != nil {
+		return nil, err
+	}
+	if len(existingFilesToCopy) != len(fileIDs) {
+		return nil, ente.NewInternalError(fmt.Sprintf("expected %d files, got %d", len(fileIDs), len(existingFilesToCopy)))
+	}
+	fileOGS3Object := make(map[int64]*copyS3ObjectReq)
+	fileThumbS3Object := make(map[int64]*copyS3ObjectReq)
+	for i, s3Obj := range s3ObjectsToCopy {
+		if s3Obj.Type == ente.FILE {
+			fileOGS3Object[s3Obj.FileID] = &copyS3ObjectReq{
+				SourceS3Object: s3Obj,
+				DestObjectKey:  uploadUrls[i].ObjectKey,
+			}
+		} else if s3Obj.Type == ente.THUMBNAIL {
+			fileThumbS3Object[s3Obj.FileID] = &copyS3ObjectReq{
+				SourceS3Object: s3Obj,
+				DestObjectKey:  uploadUrls[i].ObjectKey,
+			}
+		} else {
+			return nil, ente.NewInternalError(fmt.Sprintf("unexpected object type %s", s3Obj.Type))
+		}
+	}
+	fileCopyList := make([]fileCopyInternal, 0, len(existingFilesToCopy))
+	for _, file := range existingFilesToCopy {
+		fileCopy := fileCopyInternal{
+			SourceFile:            file,
+			DestCollectionID:      req.DstCollection,
+			EncryptedFileKey:      fileToCollectionFileMap[file.ID].EncryptedKey,
+			EncryptedFileKeyNonce: fileToCollectionFileMap[file.ID].KeyDecryptionNonce,
+			FileCopyReq:           fileOGS3Object[file.ID],
+			ThumbCopyReq:          fileThumbS3Object[file.ID],
+		}
+		fileCopyList = append(fileCopyList, fileCopy)
+	}
+	oldToNewFileIDMap := make(map[int64]int64)
+	for _, fileCopy := range fileCopyList {
+		newFile, err := fc.createCopy(c, fileCopy, userID, app)
+		if err != nil {
+			return nil, err
+		}
+		oldToNewFileIDMap[fileCopy.SourceFile.ID] = newFile.ID
+	}
+	return &ente.CopyResponse{OldToNewFileIDMap: oldToNewFileIDMap}, nil
+}
+
+func (fc *FileCopyController) createCopy(c *gin.Context, fcInternal fileCopyInternal, userID int64, app ente.App) (*ente.File, error) {
+	// copy the file and thumbnail objects within the hot bucket using the hot S3 client
+	s3Client := fc.S3Config.GetHotS3Client()
+	hotBucket := fc.S3Config.GetHotBucket()
+	err := copyS3Object(s3Client, hotBucket, fcInternal.FileCopyReq)
+	if err != nil {
+		return nil, err
+	}
+	err = copyS3Object(s3Client, hotBucket, fcInternal.ThumbCopyReq)
+	if err != nil {
+		return nil, err
+	}
+	file := fcInternal.newFile(userID)
+	newFile, err := fc.FileController.Create(c, userID, file, "", app)
+	if err != nil {
+		return nil, err
+	}
+	return &newFile, nil
+}
+
+// copyS3Object copies the given object within the bucket and logs the time taken.
+func copyS3Object(s3Client *s3.S3, bucket *string, req *copyS3ObjectReq) error {
+	copySource := fmt.Sprintf("%s/%s", *bucket, req.SourceS3Object.ObjectKey)
+	copyInput := &s3.CopyObjectInput{
+		Bucket:     bucket,
+		CopySource: &copySource,
+		Key:        &req.DestObjectKey,
+	}
+	start := time.Now()
+	_, err := s3Client.CopyObject(copyInput)
+	elapsed := time.Since(start)
+	if err != nil {
+		return fmt.Errorf("failed to copy (%s) from %s to %s: %w", req.SourceS3Object.Type, copySource, req.DestObjectKey, err)
+	}
+	logrus.WithField("duration", elapsed).WithField("size", req.SourceS3Object.FileSize).Infof("copied (%s) from %s to %s", req.SourceS3Object.Type, copySource, req.DestObjectKey)
+	return nil
 }
diff --git a/server/pkg/repo/file.go b/server/pkg/repo/file.go
index ffa7ea048..eafc7b570 100644
--- a/server/pkg/repo/file.go
+++ b/server/pkg/repo/file.go
@@ -612,6 +612,28 @@ func (repo *FileRepository) GetFileAttributesFromObjectKey(objectKey string) (en
 	return file, nil
 }
 
+// GetFileAttributesForCopy returns the attributes of the given files that are needed to create copies of them.
+func (repo *FileRepository) GetFileAttributesForCopy(fileIDs []int64) ([]ente.File, error) {
+	result := make([]ente.File, 0, len(fileIDs))
+	rows, err := repo.DB.Query(`SELECT file_id, owner_id, file_decryption_header, thumbnail_decryption_header, metadata_decryption_header, encrypted_metadata, pub_magic_metadata FROM files WHERE file_id = ANY($1)`, pq.Array(fileIDs))
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var file ente.File
+		err := rows.Scan(&file.ID, &file.OwnerID, &file.File.DecryptionHeader, &file.Thumbnail.DecryptionHeader, &file.Metadata.DecryptionHeader, &file.Metadata.EncryptedData, &file.PubicMagicMetadata)
+		if err != nil {
+			return nil, stacktrace.Propagate(err, "")
+		}
+		result = append(result, file)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	return result, nil
+}
+
 // GetUsage gets the Storage usage of a user
 // Deprecated: GetUsage is deprecated, use UsageRepository.GetUsage
 func (repo *FileRepository) GetUsage(userID int64) (int64, error) {
diff --git a/server/pkg/repo/object.go b/server/pkg/repo/object.go
index f0cc5c6cf..fdbbbf52c 100644
--- a/server/pkg/repo/object.go
+++ b/server/pkg/repo/object.go
@@ -44,6 +44,16 @@ func (repo *ObjectRepository) MarkObjectReplicated(objectKey string, datacenter
 	return result.RowsAffected()
 }
 
+// GetObjectsForFileIDs returns the S3 object keys (file and thumbnail) for the given file IDs.
+func (repo *ObjectRepository) GetObjectsForFileIDs(fileIDs []int64) ([]ente.S3ObjectKey, error) {
+	rows, err := repo.DB.Query(`SELECT file_id, o_type, object_key, size FROM object_keys
+		WHERE file_id = ANY($1) AND is_deleted=false`, pq.Array(fileIDs))
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	return convertRowsToObjectKeys(rows)
+}
+
 // GetObject returns the ente.S3ObjectKey key for a file id and type
 func (repo *ObjectRepository) GetObject(fileID int64, objType ente.ObjectType) (ente.S3ObjectKey, error) {
 	// todo: handling of deleted objects
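Note for reviewers: main.go constructs `fileCopyCtrl`, but the HTTP handler that exposes `CopyFiles` is not part of this excerpt. Below is a minimal, hypothetical sketch of how the controller might be consumed; the `FileCopyHandler` type, its error-to-status mapping, and the route registration are illustrative assumptions, not code from this diff.

```go
package api

import (
	"net/http"

	"github.com/ente-io/museum/ente"
	"github.com/ente-io/museum/pkg/controller/file_copy"
	"github.com/gin-gonic/gin"
)

// FileCopyHandler is a hypothetical wrapper; only the controller it delegates
// to (file_copy.FileCopyController) and the request/response types come from
// this diff.
type FileCopyHandler struct {
	Controller *file_copy.FileCopyController
}

// CopyFiles binds an ente.CopyFileSyncRequest from the JSON body
// ({"srcCollectionID": ..., "dstCollectionID": ..., "files": [...]})
// and responds with the ente.CopyResponse old-to-new file ID mapping
// ({"oldToNewFileIDMap": {...}}).
func (h *FileCopyHandler) CopyFiles(c *gin.Context) {
	var req ente.CopyFileSyncRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	resp, err := h.Controller.CopyFiles(c, req)
	if err != nil {
		// naive mapping for this sketch; museum's shared error handling would
		// normally translate ente errors into the appropriate HTTP status
		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, resp)
}
```

Registration would then look something like `privateAPI.POST("/files/copy", fileCopyHandler.CopyFiles)` in main.go, assuming the usual authenticated route group; the actual route is not shown in this excerpt.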