Rename embedding dc to derived storage

This commit is contained in:
Neeraj Gupta 2024-05-15 16:37:23 +05:30
parent cc457eca98
commit 7eabea3884
3 changed files with 49 additions and 49 deletions

View file

@ -125,15 +125,15 @@ s3:
endpoint:
region:
bucket:
wasabi-eu-central-2-embeddings:
wasabi-eu-central-2-derived:
key:
secret:
endpoint:
region:
bucket:
# Embeddings bucket is used for storing embeddings and other derived data from a file.
# Derived storage bucket is used for storing derived data like embeddings, preview etc.
# By default, it is the same as the hot storage bucket.
# embeddings-bucket: wasabi-eu-central-2-embeddings
# derived-storage: wasabi-eu-central-2-derived
# If true, enable some workarounds to allow us to use a local minio instance
# for object storage.

View file

@ -36,35 +36,35 @@ const (
)
type Controller struct {
Repo *embedding.Repository
AccessCtrl access.Controller
ObjectCleanupController *controller.ObjectCleanupController
S3Config *s3config.S3Config
QueueRepo *repo.QueueRepository
TaskLockingRepo *repo.TaskLockRepository
FileRepo *repo.FileRepository
CollectionRepo *repo.CollectionRepository
HostName string
cleanupCronRunning bool
embeddingS3Client *s3.S3
embeddingBucket *string
areEmbeddingAndHotBucketSame bool
Repo *embedding.Repository
AccessCtrl access.Controller
ObjectCleanupController *controller.ObjectCleanupController
S3Config *s3config.S3Config
QueueRepo *repo.QueueRepository
TaskLockingRepo *repo.TaskLockRepository
FileRepo *repo.FileRepository
CollectionRepo *repo.CollectionRepository
HostName string
cleanupCronRunning bool
derivedStorageS3Client *s3.S3
derivedStorageBucket *string
areDerivedAndHotBucketSame bool
}
func New(repo *embedding.Repository, accessCtrl access.Controller, objectCleanupController *controller.ObjectCleanupController, s3Config *s3config.S3Config, queueRepo *repo.QueueRepository, taskLockingRepo *repo.TaskLockRepository, fileRepo *repo.FileRepository, collectionRepo *repo.CollectionRepository, hostName string) *Controller {
return &Controller{
Repo: repo,
AccessCtrl: accessCtrl,
ObjectCleanupController: objectCleanupController,
S3Config: s3Config,
QueueRepo: queueRepo,
TaskLockingRepo: taskLockingRepo,
FileRepo: fileRepo,
CollectionRepo: collectionRepo,
HostName: hostName,
embeddingS3Client: s3Config.GetEmbeddingsS3Client(),
embeddingBucket: s3Config.GetEmbeddingsBucket(),
areEmbeddingAndHotBucketSame: s3Config.GetEmbeddingsBucket() == s3Config.GetHotBucket(),
Repo: repo,
AccessCtrl: accessCtrl,
ObjectCleanupController: objectCleanupController,
S3Config: s3Config,
QueueRepo: queueRepo,
TaskLockingRepo: taskLockingRepo,
FileRepo: fileRepo,
CollectionRepo: collectionRepo,
HostName: hostName,
derivedStorageS3Client: s3Config.GetDerivedStorageS3Client(),
derivedStorageBucket: s3Config.GetDerivedStorageBucket(),
areDerivedAndHotBucketSame: s3Config.GetDerivedStorageBucket() == s3Config.GetHotBucket(),
}
}
@ -265,13 +265,13 @@ func (c *Controller) deleteEmbedding(qItem repo.QueueItem) {
}
prefix := c.getEmbeddingObjectPrefix(ownerID, fileID)
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetEmbeddingsDataCenter())
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetDerivedStorageDataCenter())
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete all objects")
return
}
// if Embeddings DC is different from hot DC, delete from hot DC as well
if !c.areEmbeddingAndHotBucketSame {
if !c.areDerivedAndHotBucketSame {
err = c.ObjectCleanupController.DeleteAllObjectsWithPrefix(prefix, c.S3Config.GetHotDataCenter())
if err != nil {
ctxLogger.WithError(err).Error("Failed to delete all objects from hot DC")
@ -305,9 +305,9 @@ func (c *Controller) getEmbeddingObjectPrefix(userID int64, fileID int64) string
// uploadObject uploads the embedding object to the object store and returns the object size
func (c *Controller) uploadObject(obj ente.EmbeddingObject, key string) (int, error) {
embeddingObj, _ := json.Marshal(obj)
uploader := s3manager.NewUploaderWithClient(c.embeddingS3Client)
uploader := s3manager.NewUploaderWithClient(c.derivedStorageS3Client)
up := s3manager.UploadInput{
Bucket: c.embeddingBucket,
Bucket: c.derivedStorageBucket,
Key: &key,
Body: bytes.NewReader(embeddingObj),
}
@ -329,7 +329,7 @@ func (c *Controller) getEmbeddingObjectsParallel(objectKeys []string) ([]ente.Em
var wg sync.WaitGroup
var errs []error
embeddingObjects := make([]ente.EmbeddingObject, len(objectKeys))
downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client)
downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client)
for i, objectKey := range objectKeys {
wg.Add(1)
@ -366,7 +366,7 @@ type embeddingObjectResult struct {
func (c *Controller) getEmbeddingObjectsParallelV2(userID int64, dbEmbeddingRows []ente.Embedding) ([]embeddingObjectResult, error) {
var wg sync.WaitGroup
embeddingObjects := make([]embeddingObjectResult, len(dbEmbeddingRows))
downloader := s3manager.NewDownloaderWithClient(c.embeddingS3Client)
downloader := s3manager.NewDownloaderWithClient(c.derivedStorageS3Client)
for i, dbEmbeddingRow := range dbEmbeddingRows {
wg.Add(1)
@ -417,7 +417,7 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d
cancel()
return ente.EmbeddingObject{}, stacktrace.Propagate(ctx.Err(), "")
default:
obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.embeddingBucket)
obj, err := c.downloadObject(fetchCtx, objectKey, downloader, c.derivedStorageBucket)
cancel() // Ensure cancel is called to release resources
if err == nil {
if i > 0 {
@ -432,10 +432,10 @@ func (c *Controller) getEmbeddingObject(ctx context.Context, objectKey string, d
// check if the error is due to object not found
if s3Err, ok := err.(awserr.RequestFailure); ok {
if s3Err.Code() == s3.ErrCodeNoSuchKey {
if c.areEmbeddingAndHotBucketSame {
if c.areDerivedAndHotBucketSame {
ctxLogger.Error("Object not found: ", s3Err)
} else {
// If embedding and hot bucket are different, try to copy from hot bucket
// If derived and hot bucket are different, try to copy from hot bucket
copyEmbeddingObject, err := c.copyEmbeddingObject(ctx, objectKey)
if err == nil {
ctxLogger.Info("Got the object from hot bucket object")
@ -473,7 +473,7 @@ func (c *Controller) downloadObject(ctx context.Context, objectKey string, downl
// download the embedding object from hot bucket and upload to embeddings bucket
func (c *Controller) copyEmbeddingObject(ctx context.Context, objectKey string) (*ente.EmbeddingObject, error) {
if c.embeddingBucket == c.S3Config.GetHotBucket() {
if c.derivedStorageBucket == c.S3Config.GetHotBucket() {
return nil, stacktrace.Propagate(errors.New("embedding bucket and hot bucket are same"), "")
}
downloader := s3manager.NewDownloaderWithClient(c.S3Config.GetHotS3Client())

View file

@ -28,8 +28,8 @@ type S3Config struct {
hotDC string
// Secondary (hot) data center
secondaryHotDC string
// Bucket for storing ml embeddings & preview files
embeddingsDC string
//Derived data data center for derived files like ml embeddings & preview files
derivedStorageDC string
// A map from data centers to S3 configurations
s3Configs map[string]*aws.Config
// A map from data centers to pre-created S3 clients
@ -102,10 +102,10 @@ func (config *S3Config) initialize() {
config.secondaryHotDC = hs2
log.Infof("Hot storage: %s (secondary: %s)", hs1, hs2)
}
config.embeddingsDC = config.hotDC
embeddingsDC := viper.GetString("s3.embeddings-bucket")
config.derivedStorageDC = config.hotDC
embeddingsDC := viper.GetString("s3.derived-storage")
if embeddingsDC != "" && array.StringInList(embeddingsDC, dcs[:]) {
config.embeddingsDC = embeddingsDC
config.derivedStorageDC = embeddingsDC
log.Infof("Embeddings bucket: %s", embeddingsDC)
}
@ -180,15 +180,15 @@ func (config *S3Config) GetHotS3Client() *s3.S3 {
return &s3Client
}
func (config *S3Config) GetEmbeddingsDataCenter() string {
return config.embeddingsDC
func (config *S3Config) GetDerivedStorageDataCenter() string {
return config.derivedStorageDC
}
func (config *S3Config) GetEmbeddingsBucket() *string {
return config.GetBucket(config.embeddingsDC)
func (config *S3Config) GetDerivedStorageBucket() *string {
return config.GetBucket(config.derivedStorageDC)
}
func (config *S3Config) GetEmbeddingsS3Client() *s3.S3 {
s3Client := config.GetS3Client(config.embeddingsDC)
func (config *S3Config) GetDerivedStorageS3Client() *s3.S3 {
s3Client := config.GetS3Client(config.derivedStorageDC)
return &s3Client
}