From be446651289d2d3e38b045a7be5da8e1525cbac8 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Mon, 13 May 2024 16:33:36 +0530 Subject: [PATCH 01/30] [server] Refactor embedding fetch --- server/pkg/controller/embedding/controller.go | 55 ++++++++++++++----- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index bf317ccfe..b14f5d893 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -309,7 +309,7 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em defer wg.Done() defer func() { <-globalDiffFetchSemaphore }() // Release back to global semaphore - obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader) + obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader, nil) if err != nil { errs = append(errs, err) log.Error("error fetching embedding object: "+objectKey, err) @@ -346,9 +346,7 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows defer wg.Done() defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore objectKey := c.getObjectKey(userID, dbEmbeddingRow.FileID, dbEmbeddingRow.Model) - ctx, cancel := context.WithTimeout(context.Background(), embeddingFetchTimeout) - defer cancel() - obj, err := c.getEmbeddingObjectWithRetries(ctx, objectKey, downloader, 0) + obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader, nil) if err != nil { log.Error("error fetching embedding object: "+objectKey, err) embeddingObjects[i] = embeddingObjectResult{ @@ -368,11 +366,45 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows return embeddingObjects, nil } -func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader) (ente.EmbeddingObject, error) { - return c.getEmbeddingObjectWithRetries(ctx, objectKey, downloader, 3) +type getOptions struct { + RetryCount int + FetchTimeOut gTime.Duration } -func (c *Controller) getEmbeddingObjectWithRetries(ctx context.Context, objectKey string, downloader *s3manager.Downloader, retryCount int) (ente.EmbeddingObject, error) { +func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader, opt *getOptions) (ente.EmbeddingObject, error) { + if opt == nil { + opt = &getOptions{ + RetryCount: 3, + FetchTimeOut: embeddingFetchTimeout, + } + } + ctxLogger := log.WithField("objectKey", objectKey) + totalAttempts := opt.RetryCount + 1 + for i := 0; i < totalAttempts; i++ { + // Create a new context with a timeout for each fetch + fetchCtx, cancel := context.WithTimeout(ctx, opt.FetchTimeOut) + select { + case <-ctx.Done(): + cancel() + return ente.EmbeddingObject{}, stacktrace.Propagate(ctx.Err(), "") + default: + obj, err := c.downloadObject(fetchCtx, objectKey, downloader) + cancel() // Ensure cancel is called to release resources + if err == nil { + return obj, nil + } + // Check if the error is due to context timeout or cancellation + if fetchCtx.Err() != nil { + ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err()) + } else { + ctxLogger.Error("Failed to fetch object: ", err) + } + } + } + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("failed to fetch object"), "") +} + +func (c *Controller) downloadObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader) (ente.EmbeddingObject, error) { var obj ente.EmbeddingObject buff := &aws.WriteAtBuffer{} _, err := downloader.DownloadWithContext(ctx, buff, &s3.GetObjectInput{ @@ -380,16 +412,11 @@ func (c *Controller) getEmbeddingObjectWithRetries(ctx context.Context, objectKe Key: &objectKey, }) if err != nil { - log.Error(err) - if retryCount > 0 { - return c.getEmbeddingObjectWithRetries(ctx, objectKey, downloader, retryCount-1) - } - return obj, stacktrace.Propagate(err, "") + return obj, stacktrace.Propagate(err, "downloadFailed") } err = json.Unmarshal(buff.Bytes(), &obj) if err != nil { - log.Error(err) - return obj, stacktrace.Propagate(err, "") + return obj, stacktrace.Propagate(err, "unmarshal failed") } return obj, nil } From acd61fc084117f9cc5a0cb60745d380824891f30 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Mon, 13 May 2024 16:33:48 +0530 Subject: [PATCH 02/30] Fixed typo --- server/pkg/controller/embedding/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index b14f5d893..134014d13 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -146,7 +146,7 @@ func (c *Controller) GetFilesEmbedding(ctx *gin.Context, req ente.GetFilesEmbedd embeddingsWithData := make([]ente.Embedding, 0) noEmbeddingFileIds := make([]int64, 0) dbFileIds := make([]int64, 0) - // fileIDs that were indexed but they don't contain any embedding information + // fileIDs that were indexed, but they don't contain any embedding information for i := range userFileEmbeddings { dbFileIds = append(dbFileIds, userFileEmbeddings[i].FileID) if userFileEmbeddings[i].Size != nil && *userFileEmbeddings[i].Size < minEmbeddingDataSize { From bce3f40a16ada9eaa05fe58ff6a72920007ed195 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Tue, 14 May 2024 11:44:41 +0530 Subject: [PATCH 03/30] Avoid retry for 404 error --- server/pkg/controller/embedding/controller.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 134014d13..e8d6c347a 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/ente-io/museum/pkg/utils/array" "strconv" "sync" @@ -397,6 +398,13 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d if fetchCtx.Err() != nil { ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err()) } else { + // check if the error is due to object not found + if s3Err, ok := err.(awserr.Error); ok { + if s3Err.Code() == s3.ErrCodeNoSuchKey { + ctxLogger.Warn("Object not found: ", s3Err) + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + } + } ctxLogger.Error("Failed to fetch object: ", err) } } From 3e7b16288f84c203d1c16af8d4b91405877de3e5 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Tue, 14 May 2024 14:03:35 +0530 Subject: [PATCH 04/30] Add support for configuring diff bucket for embeddings --- server/cmd/museum/main.go | 2 +- server/configurations/local.yaml | 10 +++++++ server/pkg/controller/embedding/controller.go | 30 +++++++++++++++---- server/pkg/utils/s3config/s3config.go | 25 ++++++++++++++-- 4 files changed, 58 insertions(+), 9 deletions(-) diff --git a/server/cmd/museum/main.go b/server/cmd/museum/main.go index 84c34189d..8ccb43cc0 100644 --- a/server/cmd/museum/main.go +++ b/server/cmd/museum/main.go @@ -678,7 +678,7 @@ func main() { pushHandler := &api.PushHandler{PushController: pushController} privateAPI.POST("/push/token", pushHandler.AddToken) - embeddingController := &embeddingCtrl.Controller{Repo: embeddingRepo, AccessCtrl: accessCtrl, ObjectCleanupController: objectCleanupController, S3Config: s3Config, FileRepo: fileRepo, CollectionRepo: collectionRepo, QueueRepo: queueRepo, TaskLockingRepo: taskLockingRepo, HostName: hostName} + embeddingController := embeddingCtrl.New(embeddingRepo, accessCtrl, objectCleanupController, s3Config, queueRepo, taskLockingRepo, fileRepo, collectionRepo, hostName) embeddingHandler := &api.EmbeddingHandler{Controller: embeddingController} privateAPI.PUT("/embeddings", embeddingHandler.InsertOrUpdate) diff --git a/server/configurations/local.yaml b/server/configurations/local.yaml index 196c56f1f..a3cb71c9d 100644 --- a/server/configurations/local.yaml +++ b/server/configurations/local.yaml @@ -125,6 +125,16 @@ s3: endpoint: region: bucket: + wasabi-eu-central-2-embeddings: + key: + secret: + endpoint: + region: + bucket: + # Embeddings bucket is used for storing embeddings and other derived data from a file. + # By default, it is the same as the hot storage bucket. + # embeddings-bucket: wasabi-eu-central-2-embeddings + # If true, enable some workarounds to allow us to use a local minio instance # for object storage. # diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index e8d6c347a..0212fb0de 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -46,6 +46,24 @@ type Controller struct { CollectionRepo *repo.CollectionRepository HostName string cleanupCronRunning bool + embeddingS3Client *s3.S3 + embeddingBucket *string +} + +func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { + return &Controller{ + Repo: repo, + AccessCtrl: accessCtrl, + ObjectCleanupController: objectCleanupController, + S3Config: s3Config, + QueueRepo: queueRepo, + TaskLockingRepo: taskLockingRepo, + FileRepo: fileRepo, + CollectionRepo: collectionRepo, + HostName: hostName, + embeddingS3Client: s3Config.GetEmbeddingsS3Client(), + embeddingBucket: s3Config.GetEmbeddingsBucket(), + } } func (c *Controller) InsertOrUpdate(ctx *gin.Context, req ente.InsertOrUpdateEmbeddingRequest) (*ente.Embedding, error) { @@ -245,7 +263,7 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { } prefix := c.getEmbeddingObjectPrefix(ownerID, fileID) - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetEmbeddingsDataCenter()) if err != nil { ctxLogger.WithError(err).Error("Failed to delete all objects") return @@ -277,9 +295,9 @@ func (c *Controller) getEmbeddingObjectPrefix(userID int64, fileID int64) string // uploadObject uploads the embedding object to the object store and returns the object size func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) (int, error) { embeddingObj, _ := json.Marshal(obj) - uploader := s3manager.NewUploaderWithClient(c.S3Config.GetHotS3Client()) + uploader := s3manager.NewUploaderWithClient(c.embeddingS3Client) up := s3manager.UploadInput{ - Bucket: c.S3Config.GetHotBucket(), + Bucket: c.embeddingBucket, Key: &key, Body: bytes.NewReader(embeddingObj), } @@ -301,7 +319,7 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em var wg sync.WaitGroup var errs []error embeddingObjects := make([]ente.EmbeddingObject, len(objectKeys)) - downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) + downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client) for i, objectKey := range objectKeys { wg.Add(1) @@ -338,7 +356,7 @@ type embeddingObjectResult struct { func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows []ente.Embedding) ([]embeddingObjectResult, error) { var wg sync.WaitGroup embeddingObjects := make([]embeddingObjectResult, len(dbEmbeddingRows)) - downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) + downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client) for i, dbEmbeddingRow := range dbEmbeddingRows { wg.Add(1) @@ -416,7 +434,7 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl var obj ente.EmbeddingObject buff := &aws.WriteAtBuffer{} _, err := downloader.DownloadWithContext(ctx, buff, &s3.GetObjectInput{ - Bucket: c.S3Config.GetHotBucket(), + Bucket: c.embeddingBucket, Key: &objectKey, }) if err != nil { diff --git a/server/pkg/utils/s3config/s3config.go b/server/pkg/utils/s3config/s3config.go index 9b273bd61..02a7fbd26 100644 --- a/server/pkg/utils/s3config/s3config.go +++ b/server/pkg/utils/s3config/s3config.go @@ -28,6 +28,8 @@ type S3Config struct { hotDC string // Secondary (hot) data center secondaryHotDC string + // Bucket for storing ml embeddings & preview files + embeddingsDC string // A map from data centers to S3 configurations s3Configs map[string]*aws.Config // A map from data centers to pre-created S3 clients @@ -71,6 +73,7 @@ var ( dcWasabiEuropeCentralDeprecated string = "wasabi-eu-central-2" dcWasabiEuropeCentral_v3 string = "wasabi-eu-central-2-v3" dcSCWEuropeFrance_v3 string = "scw-eu-fr-v3" + dcWasabiEuropeCentral string = "wasabi-eu-central-2-embeddings" ) // Number of days that the wasabi bucket is configured to retain objects. @@ -86,9 +89,9 @@ func NewS3Config() *S3Config { } func (config *S3Config) initialize() { - dcs := [5]string{ + dcs := [6]string{ dcB2EuropeCentral, dcSCWEuropeFranceLockedDeprecated, dcWasabiEuropeCentralDeprecated, - dcWasabiEuropeCentral_v3, dcSCWEuropeFrance_v3} + dcWasabiEuropeCentral_v3, dcSCWEuropeFrance_v3, dcWasabiEuropeCentral} config.hotDC = dcB2EuropeCentral config.secondaryHotDC = dcWasabiEuropeCentral_v3 @@ -99,6 +102,12 @@ func (config *S3Config) initialize() { config.secondaryHotDC = hs2 log.Infof("Hot storage: %s (secondary: %s)", hs1, hs2) } + config.embeddingsDC = config.hotDC + embeddingsDC := viper.GetString("s3.embeddings-bucket") + if embeddingsDC != "" && array.StringInList(embeddingsDC, dcs[:]) { + config.embeddingsDC = embeddingsDC + log.Infof("Embeddings bucket: %s", embeddingsDC) + } config.buckets = make(map[string]string) config.s3Configs = make(map[string]*aws.Config) @@ -171,6 +180,18 @@ func (config *S3Config) GetHotS3Client() *s3.S3 { return &s3Client } +func (config *S3Config) GetEmbeddingsDataCenter() string { + return config.embeddingsDC +} +func (config *S3Config) GetEmbeddingsBucket() *string { + return config.GetBucket(config.embeddingsDC) +} + +func (config *S3Config) GetEmbeddingsS3Client() *s3.S3 { + s3Client := config.GetS3Client(config.embeddingsDC) + return &s3Client +} + // Return the name of the hot Backblaze data center func (config *S3Config) GetHotBackblazeDC() string { return dcB2EuropeCentral From 18d1bb60ca1cec2100962e14f6edeaf5a0f41054 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Tue, 14 May 2024 14:18:50 +0530 Subject: [PATCH 05/30] Delete embeddings from hot bucket if different from embedding bucket --- server/pkg/controller/embedding/controller.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 0212fb0de..e955a8b66 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -268,6 +268,14 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { ctxLogger.WithError(err).Error("Failed to delete all objects") return } + // if Embeddings DC is different from hot DC, delete from hot DC as well + if c.S3Config.GetEmbeddingsDataCenter() != c.S3Config.GetHotDataCenter() { + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) + if err != nil { + ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") + return + } + } err = c.Repo.Delete(fileID) if err != nil { From 74a6e32538cd5f2b71eca6e7fa7371a8a3e8d9aa Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Tue, 14 May 2024 14:37:24 +0530 Subject: [PATCH 06/30] Fix error check for no-object found --- server/pkg/controller/embedding/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index e955a8b66..70c24b919 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -421,11 +421,11 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d return obj, nil } // Check if the error is due to context timeout or cancellation - if fetchCtx.Err() != nil { + if err == nil && fetchCtx.Err() != nil { ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err()) } else { // check if the error is due to object not found - if s3Err, ok := err.(awserr.Error); ok { + if s3Err, ok := errors.Unwrap(err).(awserr.Error); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { ctxLogger.Warn("Object not found: ", s3Err) return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") @@ -446,7 +446,7 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl Key: &objectKey, }) if err != nil { - return obj, stacktrace.Propagate(err, "downloadFailed") + return obj, err } err = json.Unmarshal(buff.Bytes(), &obj) if err != nil { From 87b087f295f8dd5393f253dce94f831eb07f137a Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Tue, 14 May 2024 14:52:07 +0530 Subject: [PATCH 07/30] Minor refactor --- server/pkg/controller/embedding/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 70c24b919..25edbdaa6 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -415,7 +415,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d cancel() return ente.EmbeddingObject{}, stacktrace.Propagate(ctx.Err(), "") default: - obj, err := c.downloadObject(fetchCtx, objectKey, downloader) + obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.embeddingBucket) cancel() // Ensure cancel is called to release resources if err == nil { return obj, nil @@ -438,11 +438,11 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("failed to fetch object"), "") } -func (c *Controller) downloadObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader) (ente.EmbeddingObject, error) { +func (c *Controller) downloadObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader, bucket *string) (ente.EmbeddingObject, error) { var obj ente.EmbeddingObject buff := &aws.WriteAtBuffer{} _, err := downloader.DownloadWithContext(ctx, buff, &s3.GetObjectInput{ - Bucket: c.embeddingBucket, + Bucket: bucket, Key: &objectKey, }) if err != nil { From 835a773f13acf80b91fe131df02a4af65ab3998e Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Tue, 14 May 2024 17:00:16 +0530 Subject: [PATCH 08/30] Add fallback logic to read embedding from hot bucket --- server/pkg/controller/embedding/controller.go | 87 +++++++++++++------ 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 25edbdaa6..2df2d1b56 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -36,33 +36,35 @@ const ( ) type Controller struct { - Repo *embedding.Repository - AccessCtrl access.Controller - ObjectCleanupController *controller.ObjectCleanupController - S3Config *s3config.S3Config - QueueRepo *repo.QueueRepository - TaskLockingRepo *repo.TaskLockRepository - FileRepo *repo.FileRepository - CollectionRepo *repo.CollectionRepository - HostName string - cleanupCronRunning bool - embeddingS3Client *s3.S3 - embeddingBucket *string + Repo *embedding.Repository + AccessCtrl access.Controller + ObjectCleanupController *controller.ObjectCleanupController + S3Config *s3config.S3Config + QueueRepo *repo.QueueRepository + TaskLockingRepo *repo.TaskLockRepository + FileRepo *repo.FileRepository + CollectionRepo *repo.CollectionRepository + HostName string + cleanupCronRunning bool + embeddingS3Client *s3.S3 + embeddingBucket *string + areEmbeddingAndHotBucketSame bool } func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { return &Controller{ - Repo: repo, - AccessCtrl: accessCtrl, - ObjectCleanupController: objectCleanupController, - S3Config: s3Config, - QueueRepo: queueRepo, - TaskLockingRepo: taskLockingRepo, - FileRepo: fileRepo, - CollectionRepo: collectionRepo, - HostName: hostName, - embeddingS3Client: s3Config.GetEmbeddingsS3Client(), - embeddingBucket: s3Config.GetEmbeddingsBucket(), + Repo: repo, + AccessCtrl: accessCtrl, + ObjectCleanupController: objectCleanupController, + S3Config: s3Config, + QueueRepo: queueRepo, + TaskLockingRepo: taskLockingRepo, + FileRepo: fileRepo, + CollectionRepo: collectionRepo, + HostName: hostName, + embeddingS3Client: s3Config.GetEmbeddingsS3Client(), + embeddingBucket: s3Config.GetEmbeddingsBucket(), + areEmbeddingAndHotBucketSame: s3Config.GetEmbeddingsBucket() == s3Config.GetHotBucket(), } } @@ -269,7 +271,7 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { return } // if Embeddings DC is different from hot DC, delete from hot DC as well - if c.S3Config.GetEmbeddingsDataCenter() != c.S3Config.GetHotDataCenter() { + if !c.areEmbeddingAndHotBucketSame { err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) if err != nil { ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") @@ -425,10 +427,21 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d ctxLogger.Error("Fetch timed out or cancelled: ", fetchCtx.Err()) } else { // check if the error is due to object not found - if s3Err, ok := errors.Unwrap(err).(awserr.Error); ok { + if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { - ctxLogger.Warn("Object not found: ", s3Err) - return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + if c.areEmbeddingAndHotBucketSame { + ctxLogger.Error("Object not found: ", s3Err) + } else { + // If embedding and hot bucket are different, try to copy from hot bucket + copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey) + if err == nil { + ctxLogger.Info("Got the object from hot bucket object") + return *copyEmbeddingObject, nil + } else { + ctxLogger.WithError(err).Error("Failed to copy from hot bucket object") + } + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + } } } ctxLogger.Error("Failed to fetch object: ", err) @@ -455,6 +468,26 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl return obj, nil } +// download the embedding object from hot bucket and upload to embeddings bucket +func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) (*ente.EmbeddingObject, error) { + if c.embeddingBucket == c.S3Config.GetHotBucket() { + return nil, stacktrace.Propagate(errors.New("embedding bucket and hot bucket are same"), "") + } + downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) + obj, err := c.downloadObject(ctx, objectKey, downloader, c.S3Config.GetHotBucket()) + if err != nil { + return nil, stacktrace.Propagate(err, "failed to download from hot bucket") + } + go func() { + _, err = c.uploadObject(obj, objectKey) + if err != nil { + log.WithField("object", objectKey).Error("Failed to copy to embeddings bucket: ", err) + } + }() + + return &obj, nil +} + func (c *Controller) _validateGetFileEmbeddingsRequest(ctx *gin.Context, userID int64, req ente.GetFilesEmbeddingRequest) error { if req.Model == "" { return ente.NewBadRequestWithMessage("model is required") From cc457eca98000dc94c595cc012b7a35eb2765a3c Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 15 May 2024 14:25:28 +0530 Subject: [PATCH 09/30] Add log when embedding is fetched after retry --- server/pkg/controller/embedding/controller.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 2df2d1b56..5deb32d9a 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -420,6 +420,9 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.embeddingBucket) cancel() // Ensure cancel is called to release resources if err == nil { + if i > 0 { + ctxLogger.Infof("Fetched object after %d attempts", i) + } return obj, nil } // Check if the error is due to context timeout or cancellation From 7eabea38847171cbd89620401e3ffd8ce827b63f Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 15 May 2024 16:37:23 +0530 Subject: [PATCH 10/30] Rename embedding dc to derived storage --- server/configurations/local.yaml | 6 +- server/pkg/controller/embedding/controller.go | 70 +++++++++---------- server/pkg/utils/s3config/s3config.go | 22 +++--- 3 files changed, 49 insertions(+), 49 deletions(-) diff --git a/server/configurations/local.yaml b/server/configurations/local.yaml index a3cb71c9d..87502c271 100644 --- a/server/configurations/local.yaml +++ b/server/configurations/local.yaml @@ -125,15 +125,15 @@ s3: endpoint: region: bucket: - wasabi-eu-central-2-embeddings: + wasabi-eu-central-2-derived: key: secret: endpoint: region: bucket: - # Embeddings bucket is used for storing embeddings and other derived data from a file. + # Derived storage bucket is used for storing derived data like embeddings, preview etc. # By default, it is the same as the hot storage bucket. - # embeddings-bucket: wasabi-eu-central-2-embeddings + # derived-storage: wasabi-eu-central-2-derived # If true, enable some workarounds to allow us to use a local minio instance # for object storage. diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 5deb32d9a..81cc059bf 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -36,35 +36,35 @@ const ( ) type Controller struct { - Repo *embedding.Repository - AccessCtrl access.Controller - ObjectCleanupController *controller.ObjectCleanupController - S3Config *s3config.S3Config - QueueRepo *repo.QueueRepository - TaskLockingRepo *repo.TaskLockRepository - FileRepo *repo.FileRepository - CollectionRepo *repo.CollectionRepository - HostName string - cleanupCronRunning bool - embeddingS3Client *s3.S3 - embeddingBucket *string - areEmbeddingAndHotBucketSame bool + Repo *embedding.Repository + AccessCtrl access.Controller + ObjectCleanupController *controller.ObjectCleanupController + S3Config *s3config.S3Config + QueueRepo *repo.QueueRepository + TaskLockingRepo *repo.TaskLockRepository + FileRepo *repo.FileRepository + CollectionRepo *repo.CollectionRepository + HostName string + cleanupCronRunning bool + derivedStorageS3Client *s3.S3 + derivedStorageBucket *string + areDerivedAndHotBucketSame bool } func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { return &Controller{ - Repo: repo, - AccessCtrl: accessCtrl, - ObjectCleanupController: objectCleanupController, - S3Config: s3Config, - QueueRepo: queueRepo, - TaskLockingRepo: taskLockingRepo, - FileRepo: fileRepo, - CollectionRepo: collectionRepo, - HostName: hostName, - embeddingS3Client: s3Config.GetEmbeddingsS3Client(), - embeddingBucket: s3Config.GetEmbeddingsBucket(), - areEmbeddingAndHotBucketSame: s3Config.GetEmbeddingsBucket() == s3Config.GetHotBucket(), + Repo: repo, + AccessCtrl: accessCtrl, + ObjectCleanupController: objectCleanupController, + S3Config: s3Config, + QueueRepo: queueRepo, + TaskLockingRepo: taskLockingRepo, + FileRepo: fileRepo, + CollectionRepo: collectionRepo, + HostName: hostName, + derivedStorageS3Client: s3Config.GetDerivedStorageS3Client(), + derivedStorageBucket: s3Config.GetDerivedStorageBucket(), + areDerivedAndHotBucketSame: s3Config.GetDerivedStorageBucket() == s3Config.GetHotBucket(), } } @@ -265,13 +265,13 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { } prefix := c.getEmbeddingObjectPrefix(ownerID, fileID) - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetEmbeddingsDataCenter()) + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter()) if err != nil { ctxLogger.WithError(err).Error("Failed to delete all objects") return } // if Embeddings DC is different from hot DC, delete from hot DC as well - if !c.areEmbeddingAndHotBucketSame { + if !c.areDerivedAndHotBucketSame { err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) if err != nil { ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") @@ -305,9 +305,9 @@ func (c *Controller) getEmbeddingObjectPrefix(userID int64, fileID int64) string // uploadObject uploads the embedding object to the object store and returns the object size func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) (int, error) { embeddingObj, _ := json.Marshal(obj) - uploader := s3manager.NewUploaderWithClient(c.embeddingS3Client) + uploader := s3manager.NewUploaderWithClient(c.derivedStorageS3Client) up := s3manager.UploadInput{ - Bucket: c.embeddingBucket, + Bucket: c.derivedStorageBucket, Key: &key, Body: bytes.NewReader(embeddingObj), } @@ -329,7 +329,7 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em var wg sync.WaitGroup var errs []error embeddingObjects := make([]ente.EmbeddingObject, len(objectKeys)) - downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client) + downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client) for i, objectKey := range objectKeys { wg.Add(1) @@ -366,7 +366,7 @@ type embeddingObjectResult struct { func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows []ente.Embedding) ([]embeddingObjectResult, error) { var wg sync.WaitGroup embeddingObjects := make([]embeddingObjectResult, len(dbEmbeddingRows)) - downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client) + downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client) for i, dbEmbeddingRow := range dbEmbeddingRows { wg.Add(1) @@ -417,7 +417,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d cancel() return ente.EmbeddingObject{}, stacktrace.Propagate(ctx.Err(), "") default: - obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.embeddingBucket) + obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.derivedStorageBucket) cancel() // Ensure cancel is called to release resources if err == nil { if i > 0 { @@ -432,10 +432,10 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { - if c.areEmbeddingAndHotBucketSame { + if c.areDerivedAndHotBucketSame { ctxLogger.Error("Object not found: ", s3Err) } else { - // If embedding and hot bucket are different, try to copy from hot bucket + // If derived and hot bucket are different, try to copy from hot bucket copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey) if err == nil { ctxLogger.Info("Got the object from hot bucket object") @@ -473,7 +473,7 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl // download the embedding object from hot bucket and upload to embeddings bucket func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) (*ente.EmbeddingObject, error) { - if c.embeddingBucket == c.S3Config.GetHotBucket() { + if c.derivedStorageBucket == c.S3Config.GetHotBucket() { return nil, stacktrace.Propagate(errors.New("embedding bucket and hot bucket are same"), "") } downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) diff --git a/server/pkg/utils/s3config/s3config.go b/server/pkg/utils/s3config/s3config.go index 02a7fbd26..f3e22493a 100644 --- a/server/pkg/utils/s3config/s3config.go +++ b/server/pkg/utils/s3config/s3config.go @@ -28,8 +28,8 @@ type S3Config struct { hotDC string // Secondary (hot) data center secondaryHotDC string - // Bucket for storing ml embeddings & preview files - embeddingsDC string + //Derived data data center for derived files like ml embeddings & preview files + derivedStorageDC string // A map from data centers to S3 configurations s3Configs map[string]*aws.Config // A map from data centers to pre-created S3 clients @@ -102,10 +102,10 @@ func (config *S3Config) initialize() { config.secondaryHotDC = hs2 log.Infof("Hot storage: %s (secondary: %s)", hs1, hs2) } - config.embeddingsDC = config.hotDC - embeddingsDC := viper.GetString("s3.embeddings-bucket") + config.derivedStorageDC = config.hotDC + embeddingsDC := viper.GetString("s3.derived-storage") if embeddingsDC != "" && array.StringInList(embeddingsDC, dcs[:]) { - config.embeddingsDC = embeddingsDC + config.derivedStorageDC = embeddingsDC log.Infof("Embeddings bucket: %s", embeddingsDC) } @@ -180,15 +180,15 @@ func (config *S3Config) GetHotS3Client() *s3.S3 { return &s3Client } -func (config *S3Config) GetEmbeddingsDataCenter() string { - return config.embeddingsDC +func (config *S3Config) GetDerivedStorageDataCenter() string { + return config.derivedStorageDC } -func (config *S3Config) GetEmbeddingsBucket() *string { - return config.GetBucket(config.embeddingsDC) +func (config *S3Config) GetDerivedStorageBucket() *string { + return config.GetBucket(config.derivedStorageDC) } -func (config *S3Config) GetEmbeddingsS3Client() *s3.S3 { - s3Client := config.GetS3Client(config.embeddingsDC) +func (config *S3Config) GetDerivedStorageS3Client() *s3.S3 { + s3Client := config.GetS3Client(config.derivedStorageDC) return &s3Client } From 851f914ef87d86f0e9618e05e25247b5562488ce Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 15 May 2024 17:50:05 +0530 Subject: [PATCH 11/30] Add wasabi-derived in list of dcs --- server/pkg/utils/s3config/s3config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/pkg/utils/s3config/s3config.go b/server/pkg/utils/s3config/s3config.go index f3e22493a..bd5496f48 100644 --- a/server/pkg/utils/s3config/s3config.go +++ b/server/pkg/utils/s3config/s3config.go @@ -73,7 +73,7 @@ var ( dcWasabiEuropeCentralDeprecated string = "wasabi-eu-central-2" dcWasabiEuropeCentral_v3 string = "wasabi-eu-central-2-v3" dcSCWEuropeFrance_v3 string = "scw-eu-fr-v3" - dcWasabiEuropeCentral string = "wasabi-eu-central-2-embeddings" + dcWasabiEuropeCentralDerived string = "wasabi-eu-central-2-derived" ) // Number of days that the wasabi bucket is configured to retain objects. @@ -91,7 +91,7 @@ func NewS3Config() *S3Config { func (config *S3Config) initialize() { dcs := [6]string{ dcB2EuropeCentral, dcSCWEuropeFranceLockedDeprecated, dcWasabiEuropeCentralDeprecated, - dcWasabiEuropeCentral_v3, dcSCWEuropeFrance_v3, dcWasabiEuropeCentral} + dcWasabiEuropeCentral_v3, dcSCWEuropeFrance_v3, dcWasabiEuropeCentralDerived} config.hotDC = dcB2EuropeCentral config.secondaryHotDC = dcWasabiEuropeCentral_v3 From da188aa753b726a9d7864d407fbc311b9ef07725 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 10:19:58 +0530 Subject: [PATCH 12/30] Add datacenter column for embeddings --- server/migrations/86_add_dc_embedding.down.sql | 2 ++ server/migrations/86_add_dc_embedding.up.sql | 3 +++ 2 files changed, 5 insertions(+) create mode 100644 server/migrations/86_add_dc_embedding.down.sql create mode 100644 server/migrations/86_add_dc_embedding.up.sql diff --git a/server/migrations/86_add_dc_embedding.down.sql b/server/migrations/86_add_dc_embedding.down.sql new file mode 100644 index 000000000..d42a539f9 --- /dev/null +++ b/server/migrations/86_add_dc_embedding.down.sql @@ -0,0 +1,2 @@ +-- Add types for the new dcs that are introduced for the derived data +ALTER TABLE embeddings DROP COLUMN IF EXISTS datacenters; \ No newline at end of file diff --git a/server/migrations/86_add_dc_embedding.up.sql b/server/migrations/86_add_dc_embedding.up.sql new file mode 100644 index 000000000..eecb8c013 --- /dev/null +++ b/server/migrations/86_add_dc_embedding.up.sql @@ -0,0 +1,3 @@ +-- Add types for the new dcs that are introduced for the derived data +ALTER TYPE s3region ADD VALUE 'wasabi-eu-central-2-derived'; +ALTER TABLE embeddings ADD COLUMN IF NOT EXISTS datacenters s3region[] default '{b2-eu-cen}'; \ No newline at end of file From 64ecdfa15319ea490769003b17aafed930043954 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 10:58:00 +0530 Subject: [PATCH 13/30] Store dc during insert or update --- server/pkg/controller/embedding/controller.go | 2 +- server/pkg/repo/embedding/repository.go | 27 +++++++++++++------ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 81cc059bf..6cf0d610f 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -103,7 +103,7 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req ente.InsertOrUpdateEmb log.Error(uploadErr) return nil, stacktrace.Propagate(uploadErr, "") } - embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version) + embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version, c.S3Config.GetDerivedStorageDataCenter()) embedding.Version = &version if err != nil { return nil, stacktrace.Propagate(err, "") diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go index 86915fde5..bcbe822e4 100644 --- a/server/pkg/repo/embedding/repository.go +++ b/server/pkg/repo/embedding/repository.go @@ -18,15 +18,26 @@ type Repository struct { } // Create inserts a new embedding - -func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest, size int, version int) (ente.Embedding, error) { +func (r *Repository) InsertOrUpdate(ctx context.Context, ownerID int64, entry ente.InsertOrUpdateEmbeddingRequest, size int, version int, dc string) (ente.Embedding, error) { var updatedAt int64 - err := r.DB.QueryRowContext(ctx, `INSERT INTO embeddings - (file_id, owner_id, model, size, version) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT ON CONSTRAINT unique_embeddings_file_id_model - DO UPDATE SET updated_at = now_utc_micro_seconds(), size = $4, version = $5 - RETURNING updated_at`, entry.FileID, ownerID, entry.Model, size, version).Scan(&updatedAt) + err := r.DB.QueryRowContext(ctx, ` + INSERT INTO embeddings + (file_id, owner_id, model, size, version, datacenters) + VALUES + ($1, $2, $3, $4, $5, ARRAY[$6]::s3region[]) + ON CONFLICT ON CONSTRAINT unique_embeddings_file_id_model + DO UPDATE + SET + updated_at = now_utc_micro_seconds(), + size = $4, + version = $5, + datacenters = CASE + WHEN $6 = ANY(COALESCE(embeddings.datacenters, ARRAY['b2-eu-cen']::s3region[])) THEN embeddings.datacenters + ELSE array_append(COALESCE(embeddings.datacenters, ARRAY['b2-eu-cen']::s3region[]), $6::s3region) + END + RETURNING updated_at`, + entry.FileID, ownerID, entry.Model, size, version, dc).Scan(&updatedAt) + if err != nil { // check if error is due to model enum invalid value if err.Error() == fmt.Sprintf("pq: invalid input value for enum model: \"%s\"", entry.Model) { From 6e204d828cfc11aff5fc682fad446bbb0b854c42 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 11:33:01 +0530 Subject: [PATCH 14/30] Delete derived data from all datacenters --- server/pkg/controller/embedding/controller.go | 47 +++++++++++++++---- server/pkg/repo/embedding/repository.go | 33 +++++++++++++ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 6cf0d610f..fec968daf 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -264,33 +264,62 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { return } prefix := c.getEmbeddingObjectPrefix(ownerID, fileID) - - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter()) + datacenters, err := c.Repo.GetDatacenters(context.Background(), fileID) if err != nil { - ctxLogger.WithError(err).Error("Failed to delete all objects") + ctxLogger.WithError(err).Error("Failed to fetch datacenters") return } - // if Embeddings DC is different from hot DC, delete from hot DC as well - if !c.areDerivedAndHotBucketSame { - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) + // Ensure that the object are deleted from active derived storage dc. Ideally, this section should never be executed + // unless there's a bug in storing the DC or the service restarts before removing the rows from the table + // todo:(neeraj): remove this section after a few weeks of deployment + if len(datacenters) == 0 { + ctxLogger.Warn("No datacenters found for file, ensuring deletion from derived storage and hot DC") + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter()) if err != nil { - ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") + ctxLogger.WithError(err).Error("Failed to delete all objects") return } + // if Embeddings DC is different from hot DC, delete from hot DC as well + if !c.areDerivedAndHotBucketSame { + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) + if err != nil { + ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") + return + } + } + } else { + ctxLogger.Info("Deleting from all datacenters %v", datacenters) + } + + for i := range datacenters { + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, datacenters[i]) + if err != nil { + ctxLogger.WithError(err).Errorf("Failed to delete all objects from %s", datacenters[i]) + return + } else { + removeErr := c.Repo.RemoveDatacenter(context.Background(), fileID, datacenters[i]) + if removeErr != nil { + ctxLogger.WithError(removeErr).Error("Failed to remove datacenter from db") + return + } + } } + noDcs, noDcErr := c.Repo.GetDatacenters(context.Background(), fileID) + if len(noDcs) > 0 || noDcErr != nil { + ctxLogger.Errorf("Failed to delete from all datacenters %s", noDcs) + return + } err = c.Repo.Delete(fileID) if err != nil { ctxLogger.WithError(err).Error("Failed to remove from db") return } - err = c.QueueRepo.DeleteItem(repo.DeleteEmbeddingsQueue, qItem.Item) if err != nil { ctxLogger.WithError(err).Error("Failed to remove item from the queue") return } - ctxLogger.Info("Successfully deleted all embeddings") } diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go index bcbe822e4..d2e73145a 100644 --- a/server/pkg/repo/embedding/repository.go +++ b/server/pkg/repo/embedding/repository.go @@ -93,6 +93,39 @@ func (r *Repository) Delete(fileID int64) error { return nil } +// GetDatacenters returns unique list of datacenters where derived embeddings are stored +func (r *Repository) GetDatacenters(ctx context.Context, fileID int64) ([]string, error) { + rows, err := r.DB.QueryContext(ctx, `SELECT datacenters FROM embeddings WHERE file_id = $1`, fileID) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + uniqueDatacenters := make(map[string]struct{}) + for rows.Next() { + var datacenters []string + err = rows.Scan(pq.Array(&datacenters)) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + for _, dc := range datacenters { + uniqueDatacenters[dc] = struct{}{} + } + } + datacenters := make([]string, 0, len(uniqueDatacenters)) + for dc := range uniqueDatacenters { + datacenters = append(datacenters, dc) + } + return datacenters, nil +} + +// RemoveDatacenter removes the given datacenter from the list of datacenters +func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc string) error { + _, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET datacenters = array_remove(datacenters, $1) WHERE file_id = $2`, dc, fileID) + if err != nil { + return stacktrace.Propagate(err, "") + } + return nil +} + func convertRowsToEmbeddings(rows *sql.Rows) ([]ente.Embedding, error) { defer func() { if err := rows.Close(); err != nil { From 3c7d86da8d119046bd8bbc7190318c340308537c Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 11:35:46 +0530 Subject: [PATCH 15/30] Minor refactor --- server/pkg/controller/embedding/controller.go | 115 ---------------- server/pkg/controller/embedding/delete.go | 126 ++++++++++++++++++ 2 files changed, 126 insertions(+), 115 deletions(-) create mode 100644 server/pkg/controller/embedding/delete.go diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index fec968daf..6f0f6e69a 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/ente-io/museum/pkg/utils/array" "strconv" @@ -23,7 +22,6 @@ import ( "github.com/ente-io/museum/pkg/utils/auth" "github.com/ente-io/museum/pkg/utils/network" "github.com/ente-io/museum/pkg/utils/s3config" - "github.com/ente-io/museum/pkg/utils/time" "github.com/ente-io/stacktrace" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" @@ -210,119 +208,6 @@ func (c *Controller) GetFilesEmbedding(ctx *gin.Context, req ente.GetFilesEmbedd }, nil } -func (c *Controller) DeleteAll(ctx *gin.Context) error { - userID := auth.GetUserID(ctx.Request.Header) - - err := c.Repo.DeleteAll(ctx, userID) - if err != nil { - return stacktrace.Propagate(err, "") - } - return nil -} - -// CleanupDeletedEmbeddings clears all embeddings for deleted files from the object store -func (c *Controller) CleanupDeletedEmbeddings() { - log.Info("Cleaning up deleted embeddings") - if c.cleanupCronRunning { - log.Info("Skipping CleanupDeletedEmbeddings cron run as another instance is still running") - return - } - c.cleanupCronRunning = true - defer func() { - c.cleanupCronRunning = false - }() - items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteEmbeddingsQueue, 200) - if err != nil { - log.WithError(err).Error("Failed to fetch items from queue") - return - } - for _, i := range items { - c.deleteEmbedding(i) - } -} - -func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { - lockName := fmt.Sprintf("Embedding:%s", qItem.Item) - lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName) - ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id) - if err != nil || !lockStatus { - ctxLogger.Warn("unable to acquire lock") - return - } - defer func() { - err = c.TaskLockingRepo.ReleaseLock(lockName) - if err != nil { - ctxLogger.Errorf("Error while releasing lock %s", err) - } - }() - ctxLogger.Info("Deleting all embeddings") - - fileID, _ := strconv.ParseInt(qItem.Item, 10, 64) - ownerID, err := c.FileRepo.GetOwnerID(fileID) - if err != nil { - ctxLogger.WithError(err).Error("Failed to fetch ownerID") - return - } - prefix := c.getEmbeddingObjectPrefix(ownerID, fileID) - datacenters, err := c.Repo.GetDatacenters(context.Background(), fileID) - if err != nil { - ctxLogger.WithError(err).Error("Failed to fetch datacenters") - return - } - // Ensure that the object are deleted from active derived storage dc. Ideally, this section should never be executed - // unless there's a bug in storing the DC or the service restarts before removing the rows from the table - // todo:(neeraj): remove this section after a few weeks of deployment - if len(datacenters) == 0 { - ctxLogger.Warn("No datacenters found for file, ensuring deletion from derived storage and hot DC") - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter()) - if err != nil { - ctxLogger.WithError(err).Error("Failed to delete all objects") - return - } - // if Embeddings DC is different from hot DC, delete from hot DC as well - if !c.areDerivedAndHotBucketSame { - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) - if err != nil { - ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") - return - } - } - } else { - ctxLogger.Info("Deleting from all datacenters %v", datacenters) - } - - for i := range datacenters { - err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, datacenters[i]) - if err != nil { - ctxLogger.WithError(err).Errorf("Failed to delete all objects from %s", datacenters[i]) - return - } else { - removeErr := c.Repo.RemoveDatacenter(context.Background(), fileID, datacenters[i]) - if removeErr != nil { - ctxLogger.WithError(removeErr).Error("Failed to remove datacenter from db") - return - } - } - } - - noDcs, noDcErr := c.Repo.GetDatacenters(context.Background(), fileID) - if len(noDcs) > 0 || noDcErr != nil { - ctxLogger.Errorf("Failed to delete from all datacenters %s", noDcs) - return - } - err = c.Repo.Delete(fileID) - if err != nil { - ctxLogger.WithError(err).Error("Failed to remove from db") - return - } - err = c.QueueRepo.DeleteItem(repo.DeleteEmbeddingsQueue, qItem.Item) - if err != nil { - ctxLogger.WithError(err).Error("Failed to remove item from the queue") - return - } - ctxLogger.Info("Successfully deleted all embeddings") -} - func (c *Controller) getObjectKey(userID int64, fileID int64, model string) string { return c.getEmbeddingObjectPrefix(userID, fileID) + model + ".json" } diff --git a/server/pkg/controller/embedding/delete.go b/server/pkg/controller/embedding/delete.go new file mode 100644 index 000000000..c6e193b79 --- /dev/null +++ b/server/pkg/controller/embedding/delete.go @@ -0,0 +1,126 @@ +package embedding + +import ( + "context" + "fmt" + "github.com/ente-io/museum/pkg/repo" + "github.com/ente-io/museum/pkg/utils/auth" + "github.com/ente-io/museum/pkg/utils/time" + "github.com/ente-io/stacktrace" + "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" + "strconv" +) + +func (c *Controller) DeleteAll(ctx *gin.Context) error { + userID := auth.GetUserID(ctx.Request.Header) + + err := c.Repo.DeleteAll(ctx, userID) + if err != nil { + return stacktrace.Propagate(err, "") + } + return nil +} + +// CleanupDeletedEmbeddings clears all embeddings for deleted files from the object store +func (c *Controller) CleanupDeletedEmbeddings() { + log.Info("Cleaning up deleted embeddings") + if c.cleanupCronRunning { + log.Info("Skipping CleanupDeletedEmbeddings cron run as another instance is still running") + return + } + c.cleanupCronRunning = true + defer func() { + c.cleanupCronRunning = false + }() + items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteEmbeddingsQueue, 200) + if err != nil { + log.WithError(err).Error("Failed to fetch items from queue") + return + } + for _, i := range items { + c.deleteEmbedding(i) + } +} + +func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { + lockName := fmt.Sprintf("Embedding:%s", qItem.Item) + lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName) + ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id) + if err != nil || !lockStatus { + ctxLogger.Warn("unable to acquire lock") + return + } + defer func() { + err = c.TaskLockingRepo.ReleaseLock(lockName) + if err != nil { + ctxLogger.Errorf("Error while releasing lock %s", err) + } + }() + ctxLogger.Info("Deleting all embeddings") + + fileID, _ := strconv.ParseInt(qItem.Item, 10, 64) + ownerID, err := c.FileRepo.GetOwnerID(fileID) + if err != nil { + ctxLogger.WithError(err).Error("Failed to fetch ownerID") + return + } + prefix := c.getEmbeddingObjectPrefix(ownerID, fileID) + datacenters, err := c.Repo.GetDatacenters(context.Background(), fileID) + if err != nil { + ctxLogger.WithError(err).Error("Failed to fetch datacenters") + return + } + // Ensure that the object are deleted from active derived storage dc. Ideally, this section should never be executed + // unless there's a bug in storing the DC or the service restarts before removing the rows from the table + // todo:(neeraj): remove this section after a few weeks of deployment + if len(datacenters) == 0 { + ctxLogger.Warn("No datacenters found for file, ensuring deletion from derived storage and hot DC") + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter()) + if err != nil { + ctxLogger.WithError(err).Error("Failed to delete all objects") + return + } + // if Embeddings DC is different from hot DC, delete from hot DC as well + if !c.areDerivedAndHotBucketSame { + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) + if err != nil { + ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") + return + } + } + } else { + ctxLogger.Info("Deleting from all datacenters %v", datacenters) + } + + for i := range datacenters { + err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, datacenters[i]) + if err != nil { + ctxLogger.WithError(err).Errorf("Failed to delete all objects from %s", datacenters[i]) + return + } else { + removeErr := c.Repo.RemoveDatacenter(context.Background(), fileID, datacenters[i]) + if removeErr != nil { + ctxLogger.WithError(removeErr).Error("Failed to remove datacenter from db") + return + } + } + } + + noDcs, noDcErr := c.Repo.GetDatacenters(context.Background(), fileID) + if len(noDcs) > 0 || noDcErr != nil { + ctxLogger.Errorf("Failed to delete from all datacenters %s", noDcs) + return + } + err = c.Repo.Delete(fileID) + if err != nil { + ctxLogger.WithError(err).Error("Failed to remove from db") + return + } + err = c.QueueRepo.DeleteItem(repo.DeleteEmbeddingsQueue, qItem.Item) + if err != nil { + ctxLogger.WithError(err).Error("Failed to remove item from the queue") + return + } + ctxLogger.Info("Successfully deleted all embeddings") +} From e0738db6ae517b0c0159136b6efe00dadf96336a Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 12:23:25 +0530 Subject: [PATCH 16/30] Minor refactor --- server/pkg/controller/embedding/controller.go | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 6f0f6e69a..3fff6b568 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -45,7 +45,7 @@ type Controller struct { HostName string cleanupCronRunning bool derivedStorageS3Client *s3.S3 - derivedStorageBucket *string + derivedStorageDataCenter string areDerivedAndHotBucketSame bool } @@ -61,8 +61,8 @@ func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanup CollectionRepo: collectionRepo, HostName: hostName, derivedStorageS3Client: s3Config.GetDerivedStorageS3Client(), - derivedStorageBucket: s3Config.GetDerivedStorageBucket(), - areDerivedAndHotBucketSame: s3Config.GetDerivedStorageBucket() == s3Config.GetHotBucket(), + derivedStorageDataCenter: s3Config.GetDerivedStorageDataCenter(), + areDerivedAndHotBucketSame: s3Config.GetDerivedStorageDataCenter() == s3Config.GetDerivedStorageDataCenter(), } } @@ -96,12 +96,12 @@ func (c *Controller) InsertOrUpdate(ctx *gin.Context, req ente.InsertOrUpdateEmb DecryptionHeader: req.DecryptionHeader, Client: network.GetPrettyUA(ctx.GetHeader("User-Agent")) + "/" + ctx.GetHeader("X-Client-Version"), } - size, uploadErr := c.uploadObject(obj, c.getObjectKey(userID, req.FileID, req.Model)) + size, uploadErr := c.uploadObject(obj, c.getObjectKey(userID, req.FileID, req.Model), c.derivedStorageDataCenter) if uploadErr != nil { log.Error(uploadErr) return nil, stacktrace.Propagate(uploadErr, "") } - embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version, c.S3Config.GetDerivedStorageDataCenter()) + embedding, err := c.Repo.InsertOrUpdate(ctx, userID, req, size, version, c.derivedStorageDataCenter) embedding.Version = &version if err != nil { return nil, stacktrace.Propagate(err, "") @@ -217,11 +217,13 @@ func (c *Controller) getEmbeddingObjectPrefix(userID int64, fileID int64) string } // uploadObject uploads the embedding object to the object store and returns the object size -func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) (int, error) { +func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string, dc string) (int, error) { embeddingObj, _ := json.Marshal(obj) - uploader := s3manager.NewUploaderWithClient(c.derivedStorageS3Client) + s3Client := c.S3Config.GetS3Client(dc) + s3Bucket := c.S3Config.GetBucket(dc) + uploader := s3manager.NewUploaderWithClient(&s3Client) up := s3manager.UploadInput{ - Bucket: c.derivedStorageBucket, + Bucket: s3Bucket, Key: &key, Body: bytes.NewReader(embeddingObj), } @@ -331,7 +333,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d cancel() return ente.EmbeddingObject{}, stacktrace.Propagate(ctx.Err(), "") default: - obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.derivedStorageBucket) + obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.derivedStorageDataCenter) cancel() // Ensure cancel is called to release resources if err == nil { if i > 0 { @@ -368,9 +370,10 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("failed to fetch object"), "") } -func (c *Controller) downloadObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader, bucket *string) (ente.EmbeddingObject, error) { +func (c *Controller) downloadObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader, dc string) (ente.EmbeddingObject, error) { var obj ente.EmbeddingObject buff := &aws.WriteAtBuffer{} + bucket := c.S3Config.GetBucket(dc) _, err := downloader.DownloadWithContext(ctx, buff, &s3.GetObjectInput{ Bucket: bucket, Key: &objectKey, @@ -387,16 +390,16 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl // download the embedding object from hot bucket and upload to embeddings bucket func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) (*ente.EmbeddingObject, error) { - if c.derivedStorageBucket == c.S3Config.GetHotBucket() { - return nil, stacktrace.Propagate(errors.New("embedding bucket and hot bucket are same"), "") + if c.derivedStorageDataCenter == c.S3Config.GetHotDataCenter() { + return nil, stacktrace.Propagate(errors.New("derived DC bucket and hot DC are same"), "") } downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) - obj, err := c.downloadObject(ctx, objectKey, downloader, c.S3Config.GetHotBucket()) + obj, err := c.downloadObject(ctx, objectKey, downloader, c.S3Config.GetHotDataCenter()) if err != nil { return nil, stacktrace.Propagate(err, "failed to download from hot bucket") } go func() { - _, err = c.uploadObject(obj, objectKey) + _, err = c.uploadObject(obj, objectKey, c.derivedStorageDataCenter) if err != nil { log.WithField("object", objectKey).Error("Failed to copy to embeddings bucket: ", err) } From 4cc866fa129da3b5f62f3525cef630e357f98a86 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 12:49:23 +0530 Subject: [PATCH 17/30] Refactor --- server/pkg/controller/embedding/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 3fff6b568..09cb23985 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -44,7 +44,6 @@ type Controller struct { CollectionRepo *repo.CollectionRepository HostName string cleanupCronRunning bool - derivedStorageS3Client *s3.S3 derivedStorageDataCenter string areDerivedAndHotBucketSame bool } @@ -60,7 +59,6 @@ func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanup FileRepo: fileRepo, CollectionRepo: collectionRepo, HostName: hostName, - derivedStorageS3Client: s3Config.GetDerivedStorageS3Client(), derivedStorageDataCenter: s3Config.GetDerivedStorageDataCenter(), areDerivedAndHotBucketSame: s3Config.GetDerivedStorageDataCenter() == s3Config.GetDerivedStorageDataCenter(), } @@ -245,7 +243,8 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em var wg sync.WaitGroup var errs []error embeddingObjects := make([]ente.EmbeddingObject, len(objectKeys)) - downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client) + s3Client := c.S3Config.GetS3Client(c.derivedStorageDataCenter) + downloader := s3manager.NewDownloaderWithClient(&s3Client) for i, objectKey := range objectKeys { wg.Add(1) @@ -282,7 +281,8 @@ type embeddingObjectResult struct { func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows []ente.Embedding) ([]embeddingObjectResult, error) { var wg sync.WaitGroup embeddingObjects := make([]embeddingObjectResult, len(dbEmbeddingRows)) - downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client) + s3Client := c.S3Config.GetS3Client(c.derivedStorageDataCenter) + downloader := s3manager.NewDownloaderWithClient(&s3Client) for i, dbEmbeddingRow := range dbEmbeddingRows { wg.Add(1) From 3f1ee82ec591b2c021db68f1e7a027d7687fc021 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 12:52:20 +0530 Subject: [PATCH 18/30] Inline --- server/pkg/controller/embedding/controller.go | 2 +- server/pkg/controller/embedding/delete.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 09cb23985..c4cae36bc 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -348,7 +348,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { - if c.areDerivedAndHotBucketSame { + if c.derivedStorageDataCenter == c.S3Config.GetHotDataCenter() { ctxLogger.Error("Object not found: ", s3Err) } else { // If derived and hot bucket are different, try to copy from hot bucket diff --git a/server/pkg/controller/embedding/delete.go b/server/pkg/controller/embedding/delete.go index c6e193b79..da9f349ad 100644 --- a/server/pkg/controller/embedding/delete.go +++ b/server/pkg/controller/embedding/delete.go @@ -81,8 +81,8 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { ctxLogger.WithError(err).Error("Failed to delete all objects") return } - // if Embeddings DC is different from hot DC, delete from hot DC as well - if !c.areDerivedAndHotBucketSame { + // if Derived DC is different from hot DC, delete from hot DC as well + if c.derivedStorageDataCenter != c.S3Config.GetHotDataCenter() { err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter()) if err != nil { ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC") From b53a70cf65cf537e7fa5c0d83a7fd72e21129871 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 12:57:31 +0530 Subject: [PATCH 19/30] Avoid retry if object is missing --- server/pkg/controller/embedding/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index c4cae36bc..ff8b87696 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -350,6 +350,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d if s3Err.Code() == s3.ErrCodeNoSuchKey { if c.derivedStorageDataCenter == c.S3Config.GetHotDataCenter() { ctxLogger.Error("Object not found: ", s3Err) + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") } else { // If derived and hot bucket are different, try to copy from hot bucket copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey) From 3485b3147565ccd1195f19257a5b98969eb0e9d5 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 12:58:42 +0530 Subject: [PATCH 20/30] Clean up & new line --- .../migrations/86_add_dc_embedding.down.sql | 2 +- server/migrations/86_add_dc_embedding.up.sql | 2 +- server/pkg/controller/embedding/controller.go | 44 +++++++++---------- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/server/migrations/86_add_dc_embedding.down.sql b/server/migrations/86_add_dc_embedding.down.sql index d42a539f9..d9c271d84 100644 --- a/server/migrations/86_add_dc_embedding.down.sql +++ b/server/migrations/86_add_dc_embedding.down.sql @@ -1,2 +1,2 @@ -- Add types for the new dcs that are introduced for the derived data -ALTER TABLE embeddings DROP COLUMN IF EXISTS datacenters; \ No newline at end of file +ALTER TABLE embeddings DROP COLUMN IF EXISTS datacenters; diff --git a/server/migrations/86_add_dc_embedding.up.sql b/server/migrations/86_add_dc_embedding.up.sql index eecb8c013..28f53d4d1 100644 --- a/server/migrations/86_add_dc_embedding.up.sql +++ b/server/migrations/86_add_dc_embedding.up.sql @@ -1,3 +1,3 @@ -- Add types for the new dcs that are introduced for the derived data ALTER TYPE s3region ADD VALUE 'wasabi-eu-central-2-derived'; -ALTER TABLE embeddings ADD COLUMN IF NOT EXISTS datacenters s3region[] default '{b2-eu-cen}'; \ No newline at end of file +ALTER TABLE embeddings ADD COLUMN IF NOT EXISTS datacenters s3region[] default '{b2-eu-cen}'; diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index ff8b87696..b62642f8f 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -34,33 +34,31 @@ const ( ) type Controller struct { - Repo *embedding.Repository - AccessCtrl access.Controller - ObjectCleanupController *controller.ObjectCleanupController - S3Config *s3config.S3Config - QueueRepo *repo.QueueRepository - TaskLockingRepo *repo.TaskLockRepository - FileRepo *repo.FileRepository - CollectionRepo *repo.CollectionRepository - HostName string - cleanupCronRunning bool - derivedStorageDataCenter string - areDerivedAndHotBucketSame bool + Repo *embedding.Repository + AccessCtrl access.Controller + ObjectCleanupController *controller.ObjectCleanupController + S3Config *s3config.S3Config + QueueRepo *repo.QueueRepository + TaskLockingRepo *repo.TaskLockRepository + FileRepo *repo.FileRepository + CollectionRepo *repo.CollectionRepository + HostName string + cleanupCronRunning bool + derivedStorageDataCenter string } func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { return &Controller{ - Repo: repo, - AccessCtrl: accessCtrl, - ObjectCleanupController: objectCleanupController, - S3Config: s3Config, - QueueRepo: queueRepo, - TaskLockingRepo: taskLockingRepo, - FileRepo: fileRepo, - CollectionRepo: collectionRepo, - HostName: hostName, - derivedStorageDataCenter: s3Config.GetDerivedStorageDataCenter(), - areDerivedAndHotBucketSame: s3Config.GetDerivedStorageDataCenter() == s3Config.GetDerivedStorageDataCenter(), + Repo: repo, + AccessCtrl: accessCtrl, + ObjectCleanupController: objectCleanupController, + S3Config: s3Config, + QueueRepo: queueRepo, + TaskLockingRepo: taskLockingRepo, + FileRepo: fileRepo, + CollectionRepo: collectionRepo, + HostName: hostName, + derivedStorageDataCenter: s3Config.GetDerivedStorageDataCenter(), } } From 20e9a6a1fcec9adfac3242cb62f08aa180351f85 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 13:39:47 +0530 Subject: [PATCH 21/30] Refactor --- server/pkg/controller/embedding/controller.go | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index b62642f8f..fb79fd36e 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -33,6 +33,14 @@ const ( embeddingFetchTimeout = 15 * gTime.Second ) +// _fetchConfig is the configuration for the fetching objects from S3 +type _fetchConfig struct { + RetryCount int + FetchTimeOut gTime.Duration +} + +var _defaultFetchConfig = _fetchConfig{RetryCount: 3, FetchTimeOut: 15 * gTime.Second} + type Controller struct { Repo *embedding.Repository AccessCtrl access.Controller @@ -251,7 +259,7 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em defer wg.Done() defer func() { <-globalDiffFetchSemaphore }() // Release back to global semaphore - obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader, nil) + obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader) if err != nil { errs = append(errs, err) log.Error("error fetching embedding object: "+objectKey, err) @@ -289,7 +297,7 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows defer wg.Done() defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore objectKey := c.getObjectKey(userID, dbEmbeddingRow.FileID, dbEmbeddingRow.Model) - obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader, nil) + obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader) if err != nil { log.Error("error fetching embedding object: "+objectKey, err) embeddingObjects[i] = embeddingObjectResult{ @@ -309,18 +317,8 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows return embeddingObjects, nil } -type getOptions struct { - RetryCount int - FetchTimeOut gTime.Duration -} - -func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader, opt *getOptions) (ente.EmbeddingObject, error) { - if opt == nil { - opt = &getOptions{ - RetryCount: 3, - FetchTimeOut: embeddingFetchTimeout, - } - } +func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader) (ente.EmbeddingObject, error) { + opt := _defaultFetchConfig ctxLogger := log.WithField("objectKey", objectKey) totalAttempts := opt.RetryCount + 1 for i := 0; i < totalAttempts; i++ { @@ -346,7 +344,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { - if c.derivedStorageDataCenter == c.S3Config.GetHotDataCenter() { + if c.derivedStorageDataCenter == c.S3Config.GetHotBackblazeDC() { ctxLogger.Error("Object not found: ", s3Err) return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") } else { @@ -389,11 +387,11 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl // download the embedding object from hot bucket and upload to embeddings bucket func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) (*ente.EmbeddingObject, error) { - if c.derivedStorageDataCenter == c.S3Config.GetHotDataCenter() { + if c.derivedStorageDataCenter == c.S3Config.GetHotBackblazeDC() { return nil, stacktrace.Propagate(errors.New("derived DC bucket and hot DC are same"), "") } downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) - obj, err := c.downloadObject(ctx, objectKey, downloader, c.S3Config.GetHotDataCenter()) + obj, err := c.downloadObject(ctx, objectKey, downloader, c.S3Config.GetHotBackblazeDC()) if err != nil { return nil, stacktrace.Propagate(err, "failed to download from hot bucket") } From a522631c2b96ceeb9c28711f3b134e31a9ce8b65 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 15:19:22 +0530 Subject: [PATCH 22/30] Refactor --- server/pkg/controller/embedding/controller.go | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index fb79fd36e..a217fb5f8 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -53,9 +53,16 @@ type Controller struct { HostName string cleanupCronRunning bool derivedStorageDataCenter string + downloadManagerCache map[string]*s3manager.Downloader } func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { + embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetDerivedStorageDataCenter()} + cache := make(map[string]*s3manager.Downloader, len(embeddingDcs)) + for i := range embeddingDcs { + s3Client := s3Config.GetS3Client(embeddingDcs[i]) + cache[embeddingDcs[i]] = s3manager.NewDownloaderWithClient(&s3Client) + } return &Controller{ Repo: repo, AccessCtrl: accessCtrl, @@ -67,6 +74,7 @@ func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanup CollectionRepo: collectionRepo, HostName: hostName, derivedStorageDataCenter: s3Config.GetDerivedStorageDataCenter(), + downloadManagerCache: cache, } } @@ -136,7 +144,7 @@ func (c *Controller) GetDiff(ctx *gin.Context, req ente.GetEmbeddingDiffRequest) // Fetch missing embeddings in parallel if len(objectKeys) > 0 { - embeddingObjects, err := c.getEmbeddingObjectsParallel(objectKeys) + embeddingObjects, err := c.getEmbeddingObjectsParallel(objectKeys, c.derivedStorageDataCenter) if err != nil { return nil, stacktrace.Propagate(err, "") } @@ -182,7 +190,7 @@ func (c *Controller) GetFilesEmbedding(ctx *gin.Context, req ente.GetFilesEmbedd errFileIds := make([]int64, 0) // Fetch missing userFileEmbeddings in parallel - embeddingObjects, err := c.getEmbeddingObjectsParallelV2(userID, embeddingsWithData) + embeddingObjects, err := c.getEmbeddingObjectsParallelV2(userID, embeddingsWithData, c.derivedStorageDataCenter) if err != nil { return nil, stacktrace.Propagate(err, "") } @@ -245,13 +253,10 @@ var globalDiffFetchSemaphore = make(chan struct{}, 300) var globalFileFetchSemaphore = make(chan struct{}, 400) -func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.EmbeddingObject, error) { +func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string, dc string) ([]ente.EmbeddingObject, error) { var wg sync.WaitGroup var errs []error embeddingObjects := make([]ente.EmbeddingObject, len(objectKeys)) - s3Client := c.S3Config.GetS3Client(c.derivedStorageDataCenter) - downloader := s3manager.NewDownloaderWithClient(&s3Client) - for i, objectKey := range objectKeys { wg.Add(1) globalDiffFetchSemaphore <- struct{}{} // Acquire from global semaphore @@ -259,7 +264,7 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em defer wg.Done() defer func() { <-globalDiffFetchSemaphore }() // Release back to global semaphore - obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader) + obj, err := c.getEmbeddingObject(context.Background(), objectKey, dc) if err != nil { errs = append(errs, err) log.Error("error fetching embedding object: "+objectKey, err) @@ -284,11 +289,9 @@ type embeddingObjectResult struct { err error } -func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows []ente.Embedding) ([]embeddingObjectResult, error) { +func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows []ente.Embedding, dc string) ([]embeddingObjectResult, error) { var wg sync.WaitGroup embeddingObjects := make([]embeddingObjectResult, len(dbEmbeddingRows)) - s3Client := c.S3Config.GetS3Client(c.derivedStorageDataCenter) - downloader := s3manager.NewDownloaderWithClient(&s3Client) for i, dbEmbeddingRow := range dbEmbeddingRows { wg.Add(1) @@ -297,7 +300,7 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows defer wg.Done() defer func() { <-globalFileFetchSemaphore }() // Release back to global semaphore objectKey := c.getObjectKey(userID, dbEmbeddingRow.FileID, dbEmbeddingRow.Model) - obj, err := c.getEmbeddingObject(context.Background(), objectKey, downloader) + obj, err := c.getEmbeddingObject(context.Background(), objectKey, dc) if err != nil { log.Error("error fetching embedding object: "+objectKey, err) embeddingObjects[i] = embeddingObjectResult{ @@ -317,7 +320,7 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows return embeddingObjects, nil } -func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader) (ente.EmbeddingObject, error) { +func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, dc string) (ente.EmbeddingObject, error) { opt := _defaultFetchConfig ctxLogger := log.WithField("objectKey", objectKey) totalAttempts := opt.RetryCount + 1 @@ -329,7 +332,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d cancel() return ente.EmbeddingObject{}, stacktrace.Propagate(ctx.Err(), "") default: - obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.derivedStorageDataCenter) + obj, err := c.downloadObject(fetchCtx, objectKey, dc) cancel() // Ensure cancel is called to release resources if err == nil { if i > 0 { @@ -367,10 +370,11 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("failed to fetch object"), "") } -func (c *Controller) downloadObject(ctx context.Context, objectKey string, downloader *s3manager.Downloader, dc string) (ente.EmbeddingObject, error) { +func (c *Controller) downloadObject(ctx context.Context, objectKey string, dc string) (ente.EmbeddingObject, error) { var obj ente.EmbeddingObject buff := &aws.WriteAtBuffer{} bucket := c.S3Config.GetBucket(dc) + downloader := c.downloadManagerCache[dc] _, err := downloader.DownloadWithContext(ctx, buff, &s3.GetObjectInput{ Bucket: bucket, Key: &objectKey, @@ -390,8 +394,7 @@ func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) if c.derivedStorageDataCenter == c.S3Config.GetHotBackblazeDC() { return nil, stacktrace.Propagate(errors.New("derived DC bucket and hot DC are same"), "") } - downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client()) - obj, err := c.downloadObject(ctx, objectKey, downloader, c.S3Config.GetHotBackblazeDC()) + obj, err := c.downloadObject(ctx, objectKey, c.S3Config.GetHotBackblazeDC()) if err != nil { return nil, stacktrace.Propagate(err, "failed to download from hot bucket") } From b404b77da363d054c8521995578f2974d5589380 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 16:08:38 +0530 Subject: [PATCH 23/30] Update dc while copying derived file --- server/pkg/controller/embedding/controller.go | 36 +++++++++++++------ server/pkg/repo/embedding/repository.go | 17 +++++++++ 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index a217fb5f8..df5301904 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -5,9 +5,11 @@ import ( "context" "encoding/json" "errors" + "fmt" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/ente-io/museum/pkg/utils/array" "strconv" + "strings" "sync" gTime "time" @@ -228,6 +230,15 @@ func (c *Controller) getEmbeddingObjectPrefix(userID int64, fileID int64) string return strconv.FormatInt(userID, 10) + "/ml-data/" + strconv.FormatInt(fileID, 10) + "/" } +// Get userId, model and fileID from the object key +func (c *Controller) getEmbeddingObjectDetails(objectKey string) (userID int64, model string, fileID int64) { + split := strings.Split(objectKey, "/") + userID, _ = strconv.ParseInt(split[0], 10, 64) + fileID, _ = strconv.ParseInt(split[2], 10, 64) + model = strings.Split(split[3], ".")[0] + return userID, model, fileID +} + // uploadObject uploads the embedding object to the object store and returns the object size func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string, dc string) (int, error) { embeddingObj, _ := json.Marshal(obj) @@ -352,7 +363,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") } else { // If derived and hot bucket are different, try to copy from hot bucket - copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey) + copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, c.S3Config.GetHotBackblazeDC(), c.derivedStorageDataCenter) if err == nil { ctxLogger.Info("Got the object from hot bucket object") return *copyEmbeddingObject, nil @@ -390,21 +401,26 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, dc st } // download the embedding object from hot bucket and upload to embeddings bucket -func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) (*ente.EmbeddingObject, error) { - if c.derivedStorageDataCenter == c.S3Config.GetHotBackblazeDC() { - return nil, stacktrace.Propagate(errors.New("derived DC bucket and hot DC are same"), "") +func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string, srcDC, destDC string) (*ente.EmbeddingObject, error) { + if srcDC == destDC { + return nil, stacktrace.Propagate(errors.New("src and dest dc can not be same"), "") } - obj, err := c.downloadObject(ctx, objectKey, c.S3Config.GetHotBackblazeDC()) + obj, err := c.downloadObject(ctx, objectKey, srcDC) if err != nil { - return nil, stacktrace.Propagate(err, "failed to download from hot bucket") + return nil, stacktrace.Propagate(err, fmt.Sprintf("failed to download object from %s", srcDC)) } go func() { - _, err = c.uploadObject(obj, objectKey, c.derivedStorageDataCenter) - if err != nil { - log.WithField("object", objectKey).Error("Failed to copy to embeddings bucket: ", err) + userID, modelName, fileID := c.getEmbeddingObjectDetails(objectKey) + size, uploadErr := c.uploadObject(obj, objectKey, c.derivedStorageDataCenter) + if uploadErr != nil { + log.WithField("object", objectKey).Error("Failed to copy to embeddings bucket: ", uploadErr) + } + updateDcErr := c.Repo.AddNewDC(context.Background(), fileID, ente.Model(modelName), userID, size, destDC) + if updateDcErr != nil { + log.WithField("object", objectKey).Error("Failed to update dc in db: ", updateDcErr) + return } }() - return &obj, nil } diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go index d2e73145a..1288512b3 100644 --- a/server/pkg/repo/embedding/repository.go +++ b/server/pkg/repo/embedding/repository.go @@ -3,6 +3,7 @@ package embedding import ( "context" "database/sql" + "errors" "fmt" "github.com/lib/pq" @@ -126,6 +127,22 @@ func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc stri return nil } +// AddNewDC adds the dc name to the list of datacenters, if it doesn't exist already, for a given file, model and user. It also updates the size of the embedding +func (r *Repository) AddNewDC(ctx context.Context, fileID int64, model ente.Model, userID int64, size int, dc string) error { + res, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET size = $1, datacenters = array_append(COALESCE(datacenters, ARRAY[]::s3region[]), $2::s3region) WHERE file_id = $3 AND model = $4 AND owner_id = $5`, size, dc, fileID, model, userID) + if err != nil { + return stacktrace.Propagate(err, "") + } + rowsAffected, err := res.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.Propagate(errors.New("no row got updated"), "") + } + return nil +} + func convertRowsToEmbeddings(rows *sql.Rows) ([]ente.Embedding, error) { defer func() { if err := rows.Close(); err != nil { From 08555954d2d2d602819f3edf2271213abbe96341 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 16:10:51 +0530 Subject: [PATCH 24/30] Document --- server/pkg/controller/embedding/controller.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index df5301904..09e83a185 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -358,10 +358,9 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { - if c.derivedStorageDataCenter == c.S3Config.GetHotBackblazeDC() { - ctxLogger.Error("Object not found: ", s3Err) - return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") - } else { + if c.derivedStorageDataCenter != c.S3Config.GetHotBackblazeDC() { + // todo:(neeraj) Refactor this later to get available the DC from the DB and use that to + // copy the object to currently active DC for derived storage // If derived and hot bucket are different, try to copy from hot bucket copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, c.S3Config.GetHotBackblazeDC(), c.derivedStorageDataCenter) if err == nil { @@ -371,6 +370,9 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d ctxLogger.WithError(err).Error("Failed to copy from hot bucket object") } return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + } else { + ctxLogger.Error("Object not found: ", s3Err) + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") } } } From 51138e92633987648559bfa73a4548b1edc4475c Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 16:21:08 +0530 Subject: [PATCH 25/30] Increase initial timeout val for b2 dc --- server/pkg/controller/embedding/controller.go | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 09e83a185..d4275d102 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -32,16 +32,18 @@ import ( const ( // maxEmbeddingDataSize is the min size of an embedding object in bytes minEmbeddingDataSize = 2048 - embeddingFetchTimeout = 15 * gTime.Second + embeddingFetchTimeout = 10 * gTime.Second ) // _fetchConfig is the configuration for the fetching objects from S3 type _fetchConfig struct { - RetryCount int - FetchTimeOut gTime.Duration + RetryCount int + InitialTimeout gTime.Duration + MaxTimeout gTime.Duration } -var _defaultFetchConfig = _fetchConfig{RetryCount: 3, FetchTimeOut: 15 * gTime.Second} +var _defaultFetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 5 * gTime.Second, MaxTimeout: 30 * gTime.Second} +var _b2FetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 15 * gTime.Second, MaxTimeout: 30 * gTime.Second} type Controller struct { Repo *embedding.Repository @@ -333,11 +335,20 @@ func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, dc string) (ente.EmbeddingObject, error) { opt := _defaultFetchConfig + if dc == c.S3Config.GetHotBackblazeDC() { + opt = _b2FetchConfig + } ctxLogger := log.WithField("objectKey", objectKey) totalAttempts := opt.RetryCount + 1 + timeout := opt.InitialTimeout for i := 0; i < totalAttempts; i++ { - // Create a new context with a timeout for each fetch - fetchCtx, cancel := context.WithTimeout(ctx, opt.FetchTimeOut) + if i > 0 { + timeout = timeout * 2 + if timeout > opt.MaxTimeout { + timeout = opt.MaxTimeout + } + } + fetchCtx, cancel := context.WithTimeout(ctx, timeout) select { case <-ctx.Done(): cancel() From 698ceca49e21a45a2868fadc1d1aa9387fb84de2 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Thu, 16 May 2024 16:21:55 +0530 Subject: [PATCH 26/30] Lint fix --- server/pkg/controller/embedding/delete.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/pkg/controller/embedding/delete.go b/server/pkg/controller/embedding/delete.go index da9f349ad..dd2027e42 100644 --- a/server/pkg/controller/embedding/delete.go +++ b/server/pkg/controller/embedding/delete.go @@ -90,7 +90,7 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) { } } } else { - ctxLogger.Info("Deleting from all datacenters %v", datacenters) + ctxLogger.Infof("Deleting from all datacenters %v", datacenters) } for i := range datacenters { From fd504612145e32c5393c4be239c9ae65c9e8b596 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Fri, 17 May 2024 10:52:00 +0530 Subject: [PATCH 27/30] Add dc in the log ctx --- server/pkg/controller/embedding/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index d4275d102..200fafc11 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -42,7 +42,7 @@ type _fetchConfig struct { MaxTimeout gTime.Duration } -var _defaultFetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 5 * gTime.Second, MaxTimeout: 30 * gTime.Second} +var _defaultFetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 10 * gTime.Second, MaxTimeout: 30 * gTime.Second} var _b2FetchConfig = _fetchConfig{RetryCount: 3, InitialTimeout: 15 * gTime.Second, MaxTimeout: 30 * gTime.Second} type Controller struct { @@ -338,7 +338,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d if dc == c.S3Config.GetHotBackblazeDC() { opt = _b2FetchConfig } - ctxLogger := log.WithField("objectKey", objectKey) + ctxLogger := log.WithField("objectKey", objectKey).WithField("dc", dc) totalAttempts := opt.RetryCount + 1 timeout := opt.InitialTimeout for i := 0; i < totalAttempts; i++ { From da155464fa3409a02b72da0faf38625224fc7ef7 Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Fri, 17 May 2024 15:08:48 +0530 Subject: [PATCH 28/30] Remove updated_at trigger for embeddings table --- server/migrations/86_add_dc_embedding.down.sql | 16 ++++++++++++++++ server/migrations/86_add_dc_embedding.up.sql | 1 + 2 files changed, 17 insertions(+) diff --git a/server/migrations/86_add_dc_embedding.down.sql b/server/migrations/86_add_dc_embedding.down.sql index d9c271d84..b705b29b6 100644 --- a/server/migrations/86_add_dc_embedding.down.sql +++ b/server/migrations/86_add_dc_embedding.down.sql @@ -1,2 +1,18 @@ -- Add types for the new dcs that are introduced for the derived data ALTER TABLE embeddings DROP COLUMN IF EXISTS datacenters; + +DO +$$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'update_embeddings_updated_at') THEN + CREATE TRIGGER update_embeddings_updated_at + BEFORE UPDATE + ON embeddings + FOR EACH ROW + EXECUTE PROCEDURE + trigger_updated_at_microseconds_column(); + ELSE + RAISE NOTICE 'Trigger update_embeddings_updated_at already exists.'; + END IF; + END +$$; \ No newline at end of file diff --git a/server/migrations/86_add_dc_embedding.up.sql b/server/migrations/86_add_dc_embedding.up.sql index 28f53d4d1..9d8e28ba7 100644 --- a/server/migrations/86_add_dc_embedding.up.sql +++ b/server/migrations/86_add_dc_embedding.up.sql @@ -1,3 +1,4 @@ -- Add types for the new dcs that are introduced for the derived data ALTER TYPE s3region ADD VALUE 'wasabi-eu-central-2-derived'; +DROP TRIGGER IF EXISTS update_embeddings_updated_at ON embeddings; ALTER TABLE embeddings ADD COLUMN IF NOT EXISTS datacenters s3region[] default '{b2-eu-cen}'; From e33d85412cde1da7e9c432792a295c74372d361a Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Fri, 17 May 2024 15:08:57 +0530 Subject: [PATCH 29/30] Fix query for add new DC --- server/pkg/repo/embedding/repository.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go index 1288512b3..d61134ecb 100644 --- a/server/pkg/repo/embedding/repository.go +++ b/server/pkg/repo/embedding/repository.go @@ -129,7 +129,14 @@ func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc stri // AddNewDC adds the dc name to the list of datacenters, if it doesn't exist already, for a given file, model and user. It also updates the size of the embedding func (r *Repository) AddNewDC(ctx context.Context, fileID int64, model ente.Model, userID int64, size int, dc string) error { - res, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET size = $1, datacenters = array_append(COALESCE(datacenters, ARRAY[]::s3region[]), $2::s3region) WHERE file_id = $3 AND model = $4 AND owner_id = $5`, size, dc, fileID, model, userID) + res, err := r.DB.ExecContext(ctx, ` + UPDATE embeddings + SET size = $1, + datacenters = CASE + WHEN $2::s3region = ANY(datacenters) THEN datacenters + ELSE array_append(datacenters, $2::s3region) + END + WHERE file_id = $3 AND model = $4 AND owner_id = $5`, size, dc, fileID, model, userID) if err != nil { return stacktrace.Propagate(err, "") } @@ -138,7 +145,7 @@ func (r *Repository) AddNewDC(ctx context.Context, fileID int64, model ente.Mode return stacktrace.Propagate(err, "") } if rowsAffected == 0 { - return stacktrace.Propagate(errors.New("no row got updated"), "") + return stacktrace.Propagate(errors.New("no row got updated"), "") } return nil } From 89b01f0a39d13890e536c87f20dba0a0379ca96c Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Fri, 17 May 2024 15:40:09 +0530 Subject: [PATCH 30/30] Query DB to get fallback DC --- server/pkg/controller/embedding/controller.go | 43 +++++++++++++------ server/pkg/repo/embedding/repository.go | 30 ++++++++++++- server/pkg/utils/s3config/s3config.go | 4 ++ 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 200fafc11..6f3de3ca7 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -61,7 +61,7 @@ type Controller struct { } func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { - embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetDerivedStorageDataCenter()} + embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter()} cache := make(map[string]*s3manager.Downloader, len(embeddingDcs)) for i := range embeddingDcs { s3Client := s3Config.GetS3Client(embeddingDcs[i]) @@ -369,22 +369,37 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { + var srcDc, destDc string + destDc = c.S3Config.GetDerivedStorageDataCenter() + // todo:(neeraj) Refactor this later to get available the DC from the DB instead of + // querying the DB. This will help in case of multiple DCs and avoid querying the DB + // for each object. + // For initial migration, as we know that original DC was b2, and if the embedding is not found + // in the new derived DC, we can try to fetch it from the B2 DC. if c.derivedStorageDataCenter != c.S3Config.GetHotBackblazeDC() { - // todo:(neeraj) Refactor this later to get available the DC from the DB and use that to - // copy the object to currently active DC for derived storage - // If derived and hot bucket are different, try to copy from hot bucket - copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, c.S3Config.GetHotBackblazeDC(), c.derivedStorageDataCenter) - if err == nil { - ctxLogger.Info("Got the object from hot bucket object") - return *copyEmbeddingObject, nil - } else { - ctxLogger.WithError(err).Error("Failed to copy from hot bucket object") - } - return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + // embeddings ideally should ideally be in the default hot bucket b2 + srcDc = c.S3Config.GetHotBackblazeDC() } else { - ctxLogger.Error("Object not found: ", s3Err) - return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + _, modelName, fileID := c.getEmbeddingObjectDetails(objectKey) + activeDcs, err := c.Repo.GetOtherDCsForFileAndModel(context.Background(), fileID, modelName, c.derivedStorageDataCenter) + if err != nil { + return ente.EmbeddingObject{}, stacktrace.Propagate(err, "failed to get other dc") + } + if len(activeDcs) > 0 { + srcDc = activeDcs[0] + } else { + ctxLogger.Error("Object not found in any dc ", s3Err) + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + } } + copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, srcDc, destDc) + if err == nil { + ctxLogger.Infof("Got object from dc %s", srcDc) + return *copyEmbeddingObject, nil + } else { + ctxLogger.WithError(err).Errorf("Failed to get object from fallback dc %s", srcDc) + } + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") } } ctxLogger.Error("Failed to fetch object: ", err) diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go index d61134ecb..5cfbd35c5 100644 --- a/server/pkg/repo/embedding/repository.go +++ b/server/pkg/repo/embedding/repository.go @@ -5,10 +5,9 @@ import ( "database/sql" "errors" "fmt" - "github.com/lib/pq" - "github.com/ente-io/museum/ente" "github.com/ente-io/stacktrace" + "github.com/lib/pq" "github.com/sirupsen/logrus" ) @@ -118,6 +117,33 @@ func (r *Repository) GetDatacenters(ctx context.Context, fileID int64) ([]string return datacenters, nil } +// GetOtherDCsForFileAndModel returns the list of datacenters where the embeddings are stored for a given file and model, excluding the ignoredDC +func (r *Repository) GetOtherDCsForFileAndModel(ctx context.Context, fileID int64, model string, ignoredDC string) ([]string, error) { + rows, err := r.DB.QueryContext(ctx, `SELECT datacenters FROM embeddings WHERE file_id = $1 AND model = $2`, fileID, model) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + uniqueDatacenters := make(map[string]bool) + for rows.Next() { + var datacenters []string + err = rows.Scan(pq.Array(&datacenters)) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + for _, dc := range datacenters { + // add to uniqueDatacenters if it is not the ignoredDC + if dc != ignoredDC { + uniqueDatacenters[dc] = true + } + } + } + datacenters := make([]string, 0, len(uniqueDatacenters)) + for dc := range uniqueDatacenters { + datacenters = append(datacenters, dc) + } + return datacenters, nil +} + // RemoveDatacenter removes the given datacenter from the list of datacenters func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc string) error { _, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET datacenters = array_remove(datacenters, $1) WHERE file_id = $2`, dc, fileID) diff --git a/server/pkg/utils/s3config/s3config.go b/server/pkg/utils/s3config/s3config.go index bd5496f48..a562e5181 100644 --- a/server/pkg/utils/s3config/s3config.go +++ b/server/pkg/utils/s3config/s3config.go @@ -202,6 +202,10 @@ func (config *S3Config) GetHotWasabiDC() string { return dcWasabiEuropeCentral_v3 } +func (config *S3Config) GetWasabiDerivedDC() string { + return dcWasabiEuropeCentralDerived +} + // Return the name of the cold Scaleway data center func (config *S3Config) GetColdScalewayDC() string { return dcSCWEuropeFrance_v3