diff --git a/cron.py b/cron.py index c509e372..47050890 100644 --- a/cron.py +++ b/cron.py @@ -6,8 +6,10 @@ from typing import List, Tuple import arrow import requests from sqlalchemy import func, desc, or_ +from sqlalchemy.ext.compiler import compiles from sqlalchemy.orm import joinedload from sqlalchemy.orm.exc import ObjectDeletedError +from sqlalchemy.sql import Insert from app import s3 from app.alias_utils import nb_email_log_for_mailbox @@ -543,13 +545,39 @@ nb_referred_user_upgrade: {stats_today.nb_referred_user_paid} - {increase_percen def migrate_domain_trash(): """Move aliases from global trash to domain trash if applicable""" - for deleted_alias in DeletedAlias.all(): + + # ignore duplicate when insert + # copied from https://github.com/sqlalchemy/sqlalchemy/issues/5374 + @compiles(Insert, "postgresql") + def postgresql_on_conflict_do_nothing(insert, compiler, **kw): + statement = compiler.visit_insert(insert, **kw) + # IF we have a "RETURNING" clause, we must insert before it + returning_position = statement.find("RETURNING") + if returning_position >= 0: + return ( + statement[:returning_position] + + "ON CONFLICT DO NOTHING " + + statement[returning_position:] + ) + else: + return statement + " ON CONFLICT DO NOTHING" + + sl_domains = [sl.domain for sl in SLDomain.all()] + count = 0 + domain_deleted_aliases = [] + deleted_alias_ids = [] + for deleted_alias in Session.query(DeletedAlias): + if count % 1000 == 0: + LOG.d("process %s", count) + + count += 1 + alias_domain = get_email_domain_part(deleted_alias.email) - if not SLDomain.get_by(domain=alias_domain): + if alias_domain not in sl_domains: custom_domain = CustomDomain.get_by(domain=alias_domain) if custom_domain: LOG.w("move %s to domain %s trash", deleted_alias, custom_domain) - Session.add( + domain_deleted_aliases.append( DomainDeletedAlias( user_id=custom_domain.user_id, email=deleted_alias.email, @@ -557,9 +585,17 @@ def migrate_domain_trash(): created_at=deleted_alias.created_at, ) ) - DeletedAlias.delete(deleted_alias.id) + deleted_alias_ids.append(deleted_alias.id) - Session.commit() + LOG.d("create %s DomainDeletedAlias", len(domain_deleted_aliases)) + Session.bulk_save_objects(domain_deleted_aliases) + + LOG.d("delete %s DeletedAlias", len(deleted_alias_ids)) + DeletedAlias.filter(DeletedAlias.id.in_(deleted_alias_ids)).delete( + synchronize_session=False + ) + + Session.commit() def set_custom_domain_for_alias():