import re2 as re
from threading import Thread

from flask import render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from flask_wtf import FlaskForm
from wtforms import StringField, validators, IntegerField

from app.config import EMAIL_SERVERS_WITH_PRIORITY, EMAIL_DOMAIN
from app.dashboard.base import dashboard_bp
from app.dns_utils import (
    get_mx_domains,
    get_spf_domain,
    get_txt_record,
    get_cname_record,
)
from app.email_utils import send_email
from app.extensions import db
from app.log import LOG
from app.models import (
    CustomDomain,
    Alias,
    DomainDeletedAlias,
    Mailbox,
    DomainMailbox,
    AutoCreateRule,
    AutoCreateRuleMailbox,
)
from app.utils import random_string


def _get_verified_mailboxes(mailbox_ids):
    """Resolve submitted mailbox ids to the current user's verified mailboxes.

    Returns the list of ``Mailbox`` objects (possibly empty when no id was
    submitted), or ``None`` as soon as one id is unknown, belongs to another
    user, or points to an unverified mailbox (i.e. the form was tampered with).
    """
    mailboxes = []
    for mailbox_id in mailbox_ids:
        mailbox = Mailbox.get(mailbox_id)
        if not mailbox or mailbox.user_id != current_user.id or not mailbox.verified:
            return None
        mailboxes.append(mailbox)
    return mailboxes


# NOTE(review): the ``<int:custom_domain_id>`` converter is assumed here and
# in the routes below; delete_domain() annotates the id as ``int``.
@dashboard_bp.route("/domains/<int:custom_domain_id>/dns", methods=["GET", "POST"])
@login_required
def domain_detail_dns(custom_domain_id):
    """Show the DNS setup page of a custom domain and run the DNS checks.

    On POST, the ``form-name`` field selects which record is checked
    (ownership TXT, MX, SPF, DKIM CNAME or DMARC TXT). A successful check
    stores the matching ``*_verified`` flag and redirects back to this page;
    a failed check re-renders the page with the records that were found.
    """
    custom_domain: CustomDomain = CustomDomain.get(custom_domain_id)
    if not custom_domain or custom_domain.user_id != current_user.id:
        flash("You cannot see this page", "warning")
        return redirect(url_for("dashboard.index"))

    # generate a domain ownership txt token if needed
    if not custom_domain.ownership_verified and not custom_domain.ownership_txt_token:
        custom_domain.ownership_txt_token = random_string(30)
        db.session.commit()

    spf_record = f"v=spf1 include:{EMAIL_DOMAIN} ~all"

    # hardcode the DKIM selector here
    dkim_cname = f"dkim._domainkey.{EMAIL_DOMAIN}"

    dmarc_record = "v=DMARC1; p=quarantine; pct=100; adkim=s; aspf=s"

    # These names are read by the template through **locals(); keep them.
    mx_ok = spf_ok = dkim_ok = dmarc_ok = ownership_ok = True
    # One list per check so the names never alias the same object.
    mx_errors, spf_errors, dkim_errors, dmarc_errors, ownership_errors = (
        [],
        [],
        [],
        [],
        [],
    )

    if request.method == "POST":
        if request.form.get("form-name") == "check-ownership":
            txt_records = get_txt_record(custom_domain.domain)

            if custom_domain.get_ownership_dns_txt_value() in txt_records:
                flash(
                    "Domain ownership is verified. Please proceed to the other records setup",
                    "success",
                )
                custom_domain.ownership_verified = True
                db.session.commit()
                return redirect(
                    url_for(
                        "dashboard.domain_detail_dns",
                        custom_domain_id=custom_domain.id,
                        _anchor="dns-setup",
                    )
                )
            else:
                flash("We can't find the needed TXT record", "error")
                ownership_ok = False
                ownership_errors = txt_records

        elif request.form.get("form-name") == "check-mx":
            mx_domains = get_mx_domains(custom_domain.domain)

            if sorted(mx_domains) != sorted(EMAIL_SERVERS_WITH_PRIORITY):
                flash("The MX record is not correctly set", "warning")

                mx_ok = False
                # build mx_errors to show to user
                mx_errors = [
                    f"{priority} {domain}" for (priority, domain) in mx_domains
                ]
            else:
                flash(
                    "Your domain can start receiving emails. You can now use it to create alias",
                    "success",
                )
                custom_domain.verified = True
                db.session.commit()
                return redirect(
                    url_for(
                        "dashboard.domain_detail_dns",
                        custom_domain_id=custom_domain.id,
                    )
                )

        elif request.form.get("form-name") == "check-spf":
            spf_domains = get_spf_domain(custom_domain.domain)
            if EMAIL_DOMAIN in spf_domains:
                custom_domain.spf_verified = True
                db.session.commit()
                flash("SPF is setup correctly", "success")
                return redirect(
                    url_for(
                        "dashboard.domain_detail_dns",
                        custom_domain_id=custom_domain.id,
                    )
                )
            else:
                custom_domain.spf_verified = False
                db.session.commit()
                flash(
                    f"SPF: {EMAIL_DOMAIN} is not included in your SPF record.",
                    "warning",
                )
                spf_ok = False
                spf_errors = get_txt_record(custom_domain.domain)

        elif request.form.get("form-name") == "check-dkim":
            dkim_record = get_cname_record("dkim._domainkey." + custom_domain.domain)
            if dkim_record == dkim_cname:
                flash("DKIM is setup correctly.", "success")
                custom_domain.dkim_verified = True
                db.session.commit()

                return redirect(
                    url_for(
                        "dashboard.domain_detail_dns",
                        custom_domain_id=custom_domain.id,
                    )
                )
            else:
                custom_domain.dkim_verified = False
                db.session.commit()
                flash("DKIM: the CNAME record is not correctly set", "warning")
                dkim_ok = False
                dkim_errors = [dkim_record or "[Empty]"]

        elif request.form.get("form-name") == "check-dmarc":
            txt_records = get_txt_record("_dmarc." + custom_domain.domain)
            if dmarc_record in txt_records:
                custom_domain.dmarc_verified = True
                db.session.commit()
                flash("DMARC is setup correctly", "success")
                return redirect(
                    url_for(
                        "dashboard.domain_detail_dns",
                        custom_domain_id=custom_domain.id,
                    )
                )
            else:
                custom_domain.dmarc_verified = False
                db.session.commit()
                flash(
                    "DMARC: The TXT record is not correctly set",
                    "warning",
                )
                dmarc_ok = False
                dmarc_errors = txt_records

    return render_template(
        "dashboard/domain_detail/dns.html",
        EMAIL_SERVERS_WITH_PRIORITY=EMAIL_SERVERS_WITH_PRIORITY,
        **locals(),
    )


@dashboard_bp.route("/domains/<int:custom_domain_id>/info", methods=["GET", "POST"])
@login_required
def domain_detail(custom_domain_id):
    """Show and update the general settings of a custom domain.

    On POST, the ``form-name`` field selects the action: toggle catch-all,
    set or remove the default alias name, toggle random prefix generation,
    replace the mailboxes linked to the domain, or schedule the domain
    deletion in a background thread. Every handled action ends with a
    redirect.
    """
    custom_domain: CustomDomain = CustomDomain.get(custom_domain_id)
    mailboxes = current_user.mailboxes()

    if not custom_domain or custom_domain.user_id != current_user.id:
        flash("You cannot see this page", "warning")
        return redirect(url_for("dashboard.index"))

    if request.method == "POST":
        if request.form.get("form-name") == "switch-catch-all":
            custom_domain.catch_all = not custom_domain.catch_all
            db.session.commit()

            if custom_domain.catch_all:
                flash(
                    f"The catch-all has been enabled for {custom_domain.domain}",
                    "success",
                )
            else:
                flash(
                    f"The catch-all has been disabled for {custom_domain.domain}",
                    "warning",
                )
            return redirect(
                url_for("dashboard.domain_detail", custom_domain_id=custom_domain.id)
            )

        elif request.form.get("form-name") == "set-name":
            # NOTE(review): ``custom_domain.name`` is assumed to be the column
            # holding the default alias name - confirm against the model.
            if request.form.get("action") == "save":
                # A missing field is treated as an empty name instead of
                # raising AttributeError on ``None.replace``.
                custom_domain.name = (request.form.get("alias-name") or "").replace(
                    "\n", ""
                )
                db.session.commit()
                flash(
                    f"Default alias name for Domain {custom_domain.domain} has been set",
                    "success",
                )
            else:
                custom_domain.name = None
                db.session.commit()
                flash(
                    f"Default alias name for Domain {custom_domain.domain} has been removed",
                    "info",
                )

            return redirect(
                url_for("dashboard.domain_detail", custom_domain_id=custom_domain.id)
            )

        elif request.form.get("form-name") == "switch-random-prefix-generation":
            custom_domain.random_prefix_generation = (
                not custom_domain.random_prefix_generation
            )
            db.session.commit()

            if custom_domain.random_prefix_generation:
                flash(
                    f"Random prefix generation has been enabled for {custom_domain.domain}",
                    "success",
                )
            else:
                flash(
                    f"Random prefix generation has been disabled for {custom_domain.domain}",
                    "warning",
                )
            return redirect(
                url_for("dashboard.domain_detail", custom_domain_id=custom_domain.id)
            )

        elif request.form.get("form-name") == "update":
            mailbox_ids = request.form.getlist("mailbox_ids")
            # check if mailbox is not tampered with
            mailboxes = _get_verified_mailboxes(mailbox_ids)
            if mailboxes is None:
                flash("Something went wrong, please retry", "warning")
                return redirect(
                    url_for(
                        "dashboard.domain_detail", custom_domain_id=custom_domain.id
                    )
                )

            if not mailboxes:
                flash("You must select at least 1 mailbox", "warning")
                return redirect(
                    url_for(
                        "dashboard.domain_detail", custom_domain_id=custom_domain.id
                    )
                )

            # first remove all existing domain-mailboxes links
            # NOTE(review): ``domain_id`` / ``mailbox_id`` are the assumed
            # DomainMailbox column names - confirm against the model.
            DomainMailbox.query.filter_by(domain_id=custom_domain.id).delete()
            db.session.flush()

            for mailbox in mailboxes:
                DomainMailbox.create(domain_id=custom_domain.id, mailbox_id=mailbox.id)

            db.session.commit()
            flash(f"{custom_domain.domain} mailboxes has been updated", "success")

            return redirect(
                url_for("dashboard.domain_detail", custom_domain_id=custom_domain.id)
            )

        elif request.form.get("form-name") == "delete":
            name = custom_domain.domain
            LOG.d("Schedule deleting %s", custom_domain)
            # Deletion runs in a separate thread; the user is notified by
            # email from delete_domain() once it is done.
            Thread(target=delete_domain, args=(custom_domain_id,)).start()
            flash(
                f"{name} scheduled for deletion. "
                f"You will receive a confirmation email when the deletion is finished",
                "success",
            )

            return redirect(url_for("dashboard.custom_domain"))

    # NOTE(review): ``Alias.custom_domain_id`` is the assumed filter column.
    nb_alias = Alias.filter_by(custom_domain_id=custom_domain.id).count()

    return render_template("dashboard/domain_detail/info.html", **locals())


def delete_domain(custom_domain_id: int):
    """Delete a custom domain and email its owner a confirmation.

    Meant to be run outside of a request (it is started in a ``Thread`` by
    ``domain_detail``), hence it creates its own application context. Does
    nothing when the domain no longer exists.
    """
    # Imported here rather than at module level, as in the original code.
    from server import create_light_app

    with create_light_app().app_context():
        custom_domain = CustomDomain.get(custom_domain_id)
        if not custom_domain:
            return

        # Read what the notification needs before the row is deleted.
        domain_name = custom_domain.domain
        user = custom_domain.user

        CustomDomain.delete(custom_domain.id)
        db.session.commit()

        LOG.d("Domain %s deleted", domain_name)

        # NOTE(review): the recipient is assumed to be ``user.email``.
        send_email(
            user.email,
            f"Your domain {domain_name} has been deleted",
            f"""Domain {domain_name} along with its aliases are deleted successfully.

Regards,
SimpleLogin team.
""",
        )


@dashboard_bp.route("/domains/<int:custom_domain_id>/trash", methods=["GET", "POST"])
@login_required
def domain_detail_trash(custom_domain_id):
    """List the deleted aliases of a custom domain and let the user purge them.

    On POST, ``empty-all`` removes every deleted-alias record of the domain
    and ``remove-single`` removes the one given by ``deleted-alias-id``, so
    that the corresponding aliases can be created again.
    """
    custom_domain = CustomDomain.get(custom_domain_id)
    if not custom_domain or custom_domain.user_id != current_user.id:
        flash("You cannot see this page", "warning")
        return redirect(url_for("dashboard.index"))

    if request.method == "POST":
        if request.form.get("form-name") == "empty-all":
            DomainDeletedAlias.filter_by(domain_id=custom_domain.id).delete()
            db.session.commit()

            flash("All deleted aliases can now be re-created", "success")
            return redirect(
                url_for(
                    "dashboard.domain_detail_trash", custom_domain_id=custom_domain.id
                )
            )
        elif request.form.get("form-name") == "remove-single":
            deleted_alias_id = request.form.get("deleted-alias-id")
            deleted_alias = DomainDeletedAlias.get(deleted_alias_id)
            # Reject ids that do not belong to this domain.
            if not deleted_alias or deleted_alias.domain_id != custom_domain.id:
                flash("Unknown error, refresh the page", "warning")
                return redirect(
                    url_for(
                        "dashboard.domain_detail_trash",
                        custom_domain_id=custom_domain.id,
                    )
                )

            DomainDeletedAlias.delete(deleted_alias.id)
            db.session.commit()
            # NOTE(review): ``deleted_alias.email`` is the assumed display
            # attribute - confirm against the model.
            flash(
                f"{deleted_alias.email} can now be re-created",
                "success",
            )

            return redirect(
                url_for(
                    "dashboard.domain_detail_trash", custom_domain_id=custom_domain.id
                )
            )

    domain_deleted_aliases = DomainDeletedAlias.filter_by(
        domain_id=custom_domain.id
    ).all()

    return render_template(
        "dashboard/domain_detail/trash.html",
        domain_deleted_aliases=domain_deleted_aliases,
        custom_domain=custom_domain,
    )


class AutoCreateRuleForm(FlaskForm):
    """Form used to create a new auto-create rule (a regex and its order)."""

    regex = StringField(
        "regex", validators=[validators.DataRequired(), validators.Length(max=128)]
    )
    # InputRequired (not DataRequired): DataRequired rejects falsy data, which
    # would refuse order 0 although NumberRange(min=0) allows it.
    order = IntegerField(
        "order",
        validators=[validators.InputRequired(), validators.NumberRange(min=0, max=100)],
    )


class AutoCreateTestForm(FlaskForm):
    """Form used to test a local part against the domain's auto-create rules."""

    local = StringField(
        "local part", validators=[validators.DataRequired(), validators.Length(max=128)]
    )


@dashboard_bp.route(
    "/domains/<int:custom_domain_id>/auto-create", methods=["GET", "POST"]
)
@login_required
def domain_detail_auto_create(custom_domain_id):
    """Manage and test the auto-create rules of a custom domain.

    On POST, the ``form-name`` field selects the action: create a rule
    (unique order, valid regex, at least one verified mailbox), delete a
    rule, or test a local part against the existing rules. The test action
    re-renders the page with the result; the other actions redirect.
    """
    custom_domain: CustomDomain = CustomDomain.get(custom_domain_id)
    mailboxes = current_user.mailboxes()
    new_auto_create_rule_form = AutoCreateRuleForm()

    auto_create_test_form = AutoCreateTestForm()
    # These three names are read by the template through **locals().
    auto_create_test_local, auto_create_test_result, auto_create_test_passed = (
        "",
        "",
        False,
    )

    if not custom_domain or custom_domain.user_id != current_user.id:
        flash("You cannot see this page", "warning")
        return redirect(url_for("dashboard.index"))

    if request.method == "POST":
        if request.form.get("form-name") == "create-auto-create-rule":
            if new_auto_create_rule_form.validate():
                # make sure order isn't used before
                for auto_create_rule in custom_domain.auto_create_rules:
                    if auto_create_rule.order == int(
                        new_auto_create_rule_form.order.data
                    ):
                        flash(
                            "Another rule with the same order already exists", "error"
                        )
                        break
                else:
                    mailbox_ids = request.form.getlist("mailbox_ids")
                    # check if mailbox is not tampered with
                    mailboxes = _get_verified_mailboxes(mailbox_ids)
                    if mailboxes is None:
                        flash("Something went wrong, please retry", "warning")
                        return redirect(
                            url_for(
                                "dashboard.domain_detail_auto_create",
                                custom_domain_id=custom_domain.id,
                            )
                        )

                    if not mailboxes:
                        flash("You must select at least 1 mailbox", "warning")
                        return redirect(
                            url_for(
                                "dashboard.domain_detail_auto_create",
                                custom_domain_id=custom_domain.id,
                            )
                        )

                    try:
                        re.compile(new_auto_create_rule_form.regex.data)
                    except Exception:
                        # Any compile failure means the pattern is unusable;
                        # ``Exception`` (not a bare except) so that
                        # KeyboardInterrupt/SystemExit still propagate.
                        flash(
                            f"Invalid regex {new_auto_create_rule_form.regex.data}",
                            "error",
                        )
                        return redirect(
                            url_for(
                                "dashboard.domain_detail_auto_create",
                                custom_domain_id=custom_domain.id,
                            )
                        )

                    rule = AutoCreateRule.create(
                        custom_domain_id=custom_domain.id,
                        order=int(new_auto_create_rule_form.order.data),
                        regex=new_auto_create_rule_form.regex.data,
                        flush=True,
                    )

                    # NOTE(review): ``auto_create_rule_id`` / ``mailbox_id``
                    # are the assumed AutoCreateRuleMailbox column names.
                    for mailbox in mailboxes:
                        AutoCreateRuleMailbox.create(
                            auto_create_rule_id=rule.id, mailbox_id=mailbox.id
                        )

                    db.session.commit()

                    flash("New auto create rule has been created", "success")

                    return redirect(
                        url_for(
                            "dashboard.domain_detail_auto_create",
                            custom_domain_id=custom_domain.id,
                        )
                    )
        elif request.form.get("form-name") == "delete-auto-create-rule":
            rule_id = request.form.get("rule-id")
            try:
                rule = AutoCreateRule.get(int(rule_id))
            except (TypeError, ValueError):
                # Missing or non-numeric id: handled like an unknown rule.
                rule = None

            if not rule or rule.custom_domain_id != custom_domain.id:
                flash("Something wrong, please retry", "error")
                return redirect(
                    url_for(
                        "dashboard.domain_detail_auto_create",
                        custom_domain_id=custom_domain.id,
                    )
                )

            rule_order = rule.order
            AutoCreateRule.delete(rule.id)
            db.session.commit()
            flash(f"Rule #{rule_order} has been deleted", "success")
            return redirect(
                url_for(
                    "dashboard.domain_detail_auto_create",
                    custom_domain_id=custom_domain.id,
                )
            )
        elif request.form.get("form-name") == "test-auto-create-rule":
            if auto_create_test_form.validate():
                local = auto_create_test_form.local.data
                auto_create_test_local = local

                for rule in custom_domain.auto_create_rules:
                    regex = re.compile(rule.regex)
                    if re.fullmatch(regex, local):
                        auto_create_test_result = (
                            f"{local}@{custom_domain.domain} passes rule #{rule.order}"
                        )
                        auto_create_test_passed = True
                        break
                else:  # no rule passes
                    auto_create_test_result = (
                        f"{local}@{custom_domain.domain} doesn't pass any rule"
                    )

                return render_template(
                    "dashboard/domain_detail/auto-create.html", **locals()
                )

    return render_template("dashboard/domain_detail/auto-create.html", **locals())