From 6e204d828cfc11aff5fc682fad446bbb0b854c42 Mon Sep 17 00:00:00 2001
From: Neeraj Gupta <254676+ua741@users.noreply.github.com>
Date: Thu, 16 May 2024 11:33:01 +0530
Subject: [PATCH] Delete derived data from all datacenters

---
 server/pkg/controller/embedding/controller.go | 50 ++++++++++++++++---
 server/pkg/repo/embedding/repository.go       | 43 ++++++++++++++++
 2 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/server/pkg/controller/embedding/controller.go b/server/pkg/controller/embedding/controller.go
index 6cf0d610f..fec968daf 100644
--- a/server/pkg/controller/embedding/controller.go
+++ b/server/pkg/controller/embedding/controller.go
@@ -264,33 +264,65 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) {
 		return
 	}
 	prefix := c.getEmbeddingObjectPrefix(ownerID, fileID)
-
-	err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter())
+	datacenters, err := c.Repo.GetDatacenters(context.Background(), fileID)
 	if err != nil {
-		ctxLogger.WithError(err).Error("Failed to delete all objects")
+		ctxLogger.WithError(err).Error("Failed to fetch datacenters")
 		return
 	}
-	// if Embeddings DC is different from hot DC, delete from hot DC as well
-	if !c.areDerivedAndHotBucketSame {
-		err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter())
+	// Ensure that the objects are deleted from the active derived storage DC. Ideally, this section should never be executed
+	// unless there's a bug in storing the DC or the service restarts before removing the rows from the table
+	// todo:(neeraj): remove this section after a few weeks of deployment
+	if len(datacenters) == 0 {
+		ctxLogger.Warn("No datacenters found for file, ensuring deletion from derived storage and hot DC")
+		err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter())
 		if err != nil {
-			ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC")
+			ctxLogger.WithError(err).Error("Failed to delete all objects")
 			return
 		}
+		// if Embeddings DC is different from hot DC, delete from hot DC as well
+		if !c.areDerivedAndHotBucketSame {
+			err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter())
+			if err != nil {
+				ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC")
+				return
+			}
+		}
+	} else {
+		ctxLogger.Infof("Deleting from all datacenters %v", datacenters)
+	}
+
+	for i := range datacenters {
+		err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, datacenters[i])
+		if err != nil {
+			ctxLogger.WithError(err).Errorf("Failed to delete all objects from %s", datacenters[i])
+			return
+		}
+		// Record the successful cleanup so this datacenter is no longer listed for the file.
+		if removeErr := c.Repo.RemoveDatacenter(context.Background(), fileID, datacenters[i]); removeErr != nil {
+			ctxLogger.WithError(removeErr).Error("Failed to remove datacenter from db")
+			return
+		}
 	}

+	noDcs, noDcErr := c.Repo.GetDatacenters(context.Background(), fileID)
+	if noDcErr != nil {
+		ctxLogger.WithError(noDcErr).Error("Failed to verify deletion from all datacenters")
+		return
+	}
+	if len(noDcs) > 0 {
+		ctxLogger.Errorf("Failed to delete from all datacenters %s", noDcs)
+		return
+	}
 	err = c.Repo.Delete(fileID)
 	if err != nil {
 		ctxLogger.WithError(err).Error("Failed to remove from db")
 		return
 	}
-
 	err = c.QueueRepo.DeleteItem(repo.DeleteEmbeddingsQueue, qItem.Item)
 	if err != nil {
 		ctxLogger.WithError(err).Error("Failed to remove item from the queue")
 		return
 	}
-
 	ctxLogger.Info("Successfully deleted all embeddings")
 }

diff --git a/server/pkg/repo/embedding/repository.go b/server/pkg/repo/embedding/repository.go
index bcbe822e4..d2e73145a 100644
--- a/server/pkg/repo/embedding/repository.go
+++ b/server/pkg/repo/embedding/repository.go
@@ -93,6 +93,49 @@ func (r *Repository) Delete(fileID int64) error {
 	return nil
 }

+// GetDatacenters returns the de-duplicated list of datacenters in which the
+// derived embeddings of the given file are stored, merged across all rows for
+// that file_id. The order of the returned slice is unspecified.
+func (r *Repository) GetDatacenters(ctx context.Context, fileID int64) ([]string, error) {
+	rows, err := r.DB.QueryContext(ctx, `SELECT datacenters FROM embeddings WHERE file_id = $1`, fileID)
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	// Release the underlying connection even when we return early on a scan error.
+	defer rows.Close()
+	uniqueDatacenters := make(map[string]struct{})
+	for rows.Next() {
+		var datacenters []string
+		err = rows.Scan(pq.Array(&datacenters))
+		if err != nil {
+			return nil, stacktrace.Propagate(err, "")
+		}
+		for _, dc := range datacenters {
+			uniqueDatacenters[dc] = struct{}{}
+		}
+	}
+	// rows.Next() also returns false on an iteration error; surface it instead of
+	// returning a silently truncated list.
+	if err = rows.Err(); err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+	datacenters := make([]string, 0, len(uniqueDatacenters))
+	for dc := range uniqueDatacenters {
+		datacenters = append(datacenters, dc)
+	}
+	return datacenters, nil
+}
+
+// RemoveDatacenter removes the given datacenter from the datacenters array of
+// every embeddings row belonging to fileID.
+func (r *Repository) RemoveDatacenter(ctx context.Context, fileID int64, dc string) error {
+	_, err := r.DB.ExecContext(ctx, `UPDATE embeddings SET datacenters = array_remove(datacenters, $1) WHERE file_id = $2`, dc, fileID)
+	if err != nil {
+		return stacktrace.Propagate(err, "")
+	}
+	return nil
+}
+
 func convertRowsToEmbeddings(rows *sql.Rows) ([]ente.Embedding, error) {
 	defer func() {
 		if err := rows.Close(); err != nil {