optimize migrate_domain_trash: bulk create and delete, keep track of progress

This commit is contained in:
Son 2022-01-06 18:30:46 +01:00
parent d1b9fb8bb5
commit 01cc9fe388

46
cron.py
View file

@ -6,8 +6,10 @@ from typing import List, Tuple
import arrow
import requests
from sqlalchemy import func, desc, or_
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import ObjectDeletedError
from sqlalchemy.sql import Insert
from app import s3
from app.alias_utils import nb_email_log_for_mailbox
@ -543,13 +545,39 @@ nb_referred_user_upgrade: {stats_today.nb_referred_user_paid} - {increase_percen
def migrate_domain_trash():
"""Move aliases from global trash to domain trash if applicable"""
for deleted_alias in DeletedAlias.all():
# ignore duplicate when insert
# copied from https://github.com/sqlalchemy/sqlalchemy/issues/5374
@compiles(Insert, "postgresql")
def postgresql_on_conflict_do_nothing(insert, compiler, **kw):
statement = compiler.visit_insert(insert, **kw)
# IF we have a "RETURNING" clause, we must insert before it
returning_position = statement.find("RETURNING")
if returning_position >= 0:
return (
statement[:returning_position]
+ "ON CONFLICT DO NOTHING "
+ statement[returning_position:]
)
else:
return statement + " ON CONFLICT DO NOTHING"
sl_domains = [sl.domain for sl in SLDomain.all()]
count = 0
domain_deleted_aliases = []
deleted_alias_ids = []
for deleted_alias in Session.query(DeletedAlias):
if count % 1000 == 0:
LOG.d("process %s", count)
count += 1
alias_domain = get_email_domain_part(deleted_alias.email)
if not SLDomain.get_by(domain=alias_domain):
if alias_domain not in sl_domains:
custom_domain = CustomDomain.get_by(domain=alias_domain)
if custom_domain:
LOG.w("move %s to domain %s trash", deleted_alias, custom_domain)
Session.add(
domain_deleted_aliases.append(
DomainDeletedAlias(
user_id=custom_domain.user_id,
email=deleted_alias.email,
@ -557,9 +585,17 @@ def migrate_domain_trash():
created_at=deleted_alias.created_at,
)
)
DeletedAlias.delete(deleted_alias.id)
deleted_alias_ids.append(deleted_alias.id)
Session.commit()
LOG.d("create %s DomainDeletedAlias", len(domain_deleted_aliases))
Session.bulk_save_objects(domain_deleted_aliases)
LOG.d("delete %s DeletedAlias", len(deleted_alias_ids))
DeletedAlias.filter(DeletedAlias.id.in_(deleted_alias_ids)).delete(
synchronize_session=False
)
Session.commit()
def set_custom_domain_for_alias():