From 89b01f0a39d13890e536c87f20dba0a0379ca96c Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Fri, 17 May 2024 15:40:09 +0530 Subject: [PATCH] Query DB to get fallback DC --- server/pkg/controller/embedding/controller.go | 43 +++++++++++++------ server/pkg/repo/embedding/repository.go | 30 ++++++++++++- server/pkg/utils/s3config/s3config.go | 4 ++ 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go index 200fafc11..6f3de3ca7 100644 --- a/server/pkg/controller/embedding/controller.go +++ b/server/pkg/controller/embedding/controller.go @@ -61,7 +61,7 @@ type Controller struct { } func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller { - embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetDerivedStorageDataCenter()} + embeddingDcs := []string{s3Config.GetHotBackblazeDC(), s3Config.GetHotWasabiDC(), s3Config.GetWasabiDerivedDC(), s3Config.GetDerivedStorageDataCenter()} cache := make(map[string]*s3manager.Downloader, len(embeddingDcs)) for i := range embeddingDcs { s3Client := s3Config.GetS3Client(embeddingDcs[i]) @@ -369,22 +369,37 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d // check if the error is due to object not found if s3Err, ok := err.(awserr.RequestFailure); ok { if s3Err.Code() == s3.ErrCodeNoSuchKey { + var srcDc, destDc string + destDc = c.S3Config.GetDerivedStorageDataCenter() + // todo:(neeraj) Refactor this later to get available the DC from the DB instead of + // querying the DB. This will help in case of multiple DCs and avoid querying the DB + // for each object. + // For initial migration, as we know that original DC was b2, and if the embedding is not found + // in the new derived DC, we can try to fetch it from the B2 DC. if c.derivedStorageDataCenter != c.S3Config.GetHotBackblazeDC() { - // todo:(neeraj) Refactor this later to get available the DC from the DB and use that to - // copy the object to currently active DC for derived storage - // If derived and hot bucket are different, try to copy from hot bucket - copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, c.S3Config.GetHotBackblazeDC(), c.derivedStorageDataCenter) - if err == nil { - ctxLogger.Info("Got the object from hot bucket object") - return *copyEmbeddingObject, nil - } else { - ctxLogger.WithError(err).Error("Failed to copy from hot bucket object") - } - return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + // embeddings ideally should ideally be in the default hot bucket b2 + srcDc = c.S3Config.GetHotBackblazeDC() } else { - ctxLogger.Error("Object not found: ", s3Err) - return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + _, modelName, fileID := c.getEmbeddingObjectDetails(objectKey) + activeDcs, err := c.Repo.GetOtherDCsForFileAndModel(context.Background(), fileID, modelName, c.derivedStorageDataCenter) + if err != nil { + return ente.EmbeddingObject{}, stacktrace.Propagate(err, "failed to get other dc") + } + if len(activeDcs) > 0 { + srcDc = activeDcs[0] + } else { + ctxLogger.Error("Object not found in any dc ", s3Err) + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") + } } + copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey, srcDc, destDc) + if err == nil { + ctxLogger.Infof("Got object from dc %s", srcDc) + return *copyEmbeddingObject, nil + } else { + ctxLogger.WithError(err).Errorf("Failed to get object from fallback dc %s", srcDc) + } + return ente.EmbeddingObject{}, stacktrace.Propagate(errors.New("object not found"), "") } } ctxLogger.Error("Failed to fetch object: ", err) diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go index d61134ecb..5cfbd35c5 100644 --- a/server/pkg/repo/embedding/repository.go +++ b/server/pkg/repo/embedding/repository.go @@ -5,10 +5,9 @@ import ( "database/sql" "errors" "fmt" - "github.com/lib/pq" - "github.com/ente-io/museum/ente" "github.com/ente-io/stacktrace" + "github.com/lib/pq" "github.com/sirupsen/logrus" ) @@ -118,6 +117,33 @@ func (r *Repository) GetDatacenters(ctx context.Context, fileID int64) ([]string return datacenters, nil } +// GetOtherDCsForFileAndModel returns the list of datacenters where the embeddings are stored for a given file and model, excluding the ignoredDC +func (r *Repository) GetOtherDCsForFileAndModel(ctx context.Context, fileID int64, model string, ignoredDC string) ([]string, error) { + rows, err := r.DB.QueryContext(ctx, `SELECT datacenters FROM embeddings WHERE file_id = $1 AND model = $2`, fileID, model) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + uniqueDatacenters := make(map[string]bool) + for rows.Next() { + var datacenters []string + err = rows.Scan(pq.Array(&datacenters)) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + for _, dc := range datacenters { + // add to uniqueDatacenters if it is not the ignoredDC + if dc != ignoredDC { + uniqueDatacenters[dc] = true + } + } + } + datacenters := make([]string, 0, len(uniqueDatacenters)) + for dc := range uniqueDatacenters { + datacenters = append(datacenters, dc) + } + return datacenters, nil +} + // RemoveDatacenter removes the given datacenter from the list of datacenters func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc string) error { _, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET datacenters = array_remove(datacenters, $1) WHERE file_id = $2`, dc, fileID) diff --git a/server/pkg/utils/s3config/s3config.go b/server/pkg/utils/s3config/s3config.go index bd5496f48..a562e5181 100644 --- a/server/pkg/utils/s3config/s3config.go +++ b/server/pkg/utils/s3config/s3config.go @@ -202,6 +202,10 @@ func (config *S3Config) GetHotWasabiDC() string { return dcWasabiEuropeCentral_v3 } +func (config *S3Config) GetWasabiDerivedDC() string { + return dcWasabiEuropeCentralDerived +} + // Return the name of the cold Scaleway data center func (config *S3Config) GetColdScalewayDC() string { return dcSCWEuropeFrance_v3