import base64
import csv
import hashlib
import io
import mimetypes
import json
import os
import re
import secrets
import shutil
import subprocess
import sys
import tarfile
import tempfile
from contextlib import contextmanager
from dataclasses import dataclass
from ipaddress import ip_address, ip_network
from pathlib import Path, PurePosixPath
from time import time
from typing import Iterable
from urllib import error as urllib_error
from urllib import parse as urllib_parse
from urllib import request as urllib_request

from passlib.context import CryptContext
from pydantic import ValidationError

from . import models, schemas
from .db import SessionLocal
from .dns_providers import DNSProviderError, publish_dkim_record_for_domain, zone_relative_record_name
from .settings import get_settings
from .utils import hash_password, normalize_domain, remove_managed_dkim_key, sync_dkim_signing_maps

# Application settings, resolved once when this module is imported and shared
# by the helpers below.
settings = get_settings()

# CLI command name -> file name of the shell script implementing it.
# _script_path() joins the file name onto _scripts_dir().
SCRIPT_NAME_BY_COMMAND = {
    "api": "manage-api-credentials.sh",
    "backup": "manage-backups.sh",
    "bans": "manage-bans.sh",
    "firewall": "manage-firewall.sh",
    "limits": "manage-limits.sh",
    "queue": "manage-queue.sh",
    "settings": "manage-settings.sh",
    "ssl": "manage-ssl.sh",
}

# Limit option name -> environment variable that stores it.
# NOTE(review): the code consuming this map is outside the visible part of the file.
LIMIT_ENV_MAP = {
    "postfix-client-connection-rate-limit": "LIMRISTEM_MAIL_POSTFIX_CLIENT_CONNECTION_RATE_LIMIT",
    "postfix-client-message-rate-limit": "LIMRISTEM_MAIL_POSTFIX_CLIENT_MESSAGE_RATE_LIMIT",
    "postfix-rate-time-unit": "LIMRISTEM_MAIL_POSTFIX_RATE_TIME_UNIT",
    "api-auth-fail-limit": "LIMRISTEM_MAIL_API_AUTH_FAIL_LIMIT",
    "api-auth-window-seconds": "LIMRISTEM_MAIL_API_AUTH_WINDOW_SECONDS",
    "api-auth-block-seconds": "LIMRISTEM_MAIL_API_AUTH_BLOCK_SECONDS",
    "rspamd-action-greylist": "LIMRISTEM_MAIL_RSPAMD_ACTION_GREYLIST",
    "rspamd-action-add-header": "LIMRISTEM_MAIL_RSPAMD_ACTION_ADD_HEADER",
    "rspamd-action-reject": "LIMRISTEM_MAIL_RSPAMD_ACTION_REJECT",
    "rspamd-greylist-delay": "LIMRISTEM_MAIL_RSPAMD_GREYLIST_DELAY",
    "rspamd-greylist-expire": "LIMRISTEM_MAIL_RSPAMD_GREYLIST_EXPIRE",
}

# Default value (kept as a string) for every key of LIMIT_ENV_MAP; the two
# dictionaries must be kept in sync key-for-key.
LIMIT_DEFAULT_MAP = {
    "postfix-client-connection-rate-limit": "30",
    "postfix-client-message-rate-limit": "100",
    "postfix-rate-time-unit": "60s",
    "api-auth-fail-limit": "5",
    "api-auth-window-seconds": "300",
    "api-auth-block-seconds": "900",
    "rspamd-action-greylist": "4",
    "rspamd-action-add-header": "6",
    "rspamd-action-reject": "15",
    "rspamd-greylist-delay": "5m",
    "rspamd-greylist-expire": "35d",
}

# Backup option name -> environment variable that stores it.
# Several entries hold credentials (database root password, storage password,
# storage secret access key), so values read through this map are sensitive.
BACKUP_ENV_MAP = {
    "backup-enabled": "LIMRISTEM_MAIL_ENABLE_BACKUP_TIMER",
    "backup-type": "LIMRISTEM_MAIL_BACKUP_TYPE",
    "backup-full-weekday": "LIMRISTEM_MAIL_BACKUP_FULL_WEEKDAY",
    "backup-db-mode": "LIMRISTEM_MAIL_BACKUP_DB_MODE",
    "backup-db-root-password": "LIMRISTEM_MAIL_BACKUP_DB_ROOT_PASSWORD",
    "backup-local-dir": "LIMRISTEM_MAIL_BACKUP_LOCAL_DIR",
    "backup-retention-days": "LIMRISTEM_MAIL_BACKUP_RETENTION_DAYS",
    "backup-remote-targets": "LIMRISTEM_MAIL_BACKUP_REMOTE_TARGETS",
    "backup-oncalendar": "LIMRISTEM_MAIL_BACKUP_ONCALENDAR",
    "backup-compression": "LIMRISTEM_MAIL_BACKUP_COMPRESSION",
    "backup-include-redis": "LIMRISTEM_MAIL_BACKUP_INCLUDE_REDIS",
    "backup-storage-type": "LIMRISTEM_MAIL_BACKUP_STORAGE_TYPE",
    "backup-storage-remote-name": "LIMRISTEM_MAIL_BACKUP_STORAGE_REMOTE_NAME",
    "backup-storage-path": "LIMRISTEM_MAIL_BACKUP_STORAGE_PATH",
    "backup-storage-host": "LIMRISTEM_MAIL_BACKUP_STORAGE_HOST",
    "backup-storage-port": "LIMRISTEM_MAIL_BACKUP_STORAGE_PORT",
    "backup-storage-user": "LIMRISTEM_MAIL_BACKUP_STORAGE_USER",
    "backup-storage-password": "LIMRISTEM_MAIL_BACKUP_STORAGE_PASSWORD",
    "backup-storage-bucket": "LIMRISTEM_MAIL_BACKUP_STORAGE_BUCKET",
    "backup-storage-region": "LIMRISTEM_MAIL_BACKUP_STORAGE_REGION",
    "backup-storage-endpoint": "LIMRISTEM_MAIL_BACKUP_STORAGE_ENDPOINT",
    "backup-storage-access-key-id": "LIMRISTEM_MAIL_BACKUP_STORAGE_ACCESS_KEY_ID",
    "backup-storage-secret-access-key": "LIMRISTEM_MAIL_BACKUP_STORAGE_SECRET_ACCESS_KEY",
}

# Default value (kept as a string) for every key of BACKUP_ENV_MAP; an empty
# string means "no default". Keep both dictionaries in sync key-for-key.
BACKUP_DEFAULT_MAP = {
    "backup-enabled": "yes",
    "backup-type": "auto",
    "backup-full-weekday": "Sun",
    "backup-db-mode": "logical",
    "backup-db-root-password": "",
    "backup-local-dir": "/var/backups/limristem-mail",
    "backup-retention-days": "14",
    "backup-remote-targets": "",
    "backup-oncalendar": "*-*-* 03:15:00",
    "backup-compression": "gzip",
    "backup-include-redis": "yes",
    "backup-storage-type": "local",
    "backup-storage-remote-name": "limristem-mail-backup",
    "backup-storage-path": "",
    "backup-storage-host": "",
    "backup-storage-port": "",
    "backup-storage-user": "",
    "backup-storage-password": "",
    "backup-storage-bucket": "",
    "backup-storage-region": "",
    "backup-storage-endpoint": "",
    "backup-storage-access-key-id": "",
    "backup-storage-secret-access-key": "",
}

# Panel/server setting name -> environment variable that stores it.
SETTINGS_ENV_MAP = {
    "server-name": "LIMRISTEM_MAIL_SERVER_NAME",
    "panel-title": "LIMRISTEM_MAIL_PANEL_TITLE",
    "panel-favicon-path": "LIMRISTEM_MAIL_PANEL_FAVICON_PATH",
    "auto-updates-enabled": "LIMRISTEM_MAIL_ENABLE_AUTO_UPDATES",
}

# Default value (kept as a string) for every key of SETTINGS_ENV_MAP.
SETTINGS_DEFAULT_MAP = {
    "server-name": "Limristem eMail",
    "panel-title": "Limristem eMail Admin Panel",
    "panel-favicon-path": "",
    "auto-updates-enabled": "no",
}

# 30 minutes, in seconds.
PENDING_PRIVATE_KEY_TTL_SECONDS = 30 * 60
# 512 KiB.
PANEL_FAVICON_MAX_BYTES = 512 * 1024
# Size and count ceilings for update handling: 64 KiB response, 4 KiB checksum,
# 128 MiB archive, 512 MiB extracted payload, 10000 archive members.
# NOTE(review): the code enforcing these limits is outside the visible part of the file.
UPDATE_RESPONSE_MAX_BYTES = 64 * 1024
UPDATE_CHECKSUM_MAX_BYTES = 4096
UPDATE_ARCHIVE_MAX_BYTES = 128 * 1024 * 1024
UPDATE_EXTRACTED_MAX_BYTES = 512 * 1024 * 1024
UPDATE_ARCHIVE_MAX_MEMBERS = 10000
# Environment override for the version endpoint; a blank or whitespace-only
# override falls back to the built-in URL because of the trailing `or`.
UPDATE_CHECK_URL = os.getenv("LIMRISTEM_MAIL_UPDATE_CHECK_URL", "https://get.limristem.eu/mail/version.json").strip() or "https://get.limristem.eu/mail/version.json"
AUTO_UPDATE_TIMER_NAME = "limristem-mail-auto-update-check"
# Lowercase identifier: 1-64 chars, starts alphanumeric, then [a-z0-9_.-].
SAFE_RESOURCE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_.-]{0,63}$")
# Mixed-case identifier: 1-64 chars, starts alphanumeric, then [A-Za-z0-9_-] (no dots).
SAFE_REMOTE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]{0,63}$")
# A resource id, a dash, 24 lowercase hex characters and a ".pem" suffix.
SAFE_PRIVATE_KEY_TOKEN_RE = re.compile(r"^[a-z0-9][a-z0-9_.-]{0,63}-[a-f0-9]{24}\.pem$")
# File extension -> accepted leading byte sequences ("magic numbers").
# NOTE(review): b"RIFF" is only the generic RIFF container prefix; a WebP file
# additionally carries b"WEBP" at byte offset 8, which a prefix table like this
# one cannot express. Confirm the consumer checks it.
FAVICON_SIGNATURES = {
    ".ico": (b"\x00\x00\x01\x00",),
    ".png": (b"\x89PNG\r\n\x1a\n",),
    ".jpg": (b"\xff\xd8\xff",),
    ".jpeg": (b"\xff\xd8\xff",),
    ".gif": (b"GIF87a", b"GIF89a"),
    ".webp": (b"RIFF",),
}


class NoRedirectHandler(urllib_request.HTTPRedirectHandler):
    """Redirect handler that never follows an HTTP redirect."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Decline every redirect by returning ``None``.

        Per the ``urllib.request`` documentation, returning ``None`` means this
        handler will not build a follow-up request, so the 3xx response is not
        followed automatically.
        """
        return None


# Shared opener that uses NoRedirectHandler, so requests made through it do not
# follow redirects.
NO_REDIRECT_OPENER = urllib_request.build_opener(NoRedirectHandler())
# Every request sent through the opener carries a desktop-Chrome User-Agent string.
NO_REDIRECT_OPENER.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")]


# Firewall option name -> environment variable that stores it.
FIREWALL_ENV_MAP = {
    "firewall-enabled": "LIMRISTEM_MAIL_FIREWALL_ENABLED",
    "firewall-rules-json": "LIMRISTEM_MAIL_FIREWALL_RULES",
    "firewall-allowed-tcp-ports": "LIMRISTEM_MAIL_FIREWALL_ALLOWED_TCP_PORTS",
    "firewall-allowed-udp-ports": "LIMRISTEM_MAIL_FIREWALL_ALLOWED_UDP_PORTS",
}

# Default value (kept as a string) for every key of FIREWALL_ENV_MAP.
# Port lists are space-separated.
FIREWALL_DEFAULT_MAP = {
    "firewall-enabled": "yes",
    "firewall-allowed-tcp-ports": "22 25 80 110 143 443 465 587 993 995",
    "firewall-allowed-udp-ports": "",
    "firewall-rules-json": "",
}

# passlib context restricted to Argon2 (type "ID").
# NOTE(review): presumably used for API credential hashes; the call sites are
# outside the visible part of the file.
API_HASH_CONTEXT = CryptContext(schemes=["argon2"], default="argon2", argon2__type="ID")
# Logical service name -> tuple of candidate service names.
# NOTE(review): these look like alternative systemd unit names tried in order
# (e.g. "mariadb" before "mysql"); confirm against the consumer.
MONITORED_SERVICES = {
    "ssh": ("ssh", "sshd"),
    "mysql": ("mariadb", "mysql"),
    "redis": ("redis-server", "redis"),
    "dovecot": ("dovecot",),
    "postfix": ("postfix",),
    "rspamd": ("rspamd",),
    "nginx": ("nginx",),
    "api": ("limristem-mail",),
}


@dataclass
class BackendCommandError(Exception):
    """Error raised by backend command helpers.

    ``detail`` is the human-readable message (also returned by ``str()``);
    ``returncode`` is the exit status associated with the failure.
    """

    detail: str
    returncode: int = 1

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ never calls Exception.__init__, which
        # would leave ``args`` empty. An empty ``args`` breaks unpickling (the
        # class is re-created with no arguments), so seed it with the message.
        super().__init__(self.detail)

    def __str__(self) -> str:
        return self.detail


class FallbackToScript(Exception):
    """Marker exception carrying no extra state.

    NOTE(review): the name suggests it signals that a command should be handed
    to the shell-script implementation; the raise/catch sites are outside the
    visible part of the file, so confirm there.
    """


def _config_dir() -> Path:
    """Return the configuration directory.

    ``LIMRISTEM_MAIL_CONFIG_DIR`` is used when present in the environment;
    otherwise the directory is ``<settings.base_dir>/config``.
    """
    fallback = str(settings.base_dir / "config")
    return Path(os.getenv("LIMRISTEM_MAIL_CONFIG_DIR", fallback))


def _managed_config_dir() -> Path:
    """Return the directory used for managed state files.

    The configured directory (``LIMRISTEM_MAIL_MANAGED_CONFIG_DIR``, defaulting
    to ``_config_dir()``) is preferred when writable, then
    ``/etc/limristem-mail.d``. When neither is writable the configured
    directory is returned anyway.
    """
    # _is_writable() probes the parent of a non-directory path, so passing a
    # file name inside each candidate tests the candidate directory itself.
    probe_name = ".limristem-mail-managed-config-probe"
    configured = Path(os.getenv("LIMRISTEM_MAIL_MANAGED_CONFIG_DIR", str(_config_dir())))
    for candidate in (configured, Path("/etc/limristem-mail.d")):
        if _is_writable(candidate / probe_name):
            return candidate
    return configured


def _resolve_main_env_file() -> Path:
    """Return the path of the main env file.

    A non-empty ``LIMRISTEM_MAIL_ENV_FILE`` wins outright. Otherwise the first
    writable candidate of ``<config dir>/limristem-mail.env`` and
    ``/etc/limristem-mail.env`` is used, falling back to the former.
    """
    override = os.getenv("LIMRISTEM_MAIL_ENV_FILE")
    if override:
        return Path(override)
    default_file = _config_dir() / "limristem-mail.env"
    for candidate in (default_file, Path("/etc/limristem-mail.env")):
        if _is_writable(candidate):
            return candidate
    return default_file


def _resolve_backup_env_file() -> Path:
    """Return the path of the backup env file.

    A non-empty ``LIMRISTEM_MAIL_BACKUP_ENV_FILE`` wins outright. When the main
    env file lives in ``/etc`` and the ``/etc`` backup file is writable, that
    file is used so both stay together. Otherwise the first writable candidate
    of the config-dir file and the ``/etc`` file is used, falling back to the
    config-dir file.
    """
    override = os.getenv("LIMRISTEM_MAIL_BACKUP_ENV_FILE")
    if override:
        return Path(override)
    default_file = _config_dir() / "limristem-mail-backup.env"
    etc_file = Path("/etc/limristem-mail-backup.env")
    main_env_in_etc = _resolve_main_env_file() == Path("/etc/limristem-mail.env")
    if main_env_in_etc and _is_writable(etc_file):
        return etc_file
    for candidate in (default_file, etc_file):
        if _is_writable(candidate):
            return candidate
    return default_file


def _resolve_rclone_config_file() -> Path:
    """Return the rclone config path, co-located with the main env file.

    ``/etc/limristem-mail-rclone.conf`` is used when the main env file is
    ``/etc/limristem-mail.env``; otherwise ``<config dir>/rclone-backup.conf``.
    """
    main_env_in_etc = _resolve_main_env_file() == Path("/etc/limristem-mail.env")
    return Path("/etc/limristem-mail-rclone.conf") if main_env_in_etc else _config_dir() / "rclone-backup.conf"


def _ssl_default_paths(env: dict[str, str]) -> tuple[str, str, str]:
    hostname = (env.get("LIMRISTEM_MAIL_HOSTNAME") or settings.hostname).strip().rstrip(".").lower()
    mode = (env.get("LIMRISTEM_MAIL_SSL_MODE") or settings.ssl_mode or "selfsigned").strip().lower()
    if mode == "manual":
        cert_path = env.get("LIMRISTEM_MAIL_TLS_CERT_PATH", "/etc/ssl/limristem-mail/manual.crt")
        key_path = env.get("LIMRISTEM_MAIL_TLS_KEY_PATH", "/etc/ssl/limristem-mail/manual.key")
    elif mode == "letsencrypt":
        cert_path = env.get("LIMRISTEM_MAIL_TLS_CERT_PATH", f"/etc/letsencrypt/live/{hostname}/fullchain.pem")
        key_path = env.get("LIMRISTEM_MAIL_TLS_KEY_PATH", f"/etc/letsencrypt/live/{hostname}/privkey.pem")
    elif mode == "plain":
        cert_path = env.get("LIMRISTEM_MAIL_TLS_CERT_PATH", "")
        key_path = env.get("LIMRISTEM_MAIL_TLS_KEY_PATH", "")
    else:
        cert_path = env.get("LIMRISTEM_MAIL_TLS_CERT_PATH", "/etc/ssl/limristem-mail/limristem-mail.crt")
        key_path = env.get("LIMRISTEM_MAIL_TLS_KEY_PATH", "/etc/ssl/limristem-mail/limristem-mail.key")
    return mode, cert_path, key_path


def _backup_state_paths() -> dict[str, Path]:
    """Return the backup state locations inside the managed config directory.

    Keys: ``state_dir``, ``schedules``, ``storages``, ``keys_dir`` and
    ``pending_keys_dir``.
    """
    root = _managed_config_dir()
    paths: dict[str, Path] = {"state_dir": root}
    for key, leaf in (
        ("schedules", "backup-schedules.json"),
        ("storages", "backup-storages.json"),
        ("keys_dir", "backup-storage-keys"),
        ("pending_keys_dir", "backup-storage-private"),
    ):
        paths[key] = root / leaf
    return paths


def _settings_state_paths() -> dict[str, Path]:
    """Return the settings state locations inside the managed config directory.

    Keys: ``state_dir``, ``update_state`` and ``favicon_dir``.
    """
    root = _managed_config_dir()
    paths: dict[str, Path] = {"state_dir": root}
    paths["update_state"] = root / "auto-update-state.json"
    paths["favicon_dir"] = root / "panel-assets"
    return paths


def _manual_ban_file() -> Path:
    """Return the path of the manual-ban TSV file in the managed config directory."""
    state_dir = _managed_config_dir()
    return state_dir / "fail2ban-manual-bans.tsv"


def _scripts_dir() -> Path:
    """Return the directory holding the management scripts (``settings.runtime_bin_dir``)."""
    return settings.runtime_bin_dir


def _script_path(command_name: str) -> Path:
    """Return the full path of the script implementing ``command_name``.

    Raises:
        KeyError: ``command_name`` is not a key of ``SCRIPT_NAME_BY_COMMAND``.
    """
    bin_dir = _scripts_dir()
    script_name = SCRIPT_NAME_BY_COMMAND[command_name]
    return bin_dir / script_name


def _decode_env_value(value: str) -> str:
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        inner = value[1:-1]
        sentinel_cr = "__LIMRISTEM_MAIL_ESCAPED_CR__"
        sentinel_nl = "__LIMRISTEM_MAIL_ESCAPED_NL__"
        inner = inner.replace(r"\`", "`")
        inner = inner.replace(r"\$", "$")
        inner = inner.replace(r'\"', '"')
        inner = inner.replace(r"\r", sentinel_cr)
        inner = inner.replace(r"\n", sentinel_nl)
        inner = inner.replace(r"\\", "\\")
        inner = inner.replace(sentinel_cr, "\r")
        inner = inner.replace(sentinel_nl, "\n")
        return inner
    if len(value) >= 2 and value[0] == "'" and value[-1] == "'":
        return value[1:-1]
    return value


def _quote_env_value(value: str) -> str:
    rendered = value.replace("\n", "__LIMRISTEM_MAIL_ESCAPED_NL__").replace("\r", "__LIMRISTEM_MAIL_ESCAPED_CR__")
    rendered = rendered.replace("\\", r"\\")
    rendered = rendered.replace('"', r'\"')
    rendered = rendered.replace("$", r"\$")
    rendered = rendered.replace("`", r"\`")
    rendered = rendered.replace("__LIMRISTEM_MAIL_ESCAPED_NL__", r"\n")
    rendered = rendered.replace("__LIMRISTEM_MAIL_ESCAPED_CR__", r"\r")
    return f'"{rendered}"'


def _strip_matching_quotes(value: str) -> str:
    normalized = value.strip()
    if len(normalized) >= 2 and normalized[0] == normalized[-1] and normalized[0] in {'"', "'"}:
        return normalized[1:-1].strip()
    return normalized


def _load_env_file(path: Path) -> dict[str, str]:
    """Parse a ``KEY=value`` env file into a dictionary.

    A missing file yields an empty dictionary. Empty lines, comment lines
    (first non-blank character ``#``) and lines without ``=`` are ignored.
    Keys are stripped, values are stripped and then passed through
    ``_decode_env_value``. A later assignment overrides an earlier one.
    """
    if not path.exists():
        return {}
    parsed: dict[str, str] = {}
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        if not raw_line or raw_line.lstrip().startswith("#"):
            continue
        name, separator, raw_value = raw_line.partition("=")
        if not separator:
            continue
        parsed[name.strip()] = _decode_env_value(raw_value.strip())
    return parsed


def _load_merged_env(*paths: Path) -> dict[str, str]:
    """Load several env files and merge them; later files override earlier ones."""
    merged: dict[str, str] = {}
    for env_path in paths:
        merged |= _load_env_file(env_path)
    return merged


def _ensure_parent(path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)


def _upsert_env_value(path: Path, key: str, value: str) -> None:
    """Set ``key`` to ``value`` in the env file at ``path``.

    Every existing line starting with ``KEY=`` is replaced in place; if there
    is none, the assignment is appended. All other lines are kept as they are.
    The file is rewritten with a single trailing newline and its mode is set
    to ``0o660``.
    """
    _ensure_parent(path)
    prefix = f"{key}="
    assignment = f"{prefix}{_quote_env_value(value)}"
    existing = path.read_text(encoding="utf-8").splitlines() if path.exists() else []
    rendered = [assignment if line.startswith(prefix) else line for line in existing]
    if not any(line.startswith(prefix) for line in existing):
        rendered.append(assignment)
    path.write_text("\n".join(rendered).rstrip("\n") + "\n", encoding="utf-8")
    path.chmod(0o660)


def _is_writable(path: Path) -> bool:
    target = path if path.is_dir() else path.parent
    try:
        target.mkdir(parents=True, exist_ok=True)
    except Exception:
        return False
    probe = target / f".limristem-mail-write-test-{secrets.token_hex(4)}"
    try:
        probe.write_text("", encoding="utf-8")
        return True
    except Exception:
        return False
    finally:
        try:
            probe.unlink(missing_ok=True)
        except Exception:
            pass


def _run_external(command: list[str], *, input_text: str | None = None, check: bool = True) -> subprocess.CompletedProcess[str]:
    try:
        return subprocess.run(
            command,
            input=input_text,
            capture_output=True,
            text=True,
            check=check,
        )
    except FileNotFoundError as exc:
        raise BackendCommandError(f"Required command not found: {command[0]}") from exc
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or exc.stdout or "").strip() or f"{' '.join(command)} failed with status {exc.returncode}"
        raise BackendCommandError(detail, exc.returncode) from exc


def _run_external_best_effort(command: list[str], *, input_text: str | None = None) -> None:
    try:
        subprocess.run(command, input=input_text, capture_output=True, text=True, check=False)
    except FileNotFoundError:
        return


def _run_script_fallback(command_name: str, *args: str) -> str:
    """Run the shell script registered for ``command_name`` and return its stdout.

    Args:
        command_name: Key of ``SCRIPT_NAME_BY_COMMAND``.
        *args: Arguments passed to the script unchanged.

    Raises:
        BackendCommandError: The command has no registered script, the script
            file is missing, or the script exits with a non-zero status.
    """
    try:
        script_path = _script_path(command_name)
    except KeyError as exc:
        # An unregistered command must produce the same error as a missing
        # script file, not a bare KeyError from the lookup table.
        raise BackendCommandError(f"Unknown limristem-mail command: {command_name}") from exc
    if not script_path.exists():
        raise BackendCommandError(f"Unknown limristem-mail command: {command_name}")
    result = _run_external([str(script_path), *args])
    return result.stdout


def _json_default(value):
    if hasattr(value, "isoformat"):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


@contextmanager
def _db_session(operation: str = "Database operation"):
    """Yield a database session that is always closed on exit.

    ``BackendCommandError`` raised inside the block passes through unchanged.
    Any other exception is re-raised as ``BackendCommandError`` whose message
    is prefixed with ``operation``.
    """
    session = SessionLocal()
    try:
        yield session
    except BackendCommandError:
        raise
    except Exception as exc:
        raise BackendCommandError(f"{operation} failed: {exc}") from exc
    finally:
        session.close()


def _parse_options(tokens: list[str]) -> dict[str, str | bool]:
    options: dict[str, str | bool] = {}
    index = 0
    while index < len(tokens):
        token = tokens[index]
        if not token.startswith("--"):
            raise BackendCommandError(f"Unexpected argument: {token}")
        key = token[2:]
        if key in {"json", "zone", "generate-dkim", "inactive", "enable-sync", "disable-sync", "clear-api-token"}:
            options[key] = True
            index += 1
            continue
        if index + 1 >= len(tokens):
            raise BackendCommandError(f"Missing value for option: {token}")
        options[key] = tokens[index + 1]
        index += 2
    return options


def _split_positionals_and_options(args: tuple[str, ...]) -> tuple[list[str], dict[str, str | bool]]:
    """Split CLI arguments into leading positionals and parsed options.

    Everything before the first token starting with ``--`` is positional; the
    rest is handed to ``_parse_options``.
    """
    tokens = list(args)
    split_at = len(tokens)
    for position, token in enumerate(tokens):
        if token.startswith("--"):
            split_at = position
            break
    positionals = tokens[:split_at]
    return positionals, _parse_options(tokens[split_at:])


def _serialize_domain(domain: models.Domain) -> dict[str, object]:
    """Return the domain's exported attributes as a plain dictionary.

    Values are copied as they are; date/time values are left for the JSON
    encoder to convert.
    """
    exported_fields = (
        "id",
        "name",
        "is_active",
        "max_users",
        "dkim_selector",
        "dkim_public_key",
        "dkim_private_path",
        "dns_provider",
        "dns_account_id",
        "dns_zone_id",
        "dns_sync_enabled",
        "dns_last_sync_at",
        "dns_last_sync_status",
        "dns_has_api_token",
        "dmarc_policy",
        "created_at",
    )
    return {field: getattr(domain, field) for field in exported_fields}


def _serialize_account(account: models.Account) -> dict[str, object]:
    """Return the account's exported attributes as a plain dictionary.

    The e-mail address is built from ``local_part`` and the related domain's
    name. When no domain is attached, the part of ``username`` after the first
    ``@`` is used instead.
    """
    if account.domain:
        domain_name = account.domain.name
    else:
        domain_name = account.username.split("@", 1)[-1]
    serialized: dict[str, object] = {"id": account.id}
    serialized["email"] = f"{account.local_part}@{domain_name}"
    serialized["domain_id"] = account.domain_id
    serialized["quota_mb"] = account.quota_mb
    serialized["is_active"] = account.is_active
    serialized["created_at"] = account.created_at
    return serialized


def _render_json(payload) -> str:
    """Serialize ``payload`` to JSON, using ``_json_default`` for unsupported types."""
    encoded = json.dumps(payload, default=_json_default)
    return encoded


def _dns_payload_for_domain(
    domain_name: str,
    *,
    dmarc_policy: str = "reject",
    dkim_selector: str = "default",
    dkim_public_key: str | None = None,
) -> dict[str, str]:
    """Build the recommended DNS records for ``domain_name``.

    Args:
        domain_name: Domain the records are generated for.
        dmarc_policy: Value of the DMARC ``p=`` tag.
        dkim_selector: Selector used in the DKIM record name.
        dkim_public_key: DKIM TXT content; a placeholder text is used when
            empty or ``None``.

    Returns:
        Ordered mapping of record key to either a ``NAME IN TYPE CONTENT`` line
        or, for ``arc``, ``ptr`` and ``mta_sts_policy``, an informational note.
        SRS, MTA-STS and TLS-RPT entries are added only for the primary domain
        and only when the matching feature is enabled in settings.
    """
    mx_host = settings.hostname
    mail_alias = f"mail.{domain_name}"
    spf = f"v=spf1 mx a:{mx_host} -all"
    dmarc = f"v=DMARC1; p={dmarc_policy}; adkim=s; aspf=s; pct=100; rua=mailto:postmaster@{domain_name}"
    dkim = dkim_public_key or "Generate a DKIM key to populate this record"

    payload: dict[str, str] = {"mx": f"{domain_name} IN MX 10 {mx_host}."}
    # A CNAME pointing at itself would be pointless when mail.<domain> is the MX host.
    if mail_alias != mx_host:
        payload["mail"] = f"{mail_alias} IN CNAME {mx_host}."
    payload["spf"] = f'{domain_name} IN TXT "{spf}"'
    payload["dmarc"] = f'_dmarc.{domain_name} IN TXT "{dmarc}"'
    payload["dkim"] = f'{dkim_selector}._domainkey.{domain_name} IN TXT "{dkim}"'

    # Service discovery SRV records; the record key doubles as the service label.
    for service, port in (
        ("jmap", 443),
        ("imap", 143),
        ("imaps", 993),
        ("pop3", 110),
        ("pop3s", 995),
        ("submission", 587),
        ("submissions", 465),
    ):
        payload[service] = f"_{service}._tcp.{domain_name} IN SRV 0 1 {port} {mx_host}."
    for label in ("autoconfig", "autodiscover"):
        payload[label] = f"{label}.{domain_name} IN CNAME {mx_host}."
    payload["arc"] = "ARC enabled via Rspamd (no DNS record needed). Keep DKIM signing active for forwarded mail."
    payload["ptr"] = f"Set the reverse DNS of the public mail IP to {mx_host}."

    if settings.enable_srs and settings.srs_domain and settings.srs_domain != domain_name:
        if domain_name == settings.primary_domain:
            payload["srs_spf"] = f'{settings.srs_domain} IN TXT "{spf}"'
    if settings.enable_mta_sts and domain_name == settings.primary_domain:
        if settings.mta_sts_host != mx_host:
            payload["mta_sts_host"] = f"{settings.mta_sts_host} IN CNAME {mx_host}."
        payload["mta_sts"] = f'_mta-sts.{domain_name} IN TXT "v=STSv1; id={settings.mta_sts_id}"'
        payload["mta_sts_policy"] = f"https://{settings.mta_sts_host}/.well-known/mta-sts.txt"
    if settings.enable_tls_rpt and domain_name == settings.primary_domain:
        payload["tls_rpt"] = f'_smtp._tls.{domain_name} IN TXT "v=TLSRPTv1; rua=mailto:{settings.tls_rpt_mailbox}"'
    return payload


def _dns_export_rows(dns_records: dict[str, str]) -> list[dict[str, object]]:
    rows: list[dict[str, object]] = []
    for key in (
        "mx",
        "mail",
        "spf",
        "srs_spf",
        "dmarc",
        "dkim",
        "jmap",
        "imap",
        "imaps",
        "pop3",
        "pop3s",
        "submission",
        "submissions",
        "autoconfig",
        "autodiscover",
        "mta_sts_host",
        "mta_sts",
        "tls_rpt",
    ):
        raw_value = dns_records.get(key)
        if not raw_value or " IN " not in raw_value:
            continue
        name, _, remainder = raw_value.partition(" IN ")
        record_type, _, content = remainder.partition(" ")
        row: dict[str, object] = {
            "key": key,
            "name": name,
            "type": record_type,
            "content": content.strip(),
            "raw": raw_value,
        }
        if record_type == "MX":
            priority, _, target = content.strip().partition(" ")
            if priority.isdigit():
                row["priority"] = int(priority)
                row["content"] = target.strip()
        rows.append(row)
    return rows


def _ssl_show(as_json: bool) -> str:
    """Describe the current TLS configuration.

    Values come from the main env file with settings as fallback; if the env
    file cannot be loaded, settings and defaults are used alone. ``status`` is
    ``ready`` when both certificate and key files exist, otherwise ``plain``
    in plain mode and ``missing`` in every other mode.

    Returns:
        A JSON document when ``as_json`` is true, otherwise ``key=value`` lines
        for mode, hostname, le_email, cert_path, key_path and status.
    """
    try:
        env = _load_merged_env(_resolve_main_env_file())
    except (OSError, RuntimeError, ValueError):
        env = {}
    mode, cert_path, key_path = _ssl_default_paths(env)

    def _clean_host(raw: str) -> str:
        return raw.strip().rstrip(".").lower()

    def _is_yes(name: str, default: str) -> bool:
        return env.get(name, default).strip().lower() == "yes"

    hostname = _clean_host(env.get("LIMRISTEM_MAIL_HOSTNAME") or settings.hostname)
    primary_domain = _clean_host(env.get("LIMRISTEM_MAIL_PRIMARY_DOMAIN") or settings.primary_domain)
    cert_exists = bool(cert_path) and Path(cert_path).is_file()
    key_exists = bool(key_path) and Path(key_path).is_file()
    if cert_exists and key_exists:
        status = "ready"
    elif mode == "plain":
        status = "plain"
    else:
        status = "missing"
    payload = {
        "mode": mode,
        "hostname": hostname,
        "le_email": env.get("LIMRISTEM_MAIL_LE_EMAIL", f"postmaster@{primary_domain}"),
        "cert_path": cert_path,
        "key_path": key_path,
        "cert_exists": cert_exists,
        "key_exists": key_exists,
        "nginx_enabled": _is_yes("LIMRISTEM_MAIL_ENABLE_NGINX", "no"),
        "mta_sts_enabled": _is_yes("LIMRISTEM_MAIL_ENABLE_MTA_STS", "yes"),
        "status": status,
    }
    if as_json:
        return _render_json(payload)
    text_fields = ("mode", "hostname", "le_email", "cert_path", "key_path", "status")
    return "".join(f"{field}={payload[field]}\n" for field in text_fields)


def _dns_zone_export(domain_name: str, dns_records: dict[str, str]) -> str:
    """Render the DNS records as zone-file text.

    The output starts with ``$ORIGIN`` and ``$TTL 3600``, followed by one line
    per exportable record with its owner name made zone-relative. The
    informational notes (``arc``, ``ptr``, ``mta_sts_policy``) are appended as
    ``;`` comment lines.
    """
    zone_lines = [f"$ORIGIN {domain_name}.", "$TTL 3600"]
    for row in _dns_export_rows(dns_records):
        owner = zone_relative_record_name(domain_name, str(row["name"]))
        if row["type"] == "MX" and "priority" in row:
            rdata = f"MX {row['priority']} {row['content']}"
        else:
            rdata = f"{row['type']} {row['content']}"
        zone_lines.append(f"{owner} IN {rdata}")
    note_prefixes = {"arc": "; ", "ptr": "; ", "mta_sts_policy": "; MTA-STS policy URL: "}
    zone_lines.extend(
        f"{prefix}{dns_records[key]}"
        for key, prefix in note_prefixes.items()
        if dns_records.get(key)
    )
    return "\n".join(zone_lines) + "\n"


def _dns_json_export(domain_name: str, dns_records: dict[str, str]) -> dict[str, object]:
    """Build the JSON-ready DNS export.

    The result has ``domain``, ``records`` (structured rows) and ``notes``
    (the non-empty ``arc``, ``ptr`` and ``mta_sts_policy`` entries).
    """
    records = _dns_export_rows(dns_records)
    notes: dict[str, str] = {}
    for key in ("arc", "ptr", "mta_sts_policy"):
        note = dns_records.get(key)
        if note:
            notes[key] = note
    return {"domain": domain_name, "records": records, "notes": notes}


def _show_domain_dns(domain_name: str, *, as_json: bool, as_zone: bool) -> str:
    """Render the recommended DNS records for a domain.

    The stored DMARC policy, DKIM selector and DKIM public key are used when
    the domain exists in the database. If the lookup fails or finds nothing,
    defaults are used instead, so records can still be shown.

    Args:
        domain_name: Domain to describe (normalized before use).
        as_json: Return the JSON export.
        as_zone: Return zone-file text; takes precedence over ``as_json``.

    Returns:
        Zone text, JSON, or plain ``key: value`` lines.
    """
    normalized_name = normalize_domain(domain_name)
    try:
        with _db_session("DNS lookup") as db:
            domain_record = db.query(models.Domain).filter(models.Domain.name == normalized_name).first()
    except BackendCommandError:
        # A database problem must not prevent showing the generic records.
        domain_record = None
    if domain_record:
        record_options = {
            "dmarc_policy": domain_record.dmarc_policy,
            "dkim_selector": domain_record.dkim_selector,
            "dkim_public_key": domain_record.dkim_public_key,
        }
    else:
        record_options = {"dmarc_policy": "reject", "dkim_selector": "default", "dkim_public_key": None}
    dns_records = _dns_payload_for_domain(normalized_name, **record_options)
    if as_zone:
        return _dns_zone_export(normalized_name, dns_records)
    if as_json:
        return _render_json(_dns_json_export(normalized_name, dns_records))
    return "\n".join(f"{key}: {value}" for key, value in dns_records.items()) + "\n"


def _set_domain_dns_status(domain: models.Domain, status: str) -> None:
    """Record the outcome of a DNS sync on ``domain``.

    The timestamp is set to the current time and the status text is cut to at
    most 512 characters. The caller is responsible for committing.
    """
    domain.dns_last_sync_at = models.utc_now()
    trimmed_status = status[:512]
    domain.dns_last_sync_status = trimmed_status


def _configure_domain_dns(args: tuple[str, ...]) -> str:
    """Configure the Cloudflare DNS provider for a domain.

    Args:
        args: ``<domain>`` followed by ``--account-id``, ``--zone-id``,
            ``--api-token``, ``--enable-sync`` / ``--disable-sync``,
            ``--clear-api-token`` and ``--json`` options.

    Returns:
        The serialized domain as JSON when ``--json`` is given, otherwise a
        one-line confirmation.

    Raises:
        BackendCommandError: Missing domain argument, option values rejected
            by the schema, unknown domain, or sync enabled without account ID,
            zone ID and API token.
    """
    positionals, options = _split_positionals_and_options(args)
    if not positionals:
        raise BackendCommandError(
            "Usage: limristem-mail dns configure-cloudflare <domain> --account-id ACCOUNT --zone-id ZONE --api-token TOKEN [--enable-sync] [--json]"
        )
    domain_name = normalize_domain(positionals[0])
    # Tri-state: True / False when a flag was given, None to leave the stored value alone.
    if options.get("enable-sync"):
        requested_sync: bool | None = True
    elif options.get("disable-sync"):
        requested_sync = False
    else:
        requested_sync = None
    try:
        payload = schemas.DomainDnsProviderUpdate(
            dns_provider="cloudflare",
            dns_account_id=str(options.get("account-id", "") or "") or None,
            dns_zone_id=str(options.get("zone-id", "") or "") or None,
            dns_api_token=str(options.get("api-token", "") or "") or None,
            dns_sync_enabled=requested_sync,
            clear_api_token=bool(options.get("clear-api-token")),
        )
    except (TypeError, ValueError, ValidationError) as exc:
        # Report schema rejections as a command error, as the other commands do,
        # instead of leaking a raw validation exception to the caller.
        raise BackendCommandError(str(exc)) from exc
    with _db_session("DNS provider configuration") as db:
        domain = db.query(models.Domain).filter(models.Domain.name == domain_name).first()
        if not domain:
            raise BackendCommandError("Domain not found")
        if payload.dns_zone_id is not None:
            domain.dns_zone_id = payload.dns_zone_id
        if payload.dns_account_id is not None:
            domain.dns_account_id = payload.dns_account_id
        if payload.clear_api_token:
            domain.dns_api_token = None
        elif payload.dns_api_token:
            from .crypto import encrypt_token

            # The token is stored encrypted, never in plain text.
            domain.dns_api_token = encrypt_token(payload.dns_api_token)
        domain.dns_provider = "cloudflare"
        if payload.dns_sync_enabled is not None:
            domain.dns_sync_enabled = payload.dns_sync_enabled
        # Raising here leaves the changes above uncommitted; the session is closed without commit.
        if domain.dns_sync_enabled and (not domain.dns_account_id or not domain.dns_zone_id or not domain.dns_api_token):
            raise BackendCommandError("Cloudflare account ID, zone ID and API token are required before enabling DNS sync")
        db.commit()
        db.refresh(domain)
        payload_out = _serialize_domain(domain)
    if options.get("json"):
        return _render_json(payload_out)
    return f"Configured Cloudflare DNS for {payload_out['name']}\n"


def _disable_domain_dns(args: tuple[str, ...]) -> str:
    """Remove the DNS provider configuration from a domain.

    Provider, zone ID and API token are cleared, sync is switched off and the
    last sync status is set to ``disabled``.

    Returns:
        The serialized domain as JSON when ``--json`` is given, otherwise a
        one-line confirmation.

    Raises:
        BackendCommandError: Wrong number of arguments or unknown domain.
    """
    positionals, options = _split_positionals_and_options(args)
    if len(positionals) != 1:
        raise BackendCommandError("Usage: limristem-mail dns disable <domain> [--json]")
    target_name = normalize_domain(positionals[0])
    with _db_session("DNS provider disable") as session:
        record = session.query(models.Domain).filter(models.Domain.name == target_name).first()
        if not record:
            raise BackendCommandError("Domain not found")
        # NOTE(review): dns_account_id is left untouched here although the zone
        # ID and token are cleared -- confirm this is intentional.
        record.dns_provider = None
        record.dns_zone_id = None
        record.dns_api_token = None
        record.dns_sync_enabled = False
        record.dns_last_sync_status = "disabled"
        session.commit()
        session.refresh(record)
        serialized = _serialize_domain(record)
    if not options.get("json"):
        return f"Disabled DNS provider for {serialized['name']}\n"
    return _render_json(serialized)


def _sync_domain_dkim_dns(args: tuple[str, ...]) -> str:
    """Publish a domain's DKIM record through its DNS provider.

    The outcome (``ok: ...`` or ``error: ...``) is stored on the domain and
    committed in both the success and the failure case.

    Returns:
        A JSON document with ``status`` and ``record`` when ``--json`` is
        given, otherwise a one-line summary.

    Raises:
        BackendCommandError: Wrong number of arguments, unknown domain, or a
            provider failure.
    """
    positionals, options = _split_positionals_and_options(args)
    if len(positionals) != 1:
        raise BackendCommandError("Usage: limristem-mail dns sync-dkim <domain> [--json]")
    target_name = normalize_domain(positionals[0])
    with _db_session("DKIM DNS sync") as session:
        record = session.query(models.Domain).filter(models.Domain.name == target_name).first()
        if not record:
            raise BackendCommandError("Domain not found")
        try:
            change = publish_dkim_record_for_domain(record)
        except DNSProviderError as exc:
            # Persist the failure before reporting it so the status survives the error.
            _set_domain_dns_status(record, f"error: {exc}")
            session.commit()
            raise BackendCommandError(str(exc)) from exc
        _set_domain_dns_status(record, f"ok: {change.action} {change.name}")
        session.commit()
        published = change.as_dict()
        result = {"status": "ok", "record": published}
    if options.get("json"):
        return _render_json(result)
    return f"{published['action']} {published['name']} on cloudflare\n"


def _add_domain(args: tuple[str, ...]) -> str:
    """Create a mail domain, optionally generating a DKIM key for it.

    Args:
        args: ``<domain>`` followed by ``--max-users``, ``--dmarc-policy``,
            ``--generate-dkim``, ``--dkim-selector``, ``--inactive`` and
            ``--json`` options.

    Returns:
        The serialized domain as JSON when ``--json`` is given, otherwise a
        one-line confirmation.

    Raises:
        BackendCommandError: Wrong number of arguments, option values rejected
            by the schema, or a domain of that name already exists. Database
            failures are converted by ``_db_session``.
    """
    positionals, options = _split_positionals_and_options(args)
    if len(positionals) != 1:
        raise BackendCommandError(
            "Usage: limristem-mail add domain <domain>\n"
            "  [--max-users N] [--dmarc-policy POLICY]\n"
            "  [--generate-dkim] [--dkim-selector SELECTOR]\n"
            "  [--inactive] [--json]"
        )
    # Validate all user input through the schema before touching the database;
    # int() failures and schema rejections are both reported as command errors.
    try:
        payload = schemas.DomainCreate(
            name=positionals[0],
            max_users=int(str(options.get("max-users", "0"))),
            dmarc_policy=str(options.get("dmarc-policy", "reject")),
            generate_dkim=bool(options.get("generate-dkim")),
            dkim_selector=str(options.get("dkim-selector", "default")),
            is_active=not bool(options.get("inactive")),
        )
    except (TypeError, ValueError, ValidationError) as exc:
        raise BackendCommandError(str(exc)) from exc
    with _db_session("Domain creation") as db:
        existing = db.query(models.Domain).filter(models.Domain.name == payload.name).first()
        if existing:
            raise BackendCommandError("Domain already exists")
        domain = models.Domain(
            name=payload.name,
            is_active=payload.is_active,
            max_users=payload.max_users,
            dmarc_policy=payload.dmarc_policy,
            dkim_selector=payload.dkim_selector,
        )
        if payload.generate_dkim:
            from .utils import generate_dkim_key

            # The key is generated before the commit, so a failed commit must
            # clean it up again (see the except branch below).
            dkim = generate_dkim_key(payload.name, payload.dkim_selector)
            domain.dkim_private_path = dkim["path"]
            domain.dkim_public_key = dkim.get("dns")
        db.add(domain)
        try:
            db.commit()
        except Exception:
            db.rollback()
            # NOTE(review): this also runs when no key was generated, i.e. with
            # whatever dkim_private_path defaults to -- confirm that
            # remove_managed_dkim_key accepts that value.
            remove_managed_dkim_key(domain.dkim_private_path)
            raise
        db.refresh(domain)
        # The signing maps are rebuilt from every domain, not just the new one.
        sync_dkim_signing_maps(db.query(models.Domain).all())
        payload_out = _serialize_domain(domain)
    if options.get("json"):
        return _render_json(payload_out)
    return f"Created domain: {payload_out['name']}\n"


def _add_account(args: tuple[str, ...]) -> str:
    """Create a mailbox account from CLI arguments and report the result.

    Expects ``<email> <password...>`` as positionals; every positional after
    the e-mail is joined with single spaces to form the password. Recognised
    options are ``--quota-mb``, ``--inactive`` and ``--json``.

    Returns:
        A JSON document when ``--json`` is given, otherwise a one-line
        confirmation.

    Raises:
        BackendCommandError: on bad usage, failed schema validation, a
            missing or inactive domain, a reached account limit, or an
            already existing account.
    """
    positionals, options = _split_positionals_and_options(args)
    if len(positionals) < 2:
        raise BackendCommandError(
            "Usage: limristem-mail add account <email> <password...>\n"
            "  [--quota-mb N] [--inactive] [--json]"
        )
    email_arg, *password_words = positionals
    address = email_arg.strip().lower()
    if "@" not in address:
        raise BackendCommandError("Account email must include a domain, for example postmaster@example.com")
    # Split on the first "@" only; anything after it is treated as the domain.
    local_part, _, domain_name = address.partition("@")
    try:
        payload = schemas.AccountCreate(
            domain=domain_name,
            local_part=local_part,
            password=" ".join(password_words).strip(),
            quota_mb=int(str(options.get("quota-mb", "2048"))),
            is_active=not bool(options.get("inactive")),
        )
    except (TypeError, ValueError, ValidationError) as exc:
        raise BackendCommandError(str(exc)) from exc

    with _db_session("Account creation") as db:
        # The domain row is selected FOR UPDATE before the limit is checked.
        domain = (
            db.query(models.Domain)
            .filter(models.Domain.name == payload.domain)
            .with_for_update()
            .first()
        )
        if not domain:
            raise BackendCommandError("Domain not found")
        if not domain.is_active:
            raise BackendCommandError("Domain is inactive")

        active_total = (
            db.query(models.Account)
            .filter(
                models.Account.domain_id == domain.id,
                models.Account.is_active.is_(True),
            )
            .count()
        )
        # A max_users of 0 (falsy) disables the limit; inactive accounts never count against it.
        limit_reached = payload.is_active and domain.max_users and active_total >= domain.max_users
        if limit_reached:
            raise BackendCommandError("Domain account limit reached")

        duplicate = (
            db.query(models.Account)
            .filter(
                models.Account.domain_id == domain.id,
                models.Account.local_part == payload.local_part,
            )
            .first()
        )
        if duplicate:
            raise BackendCommandError("Account already exists")

        account = models.Account(
            domain_id=domain.id,
            local_part=payload.local_part,
            username=f"{payload.local_part}@{domain.name}",
            password_hash=hash_password(payload.password),
            quota_mb=payload.quota_mb,
            is_active=payload.is_active,
        )
        db.add(account)
        db.commit()
        db.refresh(account)
        payload_out = _serialize_account(account)

    if options.get("json"):
        return _render_json(payload_out)
    return f"Created account: {payload_out['email']}\n"


def _service_status_payload() -> list[dict[str, str]]:
    """Report the systemd state of every entry in ``MONITORED_SERVICES``.

    Each service maps to a list of candidate unit names. The first candidate
    for which ``systemctl is-active`` succeeds or prints a state is used, and
    ``systemctl is-enabled`` is then queried for that unit. When ``systemctl``
    is not on PATH every service is reported as ``unknown``.

    Returns:
        One dict per service with the keys ``name``, ``unit``, ``status`` and
        ``enabled``.
    """

    def _systemctl(verb: str, unit: str) -> subprocess.CompletedProcess:
        return subprocess.run(
            ["systemctl", verb, unit],
            capture_output=True,
            text=True,
            check=False,
        )

    if not shutil.which("systemctl"):
        return [
            {"name": name, "unit": candidates[0], "status": "unknown", "enabled": "unknown"}
            for name, candidates in MONITORED_SERVICES.items()
        ]

    report: list[dict[str, str]] = []
    for name, candidates in MONITORED_SERVICES.items():
        # Defaults apply when no candidate unit yields a usable answer.
        entry = {"name": name, "unit": candidates[0], "status": "unknown", "enabled": "unknown"}
        for candidate in candidates:
            active_probe = _systemctl("is-active", candidate)
            observed = active_probe.stdout.strip() or "unknown"
            if active_probe.returncode != 0 and observed in {"unknown", ""}:
                continue
            enabled_probe = _systemctl("is-enabled", candidate)
            enabled_text = (enabled_probe.stdout or enabled_probe.stderr or "").strip()
            if not enabled_text:
                enabled_text = "enabled" if enabled_probe.returncode == 0 else "unknown"
            entry.update(unit=candidate, status=observed, enabled=enabled_text)
            break
        report.append(entry)
    return report


def _show_services(as_json: bool) -> str:
    """Render the monitored-service report as JSON or as one text line per service."""
    services = _service_status_payload()
    if as_json:
        return _render_json(services)
    rows = [
        f"{service['name']}: {service['status']} ({service['unit']}, enabled={service['enabled']})"
        for service in services
    ]
    return "\n".join(rows) + "\n"


def _start_uvicorn(target: str, args: tuple[str, ...]) -> str:
    """Run the ``api.app:app`` ASGI application under uvicorn.

    ``--host`` and ``--port`` override ``settings.api_bind`` and
    ``settings.api_port``. Returns an empty string once ``uvicorn.run``
    returns.

    Raises:
        BackendCommandError: if the API is disabled, or ``target`` is
            ``"panel"`` while the web panel is disabled.
    """
    import uvicorn

    _, options = _split_positionals_and_options(args)
    if not settings.enable_api:
        raise BackendCommandError("API is disabled. Set LIMRISTEM_MAIL_ENABLE_API=yes before starting it.")
    panel_requested = target == "panel"
    if panel_requested and not settings.enable_web_panel:
        raise BackendCommandError("Web panel is disabled. Set LIMRISTEM_MAIL_ENABLE_WEB_PANEL=yes before starting it.")
    bind_host = str(options.get("host", settings.api_bind))
    bind_port = int(str(options.get("port", settings.api_port)))
    uvicorn.run("api.app:app", host=bind_host, port=bind_port)
    return ""


def _normalize_yes_no(value: str, *, default: str = "yes") -> str:
    return "yes" if str(value or default).strip().lower() == "yes" else "no"


def _slugify(value: str, default: str) -> str:
    slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
    return slug or default


def _load_json_file(path: Path, *, default):
    try:
        return json.loads(path.read_text(encoding="utf-8") or json.dumps(default))
    except Exception:
        return default


def _write_json_file(path: Path, payload) -> None:
    """Write ``payload`` as indented JSON with a trailing newline and set mode 0o660."""
    _ensure_parent(path)
    serialized = json.dumps(payload, indent=2)
    path.write_text(f"{serialized}\n", encoding="utf-8")
    path.chmod(0o660)


def _refresh_runtime_settings() -> None:
    """Reload branding and update settings from the main env file into ``settings``.

    Updates ``hostname``, ``server_name``, ``panel_title``,
    ``panel_favicon_path`` and ``enable_auto_updates`` on the module-level
    ``settings`` object.
    """
    env = _load_env_file(_resolve_main_env_file())

    raw_hostname = env.get("LIMRISTEM_MAIL_HOSTNAME") or settings.hostname
    settings.hostname = raw_hostname.strip().rstrip(".").lower()

    settings.server_name = env.get("LIMRISTEM_MAIL_SERVER_NAME", "Limristem eMail").strip() or "Limristem eMail"

    # The default title is derived from the server name assigned just above.
    default_title = f"{settings.server_name} Admin Panel"
    settings.panel_title = env.get("LIMRISTEM_MAIL_PANEL_TITLE", default_title).strip() or default_title

    settings.panel_favicon_path = env.get("LIMRISTEM_MAIL_PANEL_FAVICON_PATH", "").strip()

    auto_updates = _normalize_yes_no(env.get("LIMRISTEM_MAIL_ENABLE_AUTO_UPDATES", "no"), default="no")
    settings.enable_auto_updates = auto_updates == "yes"


def _normalize_version_parts(value: str) -> tuple[int, ...]:
    text = str(value or "").strip()
    if not text:
        return (0,)
    parts: list[int] = []
    for token in re.findall(r"\d+", text):
        parts.append(int(token))
    return tuple(parts or [0])


def _is_version_newer(candidate: str, current: str) -> bool:
    """Return True when ``candidate`` is a strictly higher version than ``current``.

    Both versions are reduced to integer tuples and right-padded with zeros
    to equal length, so ``1.2`` and ``1.2.0`` compare as equal.
    """
    candidate_parts = _normalize_version_parts(candidate)
    current_parts = _normalize_version_parts(current)
    width = max(len(candidate_parts), len(current_parts))
    padded_candidate = candidate_parts + (0,) * (width - len(candidate_parts))
    padded_current = current_parts + (0,) * (width - len(current_parts))
    return padded_candidate > padded_current


def _load_update_state_payload() -> dict[str, object]:
    """Load the persisted update-check state, or ``{}`` when it cannot be read."""
    state_file = _settings_state_paths()["update_state"]
    return _load_json_file(state_file, default={})


def _write_update_state_payload(payload: dict[str, object]) -> None:
    """Persist the update-check state to the ``update_state`` file."""
    state_file = _settings_state_paths()["update_state"]
    _write_json_file(state_file, payload)


def _current_app_version() -> str:
    """Return the installed version string.

    Preference order: ``LIMRISTEM_MAIL_VERSION`` from the main env file,
    then ``settings.app_version``, then the literal ``1.0.2``.
    """
    fallback = "1.0.2"
    env = _load_env_file(_resolve_main_env_file())
    configured = env.get("LIMRISTEM_MAIL_VERSION") or settings.app_version or fallback
    return configured.strip() or fallback


def _fetch_remote_version_payload() -> dict[str, object]:
    """Download the update manifest and return its highest-versioned entry.

    The manifest at ``UPDATE_CHECK_URL`` may be a JSON list of entries, an
    object holding a ``versions`` list, or a single entry object. Entries
    are ranked with ``_normalize_version_parts``.

    Returns:
        The manifest entry (a dict) carrying the highest ``version``.

    Raises:
        BackendCommandError: if the endpoint is not credential-free HTTPS,
            the transfer fails, the body exceeds
            ``UPDATE_RESPONSE_MAX_BYTES``, the body is not UTF-8 encoded
            JSON, or no entry carries a version.
    """
    parsed_endpoint = urllib_parse.urlparse(UPDATE_CHECK_URL)
    if parsed_endpoint.scheme != "https" or not parsed_endpoint.hostname or parsed_endpoint.username or parsed_endpoint.password:
        raise BackendCommandError("Unable to check updates: update endpoint must use HTTPS")
    try:
        with NO_REDIRECT_OPENER.open(UPDATE_CHECK_URL, timeout=30) as response:
            final_url = response.geturl() if hasattr(response, "geturl") else UPDATE_CHECK_URL
            final_endpoint = urllib_parse.urlparse(final_url)
            if (
                final_endpoint.scheme != "https"
                or not final_endpoint.hostname
                or final_endpoint.username
                or final_endpoint.password
            ):
                raise BackendCommandError("Unable to check updates: endpoint redirected to an insecure URL")
            # Read one byte past the limit so an oversized body is detectable.
            body = response.read(UPDATE_RESPONSE_MAX_BYTES + 1)
    except BackendCommandError:
        raise
    except OSError as exc:
        # URLError, HTTPError and TimeoutError are all OSError subclasses; catching
        # the base class also covers resets and TLS failures raised while reading.
        raise BackendCommandError(f"Unable to check updates: {exc}") from exc
    if len(body) > UPDATE_RESPONSE_MAX_BYTES:
        raise BackendCommandError("Unable to check updates: response payload too large")
    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # A body that is not valid UTF-8 fails in decode() before json sees it.
        raise BackendCommandError(f"Unable to check updates: {exc}") from exc
    if isinstance(payload, list):
        candidates = [item for item in payload if isinstance(item, dict)]
    elif isinstance(payload, dict):
        listed = payload.get("versions")
        if isinstance(listed, list):
            candidates = [item for item in listed if isinstance(item, dict)]
        else:
            candidates = [payload]
    else:
        raise BackendCommandError("Unable to check updates: invalid JSON payload")
    # Only entries that actually declare a version may be selected.
    versioned = [item for item in candidates if str(item.get("version", "")).strip()]
    if not versioned:
        raise BackendCommandError("Unable to check updates: missing version field")
    return max(versioned, key=lambda item: _normalize_version_parts(str(item.get("version", "")).strip()))


def _apply_server_name(server_name: str) -> None:
    """Write the server name into the Postfix banner and Dovecot login greeting.

    ``/etc/postfix/main.cf`` gets ``smtpd_banner`` and
    ``/etc/dovecot/dovecot.conf`` gets ``login_greeting``. A file is only
    touched when it exists and is writable; after a rewrite Postfix is
    reloaded and Dovecot restarted, both best-effort.

    Existing directives are recognised with any amount of whitespace before
    the ``=`` (``key=value``, ``key = value``, ``key   = value``), so an
    existing line is replaced instead of a second definition being added.
    """

    def _rewrite_directive(config_path: Path, directive: str, rendered: str, *, prepend_if_missing: bool) -> bool:
        """Replace every ``directive`` line with ``rendered``; return True if the file was written."""
        if not (config_path.exists() and _is_writable(config_path)):
            return False
        pattern = re.compile(rf"{re.escape(directive)}\s*=")
        rewritten: list[str] = []
        replaced = False
        for line in config_path.read_text(encoding="utf-8").splitlines():
            if pattern.match(line):
                rewritten.append(rendered)
                replaced = True
            else:
                rewritten.append(line)
        if not replaced:
            if prepend_if_missing:
                rewritten.insert(0, rendered)
            else:
                rewritten.append(rendered)
        config_path.write_text("\n".join(rewritten).rstrip("\n") + "\n", encoding="utf-8")
        return True

    if _rewrite_directive(
        Path("/etc/postfix/main.cf"),
        "smtpd_banner",
        f"smtpd_banner = $myhostname ESMTP {server_name}",
        prepend_if_missing=True,
    ):
        _run_external_best_effort(["systemctl", "reload", "postfix"])

    if _rewrite_directive(
        Path("/etc/dovecot/dovecot.conf"),
        "login_greeting",
        f"login_greeting = {server_name} ready.",
        prepend_if_missing=False,
    ):
        _run_external_best_effort(["systemctl", "restart", "dovecot"])


def _settings_show(as_json: bool) -> str:
    """Render the current panel/server settings.

    Values come from the main env file, with ``settings.hostname`` and fixed
    defaults as fallbacks, plus the persisted update-check state.

    Returns:
        A JSON object when ``as_json`` is true, otherwise ``key=value``
        lines; ``latest_version`` is appended when the update state has one.
    """
    env = _load_env_file(_resolve_main_env_file())
    current_version = _current_app_version()

    server_name = env.get("LIMRISTEM_MAIL_SERVER_NAME", "Limristem eMail").strip() or "Limristem eMail"
    default_title = f"{server_name} Admin Panel"
    panel_title = env.get("LIMRISTEM_MAIL_PANEL_TITLE", default_title).strip() or default_title
    favicon_path = env.get("LIMRISTEM_MAIL_PANEL_FAVICON_PATH", "").strip()
    hostname = (env.get("LIMRISTEM_MAIL_HOSTNAME") or settings.hostname).strip().rstrip(".").lower()

    payload = {
        "hostname": hostname,
        "server_name": server_name,
        "panel_title": panel_title,
        "panel_favicon_path": favicon_path,
        "panel_favicon_url": "/panel/favicon" if favicon_path else "",
        "current_version": current_version,
        "auto_updates_enabled": _normalize_yes_no(env.get("LIMRISTEM_MAIL_ENABLE_AUTO_UPDATES", "no"), default="no"),
        "update_status": _load_update_state_payload(),
    }
    if as_json:
        return json.dumps(payload)

    # The text form omits panel_favicon_url and the raw update_status object.
    text_keys = (
        "hostname",
        "server_name",
        "panel_title",
        "panel_favicon_path",
        "current_version",
        "auto_updates_enabled",
    )
    lines = [f"{key}={payload[key]}" for key in text_keys]
    update_status = payload["update_status"]
    latest_version = ""
    if isinstance(update_status, dict):
        latest_version = str(update_status.get("latest_version", "")).strip()
    if latest_version:
        lines.append(f"latest_version={latest_version}")
    return "\n".join(lines) + "\n"


def _sync_auto_update_units() -> None:
    """Install or remove the systemd service/timer pair for automatic updates.

    Does nothing when the writability probe for ``/etc/systemd/system``
    fails. With ``LIMRISTEM_MAIL_ENABLE_AUTO_UPDATES`` set to ``yes`` both
    unit files are (re)written and the timer is enabled; otherwise the timer
    is disabled and both files are removed. All ``systemctl`` calls are
    best-effort.
    """
    systemd_dir = Path("/etc/systemd/system")
    service_name = f"{AUTO_UPDATE_TIMER_NAME}.service"
    timer_name = f"{AUTO_UPDATE_TIMER_NAME}.timer"
    if not _is_writable(systemd_dir / ".limristem-mail-auto-update-probe"):
        return

    env = _load_env_file(_resolve_main_env_file())
    auto_updates_on = _normalize_yes_no(env.get("LIMRISTEM_MAIL_ENABLE_AUTO_UPDATES", "no"), default="no") == "yes"
    service_path = systemd_dir / service_name
    timer_path = systemd_dir / timer_name

    if not auto_updates_on:
        _run_external_best_effort(["systemctl", "disable", "--now", timer_name])
        try:
            service_path.unlink(missing_ok=True)
            timer_path.unlink(missing_ok=True)
        except Exception:
            # Removal is best-effort; a leftover unit file is tolerated.
            pass
        _run_external_best_effort(["systemctl", "daemon-reload"])
        return

    cli_path = settings.base_dir / "limristem-mail"
    env_file = _resolve_main_env_file()

    service_lines = (
        "[Unit]",
        "Description=Limristem eMail automatic update check",
        "After=network-online.target",
        "Wants=network-online.target",
        "",
        "[Service]",
        "Type=oneshot",
        f"EnvironmentFile={env_file}",
        "Environment=LIMRISTEM_MAIL_UPDATE_WORKER=yes",
        f"ExecStart={cli_path} settings check-update --apply-if-enabled --json",
        "User=root",
        "Group=root",
    )
    timer_lines = (
        "[Unit]",
        "Description=Run Limristem eMail automatic update checks every 24 hours",
        "",
        "[Timer]",
        "OnBootSec=10m",
        "OnUnitActiveSec=24h",
        "Persistent=true",
        "",
        "[Install]",
        "WantedBy=timers.target",
    )
    service_path.write_text("\n".join(service_lines) + "\n", encoding="utf-8")
    timer_path.write_text("\n".join(timer_lines) + "\n", encoding="utf-8")

    _run_external_best_effort(["systemctl", "daemon-reload"])
    _run_external_best_effort(["systemctl", "enable", "--now", timer_name])


def _settings_set_pairs(pairs: Iterable[tuple[str, str]]) -> str:
    """Validate and persist settings, then return the resulting settings as JSON.

    All pairs are validated before anything is written. After the env file
    is updated, the server name (when supplied) is applied to the mail
    daemons, the auto-update units are synchronised and the in-process
    settings are refreshed.

    Raises:
        BackendCommandError: for an unknown key, or a ``server-name`` /
            ``panel-title`` value that is too long or contains control
            characters.
    """
    env_file = _resolve_main_env_file()
    current = _load_env_file(env_file)
    pending: dict[str, str] = {}

    for key, value in pairs:
        env_key = SETTINGS_ENV_MAP.get(key)
        if not env_key:
            raise BackendCommandError(f"Unknown settings key: {key}")
        text = str(value).strip()
        if key == "auto-updates-enabled":
            text = _normalize_yes_no(text, default=current.get(env_key, SETTINGS_DEFAULT_MAP[key]))
        elif key in {"server-name", "panel-title"}:
            # An empty value resets the field to its default.
            text = text or SETTINGS_DEFAULT_MAP[key]
            limit = 128 if key == "server-name" else 160
            has_control_chars = any(ord(character) < 32 or ord(character) == 127 for character in text)
            if len(text) > limit or has_control_chars:
                raise BackendCommandError(f"Invalid value for {key}")
        pending[env_key] = text

    for env_key, text in pending.items():
        _upsert_env_value(env_file, env_key, text)

    if "LIMRISTEM_MAIL_SERVER_NAME" in pending:
        _apply_server_name(pending["LIMRISTEM_MAIL_SERVER_NAME"])
    _sync_auto_update_units()
    _refresh_runtime_settings()
    return _settings_show(as_json=True)


def _settings_upload_favicon(filename: str, content_type: str, data_b64: str, as_json: bool) -> str:
    """Store an uploaded panel favicon and record its path in the main env file.

    The payload is base64-decoded, size-checked against
    ``PANEL_FAVICON_MAX_BYTES`` and matched against the magic bytes
    registered in ``FAVICON_SIGNATURES`` for its extension. The extension
    comes from ``filename``, then from ``content_type``, then defaults to
    ``.ico``.

    Returns:
        The refreshed settings, rendered as by ``_settings_show``.

    Raises:
        BackendCommandError: for invalid base64, an empty or oversized file,
            an unsupported extension, or content not matching the extension.
    """
    try:
        file_bytes = base64.b64decode(data_b64, validate=True)
    except Exception as exc:
        raise BackendCommandError("Invalid favicon payload") from exc
    if not file_bytes:
        raise BackendCommandError("Favicon upload is empty")
    if len(file_bytes) > PANEL_FAVICON_MAX_BYTES:
        raise BackendCommandError("Favicon upload exceeds the 512 KiB limit")

    suffix = Path(filename or "favicon.ico").suffix
    if not suffix:
        suffix = mimetypes.guess_extension(content_type or "") or ".ico"
    suffix = suffix.lower()

    signatures = FAVICON_SIGNATURES.get(suffix)
    if not signatures:
        raise BackendCommandError("Unsupported favicon file type")
    if suffix == ".webp":
        # WebP is checked as a RIFF container whose bytes 8..11 read "WEBP".
        valid_signature = file_bytes.startswith(b"RIFF") and len(file_bytes) >= 12 and file_bytes[8:12] == b"WEBP"
    else:
        valid_signature = any(file_bytes.startswith(signature) for signature in signatures)
    if not valid_signature:
        raise BackendCommandError("Favicon content does not match the selected file type")

    favicon_dir = _settings_state_paths()["favicon_dir"]
    favicon_dir.mkdir(parents=True, exist_ok=True)
    target = favicon_dir / f"panel-favicon{suffix}"
    target.write_bytes(file_bytes)
    target.chmod(0o640)

    _upsert_env_value(_resolve_main_env_file(), "LIMRISTEM_MAIL_PANEL_FAVICON_PATH", str(target))
    _refresh_runtime_settings()
    return _settings_show(as_json=as_json)


def _download_update_resource(url: str, *, max_bytes: int, label: str) -> bytes:
    """Download an update artefact over HTTPS with a hard size limit.

    Args:
        url: Location of the resource; must be credential-free HTTPS.
        max_bytes: Largest accepted body size in bytes.
        label: Human-readable name of the resource, used in error messages.

    Returns:
        The response body.

    Raises:
        BackendCommandError: if the URL (or the final URL reported by the
            response) is not credential-free HTTPS, the transfer fails, or
            the body is larger than ``max_bytes``.
    """
    parsed = urllib_parse.urlparse(url)
    if parsed.scheme != "https" or not parsed.hostname or parsed.username or parsed.password:
        raise BackendCommandError(f"Unable to apply update: invalid {label} URL")
    try:
        with NO_REDIRECT_OPENER.open(url, timeout=30) as response:
            final_url = response.geturl() if hasattr(response, "geturl") else url
            final_parsed = urllib_parse.urlparse(final_url)
            if (
                final_parsed.scheme != "https"
                or not final_parsed.hostname
                or final_parsed.username
                or final_parsed.password
            ):
                raise BackendCommandError(f"Unable to apply update: {label} redirected to an insecure URL")
            # Read one byte past the limit so an oversized body is detectable.
            body = response.read(max_bytes + 1)
    except BackendCommandError:
        raise
    except OSError as exc:
        # URLError, HTTPError and TimeoutError are all OSError subclasses; catching
        # the base class also covers resets and TLS failures raised while reading.
        raise BackendCommandError(f"Unable to apply update: {label} download failed: {exc}") from exc
    if len(body) > max_bytes:
        raise BackendCommandError(f"Unable to apply update: {label} is too large")
    return body


def _safe_extract_update_archive(archive_bytes: bytes, destination: Path) -> None:
    """Validate a gzip-compressed tar archive and extract it into ``destination``.

    Every member is checked before anything is extracted: the entry count
    and the summed member sizes are capped, paths must be relative without
    ``..``, no path may occur twice, and only regular files and directories
    are accepted. Extraction uses tarfile's ``data`` filter.

    Raises:
        BackendCommandError: if a check fails or the archive cannot be read.
    """
    stream = io.BytesIO(archive_bytes)
    try:
        with tarfile.open(fileobj=stream, mode="r:gz") as archive:
            entries = archive.getmembers()
            if len(entries) > UPDATE_ARCHIVE_MAX_MEMBERS:
                raise BackendCommandError("Unable to apply update: archive contains too many entries")

            visited: set[tuple[str, ...]] = set()
            extracted_bytes = 0
            for entry in entries:
                entry_path = PurePosixPath(entry.name)
                # Drop empty and "." components so equivalent spellings compare equal.
                parts = tuple(component for component in entry_path.parts if component not in {"", "."})
                unsafe = entry_path.is_absolute() or not parts or ".." in parts
                if unsafe:
                    raise BackendCommandError("Unable to apply update: archive contains unsafe paths")
                if parts in visited:
                    raise BackendCommandError("Unable to apply update: archive contains duplicate paths")
                visited.add(parts)
                if entry.issym() or entry.islnk() or entry.isdev():
                    raise BackendCommandError("Unable to apply update: archive contains links or device files")
                if not (entry.isdir() or entry.isfile()):
                    raise BackendCommandError("Unable to apply update: archive contains unsupported file types")
                extracted_bytes += max(entry.size, 0)
                if extracted_bytes > UPDATE_EXTRACTED_MAX_BYTES:
                    raise BackendCommandError("Unable to apply update: extracted package is too large")

            archive.extractall(destination, members=entries, filter="data")
    except (tarfile.TarError, OSError) as exc:
        raise BackendCommandError(f"Unable to apply update: invalid package archive: {exc}") from exc


def _run_install_update(remote: dict[str, object]) -> None:
    """Download, verify, unpack and run the installer for a release.

    ``remote`` must provide ``version``, ``download_url`` and
    ``sha256_url``. The archive's SHA-256 digest is compared with the first
    64-hex-digit token in the checksum file, the archive is unpacked into a
    temporary directory, the bundled ``version.json`` must name the same
    version, and ``install.sh`` is then run against ``settings.base_dir``.

    Raises:
        BackendCommandError: on missing metadata, a failed download, a bad
            or mismatching checksum, an invalid archive, or a package whose
            contents do not match the metadata.
    """
    version = str(remote.get("version", "")).strip()
    download_url = str(remote.get("download_url", "")).strip()
    checksum_url = str(remote.get("sha256_url", "")).strip()
    if not (version and download_url and checksum_url):
        raise BackendCommandError("Unable to apply update: version metadata is missing download or checksum information")

    archive_bytes = _download_update_resource(
        download_url,
        max_bytes=UPDATE_ARCHIVE_MAX_BYTES,
        label="package",
    )
    try:
        checksum_text = _download_update_resource(
            checksum_url,
            max_bytes=UPDATE_CHECKSUM_MAX_BYTES,
            label="checksum",
        ).decode("ascii", errors="strict")
    except UnicodeDecodeError as exc:
        raise BackendCommandError("Unable to apply update: invalid SHA-256 checksum file") from exc

    expected = re.search(r"\b([A-Fa-f0-9]{64})\b", checksum_text)
    if not expected:
        raise BackendCommandError("Unable to apply update: invalid SHA-256 checksum file")
    digest = hashlib.sha256(archive_bytes).hexdigest()
    if not secrets.compare_digest(digest, expected.group(1).lower()):
        raise BackendCommandError("Unable to apply update: SHA-256 checksum mismatch")

    with tempfile.TemporaryDirectory(prefix="limristem-mail-update-") as temporary_dir:
        staging_dir = Path(temporary_dir)
        _safe_extract_update_archive(archive_bytes, staging_dir)

        install_script = staging_dir / "install.sh"
        version_file = staging_dir / "version.json"
        if not (install_script.is_file() and version_file.is_file()):
            raise BackendCommandError("Unable to apply update: package is missing install.sh or version.json")

        # version.json may be a single object or a list whose first item is used.
        manifest = _load_json_file(version_file, default=[])
        metadata = manifest[0] if isinstance(manifest, list) and manifest else manifest
        package_version = ""
        if isinstance(metadata, dict):
            package_version = str(metadata.get("version", "")).strip()
        if package_version != version:
            raise BackendCommandError("Unable to apply update: package version does not match version metadata")

        install_script.chmod(0o755)
        command = [
            str(install_script),
            "-u",
            "--install-path",
            str(settings.base_dir),
            "--non-interactive",
        ]
        _run_external(command)


def _settings_check_update(*, as_json: bool, apply_if_enabled: bool = False, force_apply: bool = False) -> str:
    """Check for a newer release, optionally install it, and record the outcome.

    An available update is installed when ``force_apply`` is set, or when
    ``apply_if_enabled`` is set and auto-updates are switched on. An
    installation failure is stored in the ``error`` field instead of being
    raised. The result is persisted as the update state.

    Returns:
        The result as JSON, or as ``key=value`` lines when ``as_json`` is
        false.
    """
    remote = _fetch_remote_version_payload()
    current_version = _current_app_version()
    latest_version = str(remote.get("version", "")).strip()
    update_available = _is_version_newer(latest_version, current_version)

    shown_settings = json.loads(_settings_show(as_json=True))
    auto_enabled = shown_settings.get("auto_updates_enabled", "no") == "yes"

    applied = False
    error_message = ""
    should_apply = update_available and (force_apply or (apply_if_enabled and auto_enabled))
    if should_apply:
        try:
            _run_install_update(remote)
        except BackendCommandError as exc:
            error_message = exc.detail
        else:
            applied = True

    payload = {
        "checked_at": int(time()),
        "endpoint": UPDATE_CHECK_URL,
        "current_version": current_version,
        "latest_version": latest_version,
        "update_available": update_available,
        "auto_updates_enabled": auto_enabled,
        "applied": applied,
        "error": error_message,
    }
    _write_update_state_payload(payload)
    if as_json:
        return json.dumps(payload)
    return "\n".join(f"{key}={value}" for key, value in payload.items()) + "\n"


def _settings_apply_update(as_json: bool) -> str:
    """Run an update check that installs any newer release regardless of the auto-update setting."""
    return _settings_check_update(as_json=as_json, apply_if_enabled=False, force_apply=True)


def _pending_key_expired(path: Path) -> bool:
    """Return True when ``path`` was last modified more than ``PENDING_PRIVATE_KEY_TTL_SECONDS`` ago.

    A file that no longer exists is reported as not expired.
    """
    try:
        modified_at = path.stat().st_mtime
    except FileNotFoundError:
        return False
    age_seconds = time() - modified_at
    return age_seconds > PENDING_PRIVATE_KEY_TTL_SECONDS


def _cleanup_expired_pending_keys(paths: dict[str, Path] | None = None) -> None:
    """Delete expired ``*.pem`` files from the pending-keys directory.

    ``paths`` defaults to the result of ``_backup_state_paths()``. Files that
    cannot be removed are skipped silently.
    """
    resolved = paths or _backup_state_paths()
    pending_dir = resolved["pending_keys_dir"]
    if not pending_dir.exists():
        return
    for key_file in pending_dir.glob("*.pem"):
        if not _pending_key_expired(key_file):
            continue
        try:
            key_file.unlink(missing_ok=True)
        except Exception:
            # Best-effort cleanup: leave the file for a later pass.
            continue


def _obscure_rclone_password(secret: str) -> str:
    if not secret:
        return ""
    if shutil.which("rclone"):
        return _run_external(["rclone", "obscure", secret]).stdout.strip()
    return secret


def _write_rclone_profile(config_file: Path, storage: dict[str, str]) -> None:
    config_file.parent.mkdir(parents=True, exist_ok=True)
    remote_name = storage.get("remote_name") or storage.get("name") or "limristem-mail-backup"
    storage_type = storage.get("type", "local")
    lines = [f"[{remote_name}]"]
    if storage_type == "s3":
        lines.extend(
            [
                "type = s3",
                "provider = AWS",
                "env_auth = false",
                f"access_key_id = {storage.get('access_key_id', '')}",
                f"secret_access_key = {storage.get('secret_access_key', '')}",
            ]
        )
        if storage.get("region"):
            lines.append(f"region = {storage['region']}")
        if storage.get("endpoint"):
            lines.append(f"endpoint = {storage['endpoint']}")
    elif storage_type in {"sftp", "ftp", "ftps"}:
        lines.extend(
            [
                f"type = {'ftp' if storage_type in {'ftp', 'ftps'} else 'sftp'}",
                f"host = {storage.get('host', '')}",
                f"user = {storage.get('user', '')}",
            ]
        )
        if storage.get("port"):
            lines.append(f"port = {storage['port']}")
        if storage.get("password"):
            lines.append(f"pass = {storage.get('password_obscured', storage['password'])}")
        if storage_type == "ftps":
            lines.append("tls = true")
    else:
        config_file.write_text("", encoding="utf-8")
        return
    config_file.write_text("\n".join(lines) + "\n", encoding="utf-8")


def _storage_remote_target(storage: dict[str, str]) -> str:
    storage_type = storage.get("type", "local")
    remote_name = storage.get("remote_name") or storage.get("name") or "limristem-mail-backup"
    path = (storage.get("path") or "").lstrip("/")
    if storage_type == "s3":
        target = f"{remote_name}:{storage.get('bucket', '')}".rstrip(":")
        return f"{target}/{path}" if path else target
    if storage_type in {"sftp", "ftp", "ftps"}:
        remote_path = storage.get("path") or "/"
        return f"{remote_name}:{remote_path}"
    return ""


def _backup_state_defaults() -> tuple[list[dict], list[dict]]:
    """Derive default storage and schedule records from the env configuration.

    Values come from the merged main and backup env files; when those cannot
    be loaded, built-in defaults are used. A legacy storage record is only
    produced when the storage type is not ``local`` or one of its
    host/bucket/access-key/user/path fields is set.

    Returns:
        ``(storages, schedules)``: zero or one storage record and exactly
        one schedule record.
    """
    try:
        merged = _load_merged_env(_resolve_main_env_file(), _resolve_backup_env_file())
    except (OSError, RuntimeError, ValueError):
        merged = {}

    storage_type = merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_TYPE", "local")
    legacy_storage = {
        "id": "legacy-default",
        "name": "Default storage",
        "type": storage_type,
        "remote_name": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_REMOTE_NAME", "limristem-mail-backup"),
        "path": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_PATH", ""),
        "host": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_HOST", ""),
        "port": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_PORT", ""),
        "user": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_USER", ""),
        "password": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_PASSWORD", ""),
        "bucket": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_BUCKET", ""),
        "region": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_REGION", ""),
        "endpoint": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_ENDPOINT", ""),
        "access_key_id": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_ACCESS_KEY_ID", ""),
        "secret_access_key": merged.get("LIMRISTEM_MAIL_BACKUP_STORAGE_SECRET_ACCESS_KEY", ""),
        "encrypt": "no",
        "public_key_path": "",
    }
    identifying_fields = ("host", "bucket", "access_key_id", "user", "path")
    has_remote = storage_type != "local" or any(legacy_storage[key] for key in identifying_fields)
    storages: list[dict] = [legacy_storage] if has_remote else []

    database_disabled = merged.get("LIMRISTEM_MAIL_BACKUP_DB_MODE", "logical") == "none"
    default_schedule = {
        "id": _slugify("default-backup", "default-backup"),
        "name": "Default backup schedule",
        "enabled": "yes",
        "oncalendar": merged.get("LIMRISTEM_MAIL_BACKUP_ONCALENDAR", "*-*-* 03:15:00") or "*-*-* 03:15:00",
        "backup_type": merged.get("LIMRISTEM_MAIL_BACKUP_TYPE", "auto") or "auto",
        "full_weekday": merged.get("LIMRISTEM_MAIL_BACKUP_FULL_WEEKDAY", "Sun") or "Sun",
        "compression": merged.get("LIMRISTEM_MAIL_BACKUP_COMPRESSION", "gzip") or "gzip",
        "include_database": "no" if database_disabled else "yes",
        "include_redis": merged.get("LIMRISTEM_MAIL_BACKUP_INCLUDE_REDIS", "yes") or "yes",
        "storage_id": storages[0]["id"] if storages else "",
    }
    schedules: list[dict] = [default_schedule]
    return storages, schedules


def _ensure_backup_state(*, create: bool) -> dict[str, Path]:
    """Return the backup state paths, optionally creating the on-disk layout.

    With ``create`` set, the state and key directories are created (the key
    directories get mode 0o700) and missing storage/schedule files are
    seeded with defaults. Expired pending keys are purged in either case.
    """
    paths = _backup_state_paths()
    if create:
        paths["state_dir"].mkdir(parents=True, exist_ok=True)
        key_dirs = (paths["keys_dir"], paths["pending_keys_dir"])
        for directory in key_dirs:
            directory.mkdir(parents=True, exist_ok=True)
        for directory in key_dirs:
            directory.chmod(0o700)

        storages, schedules = _backup_state_defaults()
        # Existing state files are never overwritten.
        for file_key, seed in (("storages", storages), ("schedules", schedules)):
            if not paths[file_key].exists():
                _write_json_file(paths[file_key], seed)

    _cleanup_expired_pending_keys(paths)
    return paths


def _mask_storage_records(records: list[dict]) -> list[dict]:
    masked: list[dict] = []
    for item in records:
        record = dict(item)
        for field, flag in (("password", "has_password"), ("secret_access_key", "has_secret_access_key")):
            raw = bool(record.get(field))
            record[flag] = raw
            record[field] = ""
        masked.append(record)
    return masked


def _load_backup_config_payload() -> dict[str, str]:
    """Return every backup option in ``BACKUP_ENV_MAP`` with its effective value.

    Values come from the merged main and backup env files; options missing
    there (or all of them, when the files cannot be loaded) take their value
    from ``BACKUP_DEFAULT_MAP``.
    """
    try:
        merged = _load_merged_env(_resolve_main_env_file(), _resolve_backup_env_file())
    except (OSError, RuntimeError, ValueError):
        merged = {}
    return {
        key: merged.get(env_key, BACKUP_DEFAULT_MAP[key])
        for key, env_key in BACKUP_ENV_MAP.items()
    }


def _list_backup_runs_payload() -> list[dict]:
    """List backup run directories, newest name first.

    Each entry carries the directory name, its path, its mtime as an integer
    and the path of ``metadata.env`` when that file exists (else ``None``).
    A missing or unreadable backup directory yields an empty list; run
    directories that cannot be stat'ed are skipped.
    """
    configured_dir = _load_backup_config_payload().get("backup-local-dir")
    root = Path(configured_dir or "/var/backups/limristem-mail")
    try:
        if not root.is_dir():
            return []
        run_dirs = sorted(
            [entry for entry in root.iterdir() if entry.is_dir()],
            key=lambda entry: entry.name,
            reverse=True,
        )
    except OSError:
        return []

    runs = []
    for run_dir in run_dirs:
        try:
            info = run_dir.stat()
        except OSError:
            continue
        marker = run_dir / "metadata.env"
        try:
            metadata = str(marker) if marker.exists() else None
        except OSError:
            metadata = None
        runs.append(
            {
                "name": run_dir.name,
                "path": str(run_dir),
                "mtime": int(info.st_mtime),
                "metadata": metadata,
            }
        )
    return runs


def _load_backup_storages_raw() -> list[dict]:
    """Return the stored storage records, unmasked.

    Falls back to the default storages when the storages state file does not
    exist. The state layout is not created by this call.
    """
    storages_file = _ensure_backup_state(create=False)["storages"]
    if not storages_file.exists():
        default_storages, _unused_schedules = _backup_state_defaults()
        return default_storages
    return _load_json_file(storages_file, default=[])


def _load_backup_schedules_raw() -> list[dict]:
    """Return the stored schedule records.

    Falls back to the default schedules when the schedules state file does
    not exist. The state layout is not created by this call.
    """
    schedules_file = _ensure_backup_state(create=False)["schedules"]
    if not schedules_file.exists():
        _unused_storages, default_schedules = _backup_state_defaults()
        return default_schedules
    return _load_json_file(schedules_file, default=[])


def _write_storage_record(record: dict) -> None:
    """Insert or replace *record* in the storages state file.

    Every stored entry with the same ``id`` is replaced; when none matches
    the record is appended. The list is written back sorted
    case-insensitively by name, falling back to the id.
    """
    storages_file = _ensure_backup_state(create=True)["storages"]
    current = _load_json_file(storages_file, default=[])
    hits = [entry.get("id") == record.get("id") for entry in current]
    merged = [record if hit else entry for entry, hit in zip(current, hits)]
    if not any(hits):
        merged.append(record)
    ordered = sorted(merged, key=lambda entry: (entry.get("name") or entry.get("id") or "").lower())
    _write_json_file(storages_file, ordered)


def _write_schedule_record(record: dict) -> None:
    """Insert or replace *record* in the schedules state file.

    Every stored entry with the same ``id`` is replaced; when none matches
    the record is appended. The list is written back sorted
    case-insensitively by name, falling back to the id.
    """
    schedules_file = _ensure_backup_state(create=True)["schedules"]
    current = _load_json_file(schedules_file, default=[])
    hits = [entry.get("id") == record.get("id") for entry in current]
    merged = [record if hit else entry for entry, hit in zip(current, hits)]
    if not any(hits):
        merged.append(record)
    ordered = sorted(merged, key=lambda entry: (entry.get("name") or entry.get("id") or "").lower())
    _write_json_file(schedules_file, ordered)


def _sync_legacy_storage_state() -> None:
    """Mirror the ``legacy-default`` storage record into the backup env file.

    Does nothing when no storage with id ``legacy-default`` exists. Each
    storage field is upserted under its ``LIMRISTEM_MAIL_BACKUP_STORAGE_*``
    key; a field missing from the record is written as its fallback value.
    """
    legacy = None
    for candidate in _load_backup_storages_raw():
        if candidate.get("id") == "legacy-default":
            legacy = candidate
            break
    if not legacy:
        return
    env_file = _resolve_backup_env_file()
    # (env key, record field, fallback when the field is absent)
    env_fields = (
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_TYPE", "type", "local"),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_REMOTE_NAME", "remote_name", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_PATH", "path", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_HOST", "host", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_PORT", "port", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_USER", "user", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_PASSWORD", "password", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_BUCKET", "bucket", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_REGION", "region", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_ENDPOINT", "endpoint", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_ACCESS_KEY_ID", "access_key_id", ""),
        ("LIMRISTEM_MAIL_BACKUP_STORAGE_SECRET_ACCESS_KEY", "secret_access_key", ""),
    )
    for env_key, field, fallback in env_fields:
        _upsert_env_value(env_file, env_key, legacy.get(field, fallback))


def _sync_legacy_schedule_state() -> None:
    """Mirror one schedule into the legacy backup env variables.

    The schedule with id ``default-backup`` is preferred; otherwise the first
    stored schedule is used. Does nothing when there are no schedules.
    ``include_database`` is translated to ``LIMRISTEM_MAIL_BACKUP_DB_MODE``
    (``logical`` for ``yes``, ``none`` otherwise).
    """
    schedules = _load_backup_schedules_raw()
    chosen = None
    for entry in schedules:
        if entry.get("id") == "default-backup":
            chosen = entry
            break
    if not chosen and schedules:
        chosen = schedules[0]
    if not chosen:
        return
    env_file = _resolve_backup_env_file()
    # (env key, schedule field, fallback when the field is absent)
    direct_fields = (
        ("LIMRISTEM_MAIL_BACKUP_ONCALENDAR", "oncalendar", "*-*-* 03:15:00"),
        ("LIMRISTEM_MAIL_BACKUP_TYPE", "backup_type", "auto"),
        ("LIMRISTEM_MAIL_BACKUP_FULL_WEEKDAY", "full_weekday", "Sun"),
        ("LIMRISTEM_MAIL_BACKUP_COMPRESSION", "compression", "gzip"),
        ("LIMRISTEM_MAIL_BACKUP_INCLUDE_REDIS", "include_redis", "yes"),
    )
    for env_key, field, fallback in direct_fields:
        _upsert_env_value(env_file, env_key, chosen.get(field, fallback))
    wants_database = chosen.get("include_database", "yes") == "yes"
    _upsert_env_value(env_file, "LIMRISTEM_MAIL_BACKUP_DB_MODE", "logical" if wants_database else "none")


def _sync_storage_profile() -> None:
    """Bring the rclone profile and remote-targets env value in line with config.

    For local (or empty) storage the remote-targets value is cleared and the
    rclone config file is removed, ignoring removal errors. For remote
    storage the rclone profile is written and the remote target is stored;
    when writing the profile fails the env value is left untouched.
    """
    config = _load_backup_config_payload()
    rclone_file = _resolve_rclone_config_file()
    kind = config.get("backup-storage-type", "local")
    if kind in {"", "local"}:
        _upsert_env_value(_resolve_backup_env_file(), BACKUP_ENV_MAP["backup-remote-targets"], "")
        try:
            rclone_file.unlink(missing_ok=True)
        except Exception:
            pass
        return

    # (storage field, config option, fallback when the option is absent)
    option_fields = (
        ("remote_name", "backup-storage-remote-name", "limristem-mail-backup"),
        ("path", "backup-storage-path", ""),
        ("host", "backup-storage-host", ""),
        ("port", "backup-storage-port", ""),
        ("user", "backup-storage-user", ""),
        ("password", "backup-storage-password", ""),
        ("bucket", "backup-storage-bucket", ""),
        ("region", "backup-storage-region", ""),
        ("endpoint", "backup-storage-endpoint", ""),
        ("access_key_id", "backup-storage-access-key-id", ""),
        ("secret_access_key", "backup-storage-secret-access-key", ""),
    )
    storage = {"type": kind}
    for field, option, fallback in option_fields:
        storage[field] = config.get(option, fallback)

    uses_password_login = kind in {"sftp", "ftp", "ftps"}
    if uses_password_login and storage.get("password"):
        storage["password_obscured"] = _obscure_rclone_password(storage["password"])
    try:
        _write_rclone_profile(rclone_file, storage)
    except Exception:
        return
    _upsert_env_value(_resolve_backup_env_file(), BACKUP_ENV_MAP["backup-remote-targets"], _storage_remote_target(storage))


def _schedule_unit_name(schedule_id: str) -> str:
    return "limristem-mail-backup-schedule-" + re.sub(r"[^A-Za-z0-9_.-]+", "-", schedule_id).strip("-")


def _sync_schedule_units() -> None:
    """Regenerate the per-schedule systemd service and timer units.

    All existing ``limristem-mail-backup-schedule-*`` timers (and their
    services) are disabled and removed, the legacy single
    ``limristem-mail-backup.timer`` is disabled, and a fresh service/timer
    pair is written for every stored schedule that has an id. After a daemon
    reload each timer is enabled only when backups are globally enabled and
    the schedule itself is enabled; otherwise it is disabled.

    The whole operation is best-effort: it returns silently when the systemd
    directory is not writable or when any step raises.
    """
    systemd_dir = Path("/etc/systemd/system")
    # NOTE(review): _is_writable presumably checks whether this probe file
    # could be created in the unit directory — confirm against the helper.
    if not _is_writable(systemd_dir / ".limristem-mail-backup-schedule-probe"):
        return
    try:
        schedules = _load_backup_schedules_raw()
        config = _load_backup_config_payload()
        global_enabled = config.get("backup-enabled", "yes")
        # Remove every previously generated unit so deleted or renamed
        # schedules do not leave stale timers behind.
        for timer_file in systemd_dir.glob("limristem-mail-backup-schedule-*.timer"):
            _run_external_best_effort(["systemctl", "disable", "--now", timer_file.name])
            try:
                timer_file.unlink(missing_ok=True)
                # The matching service shares the timer's base name.
                service_path = timer_file.with_suffix(".service")
                service_path.unlink(missing_ok=True)
            except Exception:
                pass
        # The legacy single timer is disabled here; only per-schedule timers
        # are generated below.
        _run_external_best_effort(["systemctl", "disable", "--now", "limristem-mail-backup.timer"])
        base_dir = os.getenv("LIMRISTEM_MAIL_BASE_DIR", str(settings.base_dir))
        env_file = _resolve_main_env_file()
        backup_env = _resolve_backup_env_file()
        for item in schedules:
            schedule_id = item.get("id", "")
            if not schedule_id:
                continue
            unit_name = _schedule_unit_name(schedule_id)
            unit_file = systemd_dir / f"{unit_name}.service"
            timer_file = systemd_dir / f"{unit_name}.timer"
            unit_file.write_text(
                "\n".join(
                    [
                        "[Unit]",
                        f"Description=Limristem eMail backup job ({item.get('name', schedule_id)})",
                        "After=network-online.target mariadb.service redis-server.service",
                        "Wants=network-online.target",
                        "",
                        "[Service]",
                        "Type=oneshot",
                        f"EnvironmentFile={env_file}",
                        f"EnvironmentFile={backup_env}",
                        f"ExecStart={_scripts_dir()}/backup.sh --schedule-id {schedule_id}",
                        "User=root",
                        "Group=root",
                        "",
                    ]
                ),
                encoding="utf-8",
            )
            # A schedule may hold several calendar expressions, one per line;
            # each becomes its own OnCalendar= entry.
            oncalendar_lines = [line.strip() for line in item.get("oncalendar", "*-*-* 03:15:00").splitlines() if line.strip()]
            timer_file.write_text(
                "\n".join(
                    [
                        "[Unit]",
                        f"Description=Run Limristem eMail backup schedule ({item.get('name', schedule_id)})",
                        "",
                        "[Timer]",
                        *[f"OnCalendar={line}" for line in oncalendar_lines],
                        "Persistent=true",
                        "",
                        "[Install]",
                        "WantedBy=timers.target",
                        "",
                    ]
                ),
                encoding="utf-8",
            )
        # Reload once after all unit files are written, before toggling timers.
        _run_external_best_effort(["systemctl", "daemon-reload"])
        for item in schedules:
            schedule_id = item.get("id", "")
            if not schedule_id:
                continue
            unit_name = _schedule_unit_name(schedule_id)
            if global_enabled == "yes" and item.get("enabled", "yes") == "yes":
                _run_external_best_effort(["systemctl", "enable", "--now", f"{unit_name}.timer"])
            else:
                _run_external_best_effort(["systemctl", "disable", "--now", f"{unit_name}.timer"])
    except Exception:
        # Best-effort: unit sync failures are swallowed here.
        return


def _normalize_storage_payload(payload: dict) -> dict:
    """Validate a storage payload and merge it over the stored record.

    Fields missing from *payload* are taken from the existing storage with
    the same id, if any. ``public_key_path`` is always taken from the stored
    record, never from the payload.

    Returns:
        The normalized storage record.

    Raises:
        BackendCommandError: on an unknown storage type, an unsafe id or
            remote name, control characters (NUL, CR, LF) in a text field,
            a port outside 1-65535, a public key path outside the keys
            directory, or a missing field required by the storage type.
    """
    existing = {item.get("id"): item for item in _load_backup_storages_raw() if item.get("id")}
    source = existing.get(str(payload.get("id", "")).strip(), {})
    name = str(payload.get("name", "")).strip() or str(source.get("name", "")).strip() or "Backup storage"
    storage_type = str(payload.get("type", source.get("type", "local"))).strip().lower() or "local"
    if storage_type not in {"local", "s3", "sftp", "ftp", "ftps"}:
        raise BackendCommandError("Invalid storage type")
    # Without an explicit id, one is derived from the name.
    storage_id = str(payload.get("id", "")).strip() or _slugify(name, "backup-storage")
    if not SAFE_RESOURCE_ID_RE.fullmatch(storage_id):
        raise BackendCommandError("Invalid storage ID")
    remote_name = str(payload.get("remote_name", "")).strip() or str(source.get("remote_name", "")).strip() or storage_id
    if not SAFE_REMOTE_NAME_RE.fullmatch(remote_name):
        raise BackendCommandError("Invalid storage remote name")
    record = {
        "id": storage_id,
        "name": name,
        "type": storage_type,
        "remote_name": remote_name,
        "path": str(payload.get("path", source.get("path", ""))).strip(),
        "host": str(payload.get("host", source.get("host", ""))).strip(),
        "port": str(payload.get("port", source.get("port", ""))).strip(),
        "user": str(payload.get("user", source.get("user", ""))).strip(),
        # Secrets are kept verbatim: no whitespace stripping.
        "password": str(payload.get("password", source.get("password", ""))),
        "bucket": str(payload.get("bucket", source.get("bucket", ""))).strip(),
        "region": str(payload.get("region", source.get("region", ""))).strip(),
        "endpoint": str(payload.get("endpoint", source.get("endpoint", ""))).strip(),
        "access_key_id": str(payload.get("access_key_id", source.get("access_key_id", ""))).strip(),
        "secret_access_key": str(payload.get("secret_access_key", source.get("secret_access_key", ""))),
        "encrypt": "yes" if str(payload.get("encrypt", source.get("encrypt", "no"))).strip().lower() == "yes" else "no",
        "public_key_path": str(source.get("public_key_path", "")),
    }
    # Reject NUL and line breaks in every free-text field.
    for field in (
        "name",
        "path",
        "host",
        "user",
        "password",
        "bucket",
        "region",
        "endpoint",
        "access_key_id",
        "secret_access_key",
    ):
        if any(character in record[field] for character in ("\x00", "\r", "\n")):
            raise BackendCommandError(f"Invalid storage {field}")
    if record["port"]:
        try:
            port = int(record["port"])
        except ValueError as exc:
            raise BackendCommandError("Invalid storage port") from exc
        if port < 1 or port > 65535:
            raise BackendCommandError("Invalid storage port")
    if record["public_key_path"]:
        # The stored key path must resolve to a location inside the keys directory.
        keys_dir = _backup_state_paths()["keys_dir"].resolve(strict=False)
        public_key_path = Path(record["public_key_path"]).resolve(strict=False)
        try:
            public_key_path.relative_to(keys_dir)
        except ValueError as exc:
            raise BackendCommandError("Invalid storage public key path") from exc
    # Local storage has no mandatory connection fields.
    required = {
        "s3": ("bucket", "access_key_id", "secret_access_key"),
        "sftp": ("host", "user", "password"),
        "ftp": ("host", "user", "password"),
        "ftps": ("host", "user", "password"),
    }.get(storage_type, ())
    for field in required:
        if not str(record.get(field, "")).strip():
            raise BackendCommandError(f"Missing value for storage {field}")
    return record


def _generate_storage_keypair(storage_id: str) -> tuple[str, str]:
    """Create an RSA-2048 key pair for encrypted backups of a storage.

    The private key is written to the pending-keys directory under a random
    token file name; the public key goes to the keys directory as
    ``<storage_id>.pub.pem``. Both files end up with mode 0600.

    Returns:
        ``(token, public_key_path)`` where *token* is the private key's file
        name inside the pending-keys directory.

    Raises:
        BackendCommandError: when ``openssl`` is not on ``PATH``.
    """
    paths = _ensure_backup_state(create=True)
    _cleanup_expired_pending_keys(paths)
    token = f"{storage_id}-{secrets.token_hex(12)}.pem"
    private_key = paths["pending_keys_dir"] / token
    public_key = paths["keys_dir"] / f"{storage_id}.pub.pem"
    if shutil.which("openssl") is None:
        raise BackendCommandError("openssl is required to generate encrypted backup keys")
    openssl_commands = (
        ["openssl", "genpkey", "-algorithm", "RSA", "-out", str(private_key), "-pkeyopt", "rsa_keygen_bits:2048"],
        ["openssl", "rsa", "-pubout", "-in", str(private_key), "-out", str(public_key)],
    )
    for command in openssl_commands:
        _run_external(command)
    for key_file in (private_key, public_key):
        key_file.chmod(0o600)
    return token, str(public_key)


def _normalize_schedule_payload(payload: dict) -> dict:
    """Validate a schedule payload and merge it over the stored record.

    Fields missing from *payload* are taken from the existing schedule with
    the same id, if any, and otherwise from built-in defaults.

    Returns:
        The normalized schedule record; ``oncalendar`` holds the non-empty
        calendar lines joined by newlines.

    Raises:
        BackendCommandError: on an unsafe id, a name containing NUL/CR/LF,
            an empty calendar expression, an unknown backup type, weekday or
            compression, or a ``storage_id`` that matches no stored storage.
    """
    schedules = _load_backup_schedules_raw()
    storages = _load_backup_storages_raw()
    storage_ids = {item.get("id") for item in storages if item.get("id")}
    existing = {item.get("id"): item for item in schedules if item.get("id")}
    source = existing.get(str(payload.get("id", "")).strip(), {})
    name = str(payload.get("name", "")).strip() or str(source.get("name", "")).strip() or "Backup schedule"
    # Without an explicit id, one is derived from the name.
    schedule_id = str(payload.get("id", "")).strip() or _slugify(name, "backup-schedule")
    if not SAFE_RESOURCE_ID_RE.fullmatch(schedule_id):
        raise BackendCommandError("Invalid schedule ID")
    if any(character in name for character in ("\x00", "\r", "\n")):
        raise BackendCommandError("Invalid schedule name")
    enabled = _normalize_yes_no(_strip_matching_quotes(str(payload.get("enabled", source.get("enabled", "yes")))))
    # Multiple calendar expressions are allowed, one per line; blank lines are dropped.
    raw_oncalendar = str(payload.get("oncalendar", source.get("oncalendar", "*-*-* 03:15:00"))).replace("\r", "")
    lines = [line.strip() for line in raw_oncalendar.splitlines() if line.strip()]
    if not lines:
        raise BackendCommandError("Missing value for schedule")
    backup_type = _strip_matching_quotes(str(payload.get("backup_type", source.get("backup_type", "auto")))).lower()
    if backup_type not in {"auto", "full", "incremental"}:
        raise BackendCommandError("Invalid schedule backup type")
    # The weekday is only validated for "auto"; other types reset it to "Sun".
    if backup_type == "auto":
        full_weekday = _strip_matching_quotes(str(payload.get("full_weekday", source.get("full_weekday", "Sun")))).title()
        if full_weekday not in {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}:
            raise BackendCommandError("Invalid schedule weekday")
    else:
        full_weekday = "Sun"
    compression = _strip_matching_quotes(str(payload.get("compression", source.get("compression", "gzip")))).lower()
    if compression not in {"gzip", "none"}:
        raise BackendCommandError("Invalid schedule compression")
    include_database = _normalize_yes_no(_strip_matching_quotes(str(payload.get("include_database", source.get("include_database", "yes")))))
    include_redis = _normalize_yes_no(_strip_matching_quotes(str(payload.get("include_redis", source.get("include_redis", "yes")))))
    # An empty storage_id is accepted; a non-empty one must match a stored storage.
    storage_id = str(payload.get("storage_id", source.get("storage_id", ""))).strip()
    if storage_id and storage_id not in storage_ids:
        raise BackendCommandError("Unknown storage selected")
    return {
        "id": schedule_id,
        "name": name,
        "enabled": enabled,
        "oncalendar": "\n".join(lines),
        "backup_type": backup_type,
        "full_weekday": full_weekday,
        "compression": compression,
        "include_database": include_database,
        "include_redis": include_redis,
        "storage_id": storage_id,
    }


def _normalize_firewall_rules_json(raw: str) -> str:
    """Validate a JSON array of firewall rules and return it in canonical form.

    Each rule is reduced to ``ports``, ``protocol``, ``source`` and
    ``enabled``. Ports are de-duplicated and rendered as a ``", "``-separated
    list, the source is canonicalized as an IP network (``*``, ``any``,
    ``all`` and empty mean "no restriction" and become ``""``), and rules
    with no ports, protocol or source at all are dropped.

    Args:
        raw: JSON text; blank input is treated as an empty array.

    Returns:
        Compact JSON text of the normalized rule list.

    Raises:
        BackendCommandError: when the document is not an array of objects,
            a protocol is not ``tcp``/``udp``, or the ports are missing or
            invalid.
        ValueError: when *raw* is not valid JSON or a source is not a valid
            IP network (raised by ``json.loads`` / ``ip_network``).
    """
    items = json.loads(raw.strip() or "[]")
    if not isinstance(items, list):
        raise BackendCommandError("Firewall rules must be a JSON array")
    normalized = []
    for item in items:
        if not isinstance(item, dict):
            raise BackendCommandError("Each firewall rule must be an object")
        ports = str(item.get("ports", "")).strip()
        protocol = str(item.get("protocol", "tcp")).strip().lower()
        source = str(item.get("source", "")).strip()
        enabled = _normalize_yes_no(str(item.get("enabled", "yes")))
        if not ports and not protocol and not source:
            continue
        if protocol not in {"tcp", "udp"}:
            raise BackendCommandError(f"Invalid protocol: {protocol}")
        # Port parsing is shared with the legacy port-list setting so both
        # code paths accept and render exactly the same syntax.
        rendered_ports = _normalize_legacy_firewall_ports(ports)
        if not rendered_ports:
            raise BackendCommandError("Firewall rule ports are required")
        if source and source not in {"*", "any", "all"}:
            # strict=False accepts host bits set, e.g. 10.0.0.5/8 -> 10.0.0.0/8.
            source = str(ip_network(source, strict=False))
        else:
            source = ""
        normalized.append({"ports": rendered_ports, "protocol": protocol, "source": source, "enabled": enabled})
    return json.dumps(normalized, separators=(",", ":"))


def _firewall_rules_payload(merged_env: dict[str, str]) -> list[dict]:
    """Return the firewall rules described by *merged_env*.

    When ``LIMRISTEM_MAIL_FIREWALL_RULES`` is set it is normalized and
    returned. Otherwise rules are synthesized from the legacy allowed
    TCP/UDP port lists (falling back to ``FIREWALL_DEFAULT_MAP``), one
    enabled, unrestricted rule per protocol that has ports.
    """
    explicit_rules = merged_env.get("LIMRISTEM_MAIL_FIREWALL_RULES", "")
    if explicit_rules:
        return json.loads(_normalize_firewall_rules_json(explicit_rules))

    def _port_list(env_key: str, default_key: str) -> str:
        raw_ports = merged_env.get(env_key, FIREWALL_DEFAULT_MAP[default_key]).strip()
        return ", ".join(token for token in re.split(r"[\s,]+", raw_ports) if token)

    legacy_ports = {
        "tcp": _port_list("LIMRISTEM_MAIL_FIREWALL_ALLOWED_TCP_PORTS", "firewall-allowed-tcp-ports"),
        "udp": _port_list("LIMRISTEM_MAIL_FIREWALL_ALLOWED_UDP_PORTS", "firewall-allowed-udp-ports"),
    }
    return [
        {"ports": ports, "protocol": protocol, "source": "", "enabled": "yes"}
        for protocol, ports in legacy_ports.items()
        if ports
    ]


def _legacy_firewall_port_payload(rules: list[dict]) -> dict[str, str]:
    legacy = {"tcp": [], "udp": []}
    for item in rules:
        if item.get("enabled") != "yes" or item.get("source"):
            continue
        protocol = item.get("protocol")
        if protocol not in legacy:
            continue
        ports = ", ".join(token for token in re.split(r"[\s,]+", str(item.get("ports", "")).strip()) if token)
        if ports:
            legacy[protocol].append(ports)
    return {
        "firewall-allowed-tcp-ports": ", ".join(legacy["tcp"]).strip(),
        "firewall-allowed-udp-ports": ", ".join(legacy["udp"]).strip(),
    }


def _normalize_legacy_firewall_ports(value: str) -> str:
    tokens = [token for token in re.split(r"[\s,]+", value.strip()) if token]
    normalized: list[str] = []
    seen: set[str] = set()
    for token in tokens:
        if "-" in token:
            start_text, end_text = token.split("-", 1)
            if not start_text.isdigit() or not end_text.isdigit():
                raise BackendCommandError(f"Invalid port range: {token}")
            start = int(start_text)
            end = int(end_text)
            if start < 1 or end > 65535 or start > end:
                raise BackendCommandError(f"Invalid port range: {token}")
            rendered = f"{start}-{end}"
        else:
            if not token.isdigit():
                raise BackendCommandError(f"Invalid port: {token}")
            port = int(token)
            if port < 1 or port > 65535:
                raise BackendCommandError(f"Invalid port: {token}")
            rendered = str(port)
        if rendered in seen:
            continue
        seen.add(rendered)
        normalized.append(rendered)
    return ", ".join(normalized)


def _update_legacy_firewall_rule_json(protocol: str, ports: str, rules_json: str) -> str:
    rules = json.loads(rules_json)
    updated = False
    result = []
    for item in rules:
        if item.get("protocol") == protocol and not item.get("source") and item.get("enabled", "yes") == "yes" and not updated:
            if ports:
                updated_item = dict(item)
                updated_item["ports"] = ports
                result.append(updated_item)
            updated = True
            continue
        result.append(item)
    if not updated and ports:
        result.append({"ports": ports, "protocol": protocol, "source": "", "enabled": "yes"})
    return json.dumps(result, separators=(",", ":"))


def _apply_firewall() -> None:
    """Run ``manage-firewall.sh apply``.

    Raises:
        BackendCommandError: when the management script is not installed.
    """
    firewall_script = _script_path("manage-firewall.sh")
    if firewall_script.is_file():
        _run_external([str(firewall_script), "apply"])
        return
    raise BackendCommandError("Firewall management script is not installed")


def _detect_rspamd_user() -> str:
    """Return the first existing account among ``_rspamd`` and ``rspamd``.

    Existence is probed with ``id <name>``; ``""`` is returned when neither
    account exists.
    """
    candidates = ("_rspamd", "rspamd")
    return next(
        (
            account
            for account in candidates
            if subprocess.run(["id", account], capture_output=True, text=True, check=False).returncode == 0
        ),
        "",
    )


def _write_rspamd_limits(env: dict[str, str]) -> None:
    """Write rspamd ``actions.conf`` and ``greylist.conf`` from *env*.

    Both files are written to ``/etc/rspamd/local.d`` (created if needed),
    with built-in fallbacks for values missing from *env*. When an rspamd
    service account is detected, the files are chown'ed to
    ``root:<account>`` and chmod'ed to 640.
    """
    config_dir = Path("/etc/rspamd/local.d")
    config_dir.mkdir(parents=True, exist_ok=True)

    actions_path = config_dir / "actions.conf"
    greylist_path = config_dir / "greylist.conf"

    actions_text = (
        f"reject = {env.get('LIMRISTEM_MAIL_RSPAMD_ACTION_REJECT', '15')};\n"
        f"add_header = {env.get('LIMRISTEM_MAIL_RSPAMD_ACTION_ADD_HEADER', '6')};\n"
        f"greylist = {env.get('LIMRISTEM_MAIL_RSPAMD_ACTION_GREYLIST', '4')};\n"
    )
    actions_path.write_text(actions_text, encoding="utf-8")

    greylist_text = (
        f"enabled = {env.get('LIMRISTEM_MAIL_RSPAMD_GREYLIST_ENABLED', 'true')};\n"
        f"timeout = {env.get('LIMRISTEM_MAIL_RSPAMD_GREYLIST_DELAY', '5m')};\n"
        f"expire = {env.get('LIMRISTEM_MAIL_RSPAMD_GREYLIST_EXPIRE', '35d')};\n"
        'key_prefix = "grey";\n'
    )
    greylist_path.write_text(greylist_text, encoding="utf-8")

    account = _detect_rspamd_user()
    if not account:
        return
    targets = [str(actions_path), str(greylist_path)]
    _run_external_best_effort(["chown", f"root:{account}", *targets])
    _run_external_best_effort(["chmod", "640", *targets])


def _apply_limits() -> None:
    """Push the rate-limit settings from the main env file to the services.

    When ``postconf`` is available, the three Postfix rate parameters are set
    and Postfix is reloaded and then restarted. Unless rspamd is disabled via
    ``LIMRISTEM_MAIL_ENABLE_RSPAMD``, its limit files are rewritten (errors
    ignored) and rspamd is restarted. Finally the ``limristem-mail`` service
    is restarted without blocking.
    """
    env = _load_env_file(_resolve_main_env_file())
    if shutil.which("postconf"):
        # (postfix parameter, env key, fallback value)
        postfix_settings = (
            ("anvil_rate_time_unit", "LIMRISTEM_MAIL_POSTFIX_RATE_TIME_UNIT", "60s"),
            ("smtpd_client_connection_rate_limit", "LIMRISTEM_MAIL_POSTFIX_CLIENT_CONNECTION_RATE_LIMIT", "30"),
            ("smtpd_client_message_rate_limit", "LIMRISTEM_MAIL_POSTFIX_CLIENT_MESSAGE_RATE_LIMIT", "100"),
        )
        for parameter, env_key, fallback in postfix_settings:
            _run_external_best_effort(["postconf", "-e", f"{parameter} = {env.get(env_key, fallback)}"])
        for verb in ("reload", "restart"):
            _run_external_best_effort(["systemctl", verb, "postfix"])
    rspamd_enabled = env.get("LIMRISTEM_MAIL_ENABLE_RSPAMD", "yes") == "yes"
    if rspamd_enabled:
        try:
            _write_rspamd_limits(env)
        except Exception:
            pass
        _run_external_best_effort(["systemctl", "restart", "rspamd"])
    _run_external_best_effort(["systemctl", "restart", "--no-block", "limristem-mail"])


def _queue_list_payload() -> list[dict]:
    """Return the mail queue entries reported by ``postqueue -j``.

    Each non-blank output line is parsed as one JSON document; lines that are
    not valid JSON are skipped. Returns ``[]`` when ``postqueue`` is missing
    or exits with a non-zero status.
    """
    if shutil.which("postqueue") is None:
        return []
    listing = subprocess.run(["postqueue", "-j"], capture_output=True, text=True, check=False)
    if listing.returncode != 0:
        return []
    entries = []
    for raw_line in listing.stdout.splitlines():
        text = raw_line.strip()
        if text:
            try:
                entries.append(json.loads(text))
            except json.JSONDecodeError:
                # Skip lines that are not valid JSON.
                pass
    return entries


def _queue_action(action: str, queue_id: str) -> None:
    """Run ``postsuper`` with the flag for *action* on *queue_id*.

    Supported actions are ``delete``, ``hold``, ``release`` and ``requeue``.

    Raises:
        BackendCommandError: when *action* is not supported.
    """
    postsuper_flags = {"delete": "-d", "hold": "-h", "release": "-H", "requeue": "-r"}
    if action not in postsuper_flags:
        raise BackendCommandError("Unsupported queue action")
    _run_external(["postsuper", postsuper_flags[action], queue_id])


def _list_fail2ban_jails() -> list[str]:
    """Return the jail names listed by ``fail2ban-client status``.

    Returns ``[]`` when the client is not installed, the ping fails, the
    status command fails, or its output has no ``Jail list:`` line. When
    several such lines exist the last one is used.
    """
    if shutil.which("fail2ban-client") is None:
        return []

    def _client(*arguments: str) -> subprocess.CompletedProcess:
        return subprocess.run(["fail2ban-client", *arguments], capture_output=True, text=True, check=False)

    if _client("ping").returncode != 0:
        return []
    status = _client("status")
    if status.returncode != 0:
        return []
    for line in reversed(status.stdout.splitlines()):
        if "Jail list:" in line:
            _, _, names = line.partition(":")
            return [name.strip() for name in names.split(",") if name.strip()]
    return []


def _jail_banned_ips(jail: str) -> list[str]:
    """Return the IPs on the ``Banned IP list:`` line of a jail's status.

    Returns ``[]`` when the status command fails or prints no such line.
    When several such lines exist the last one is used.
    """
    status = subprocess.run(["fail2ban-client", "status", jail], capture_output=True, text=True, check=False)
    if status.returncode != 0:
        return []
    for line in reversed(status.stdout.splitlines()):
        if "Banned IP list:" in line:
            _, _, addresses = line.partition(":")
            # split() without arguments already drops empty and padded tokens.
            return addresses.split()
    return []


def _ensure_manual_ban_file() -> Path:
    """Return the manual ban file path, creating the file and its parents."""
    ban_file = _manual_ban_file()
    ban_file.parent.mkdir(parents=True, exist_ok=True)
    ban_file.touch(exist_ok=True)
    return ban_file


def _load_manual_bans() -> dict[str, dict[str, str]]:
    """Load the manual ban file into a mapping keyed by IP.

    The file is tab-separated with columns ``created_at``, ``ip`` and an
    optional ``reason`` (default ``"manual"``). Rows with fewer than two
    columns are ignored; a later row for the same IP replaces an earlier
    one. A missing file yields an empty mapping.
    """
    ban_file = _manual_ban_file()
    if not ban_file.exists():
        return {}
    with ban_file.open(newline="", encoding="utf-8") as handle:
        return {
            row[1]: {"created_at": row[0], "reason": row[2] if len(row) > 2 else "manual"}
            for row in csv.reader(handle, delimiter="\t")
            if len(row) >= 2
        }


def _write_manual_bans(items: dict[str, dict[str, str]]) -> None:
    """Rewrite the manual ban file from *items*, sorted by IP string.

    Each row is written tab-separated as ``created_at``, ``ip``, ``reason``.
    """
    ban_file = _ensure_manual_ban_file()
    with ban_file.open("w", newline="", encoding="utf-8") as handle:
        csv.writer(handle, delimiter="\t").writerows(
            [details.get("created_at", ""), address, details.get("reason", "manual")]
            for address, details in sorted(items.items())
        )


def _list_bans_payload() -> list[dict]:
    """Return one entry per banned IP, merging fail2ban and manual bans.

    IPs reported by fail2ban jails list the jails that ban them; IPs that are
    only in the manual ban file are labelled ``manual``. In the result the
    ``services`` and ``jail`` fields both hold the comma-joined, sorted jail
    names (or ``"manual"``). Entries are sorted by ``created_at`` then IP,
    descending.
    """
    manual = _load_manual_bans()
    by_ip: dict[str, dict] = {}

    for jail in _list_fail2ban_jails():
        for ip_value in _jail_banned_ips(jail):
            if ip_value not in by_ip:
                manual_meta = manual.get(ip_value, {})
                by_ip[ip_value] = {
                    "created_at": manual_meta.get("created_at", ""),
                    "ip": ip_value,
                    "reason": manual_meta.get("reason", "fail2ban"),
                    "source": "manual, fail2ban" if ip_value in manual else "fail2ban",
                    "jail": "",
                    "services": [],
                }
            jails_seen = by_ip[ip_value]["services"]
            if jail and jail not in jails_seen:
                jails_seen.append(jail)

    # Manual bans that no jail currently reports.
    for ip_value, metadata in manual.items():
        if ip_value in by_ip:
            continue
        by_ip[ip_value] = {
            "created_at": metadata.get("created_at", ""),
            "ip": ip_value,
            "reason": metadata.get("reason", "manual"),
            "source": "manual",
            "jail": "manual",
            "services": [],
        }

    entries = list(by_ip.values())
    for entry in entries:
        jail_names = sorted(entry.get("services", []))
        if not jail_names and entry.get("source") == "manual, fail2ban":
            entry["source"] = "manual"
        label = ", ".join(jail_names) if jail_names else "manual"
        entry["services"] = label
        entry["jail"] = label
    entries.sort(key=lambda entry: (entry.get("created_at", ""), entry.get("ip", "")), reverse=True)
    return entries


def _ban_ip(ip_value: str, reason: str) -> None:
    """Ban *ip_value* in every active fail2ban jail and record it as manual.

    The ban is recorded in the manual ban file with a UTC timestamp
    (``YYYY-MM-DDTHH:MM:SSZ``) once at least one jail accepted it.

    Args:
        ip_value: IPv4 or IPv6 address to ban.
        reason: Free-text reason; tabs are replaced by spaces and an empty
            reason is stored as ``"manual"``.

    Raises:
        ValueError: when *ip_value* is not a valid IP address.
        BackendCommandError: when no fail2ban jail is available or no jail
            accepted the ban.
    """
    # Function-scope import: the module does not import datetime at top level.
    from datetime import datetime, timezone

    ip_address(ip_value)
    jails = _list_fail2ban_jails()
    if not jails:
        raise BackendCommandError("Fail2ban is unavailable.")
    applied = 0
    for jail in jails:
        result = subprocess.run(["fail2ban-client", "set", jail, "banip", ip_value], capture_output=True, text=True, check=False)
        if result.returncode == 0:
            applied += 1
    if applied == 0:
        raise BackendCommandError("Unable to apply the ban to any active fail2ban jail.")
    manual = _load_manual_bans()
    # Take the timestamp in-process instead of spawning `date`, which could
    # fail after the ban was already applied or silently yield an empty value.
    created_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    manual[ip_value] = {"created_at": created_at, "reason": reason.replace("\t", " ") or "manual"}
    _write_manual_bans(manual)


def _unban_ip(ip_value: str) -> None:
    """Remove *ip_value* from the manual ban file and from every jail banning it.

    Raises:
        ValueError: when *ip_value* is not a valid IP address.
    """
    ip_address(ip_value)
    remaining = _load_manual_bans()
    remaining.pop(ip_value, None)
    _write_manual_bans(remaining)
    for jail in _list_fail2ban_jails():
        if ip_value not in _jail_banned_ips(jail):
            continue
        unban_command = ["fail2ban-client", "set", jail, "unbanip", ip_value]
        subprocess.run(unban_command, capture_output=True, text=True, check=False)


def _backup_show(as_json: bool) -> str:
    """Render the backup configuration as JSON or as sorted ``key=value`` lines."""
    config = _load_backup_config_payload()
    if as_json:
        return json.dumps(config)
    lines = [f"{option}={value}" for option, value in sorted(config.items())]
    return "\n".join(lines) + "\n"


def _backup_list(as_json: bool) -> str:
    """Render the list of backup runs.

    Returns a JSON document when *as_json* is true, otherwise one
    ``<name> <path>`` line per run; the trailing newline is only added when
    there is at least one run.
    """
    runs = _list_backup_runs_payload()
    if as_json:
        return json.dumps(runs)
    text = "\n".join(f"{run['name']} {run['path']}" for run in runs)
    if runs:
        text += "\n"
    return text


def _backup_list_schedules() -> str:
    """Return the raw backup schedule records serialized as JSON."""
    schedules = _load_backup_schedules_raw()
    return json.dumps(schedules)


def _backup_list_storages() -> str:
    """Return the backup storage records, passed through the masking helper, as JSON."""
    raw_storages = _load_backup_storages_raw()
    masked_storages = _mask_storage_records(raw_storages)
    return json.dumps(masked_storages)


def _backup_save_schedule(payload_json: str, as_json: bool) -> str:
    payload = json.loads(payload_json or "{}")
    normalized = _normalize_schedule_payload(payload)
    _write_schedule_record(normalized)
    _sync_legacy_schedule_state()
    _sync_schedule_units()
    return json.dumps(normalized) if as_json else ""


def _backup_delete_schedule(schedule_id: str) -> str:
    """Remove the schedule whose ``id`` equals *schedule_id* and resync state.

    Returns:
        Always an empty string.

    Raises:
        BackendCommandError: If *schedule_id* does not match the safe
            resource-ID pattern.
    """
    if SAFE_RESOURCE_ID_RE.fullmatch(schedule_id) is None:
        raise BackendCommandError("Invalid schedule ID")
    state_paths = _ensure_backup_state(create=True)
    remaining = []
    for record in _load_json_file(state_paths["schedules"], default=[]):
        if record.get("id") != schedule_id:
            remaining.append(record)
    _write_json_file(state_paths["schedules"], remaining)
    _sync_legacy_schedule_state()
    _sync_schedule_units()
    return ""


def _backup_save_storage(payload_json: str, as_json: bool) -> str:
    payload = json.loads(payload_json or "{}")
    normalized = _normalize_storage_payload(payload)
    token = ""
    if normalized.get("encrypt") == "yes" and not normalized.get("public_key_path"):
        token, public_key_path = _generate_storage_keypair(normalized["id"])
        normalized["public_key_path"] = public_key_path
    elif normalized.get("encrypt") != "yes":
        paths = _ensure_backup_state(create=True)
        try:
            (paths["keys_dir"] / f"{normalized['id']}.pub.pem").unlink(missing_ok=True)
        except Exception:
            pass
        normalized["public_key_path"] = ""
    _write_storage_record(normalized)
    _sync_legacy_storage_state()
    _sync_storage_profile()
    if not as_json:
        return ""
    record = dict(normalized)
    original = dict(normalized)
    for field in ("password", "secret_access_key"):
        record[field] = ""
        record[f"has_{field}"] = bool(original.get(field))
    return json.dumps({"storage": record, "private_key_token": token})


def _backup_delete_storage(storage_id: str) -> str:
    """Delete a storage record and detach it from every schedule that used it.

    Schedules referencing *storage_id* get their ``storage_id`` set to an
    empty string. The storage's public key file is removed on a best-effort
    basis before the legacy state and the storage profile are resynced.

    Returns:
        Always an empty string.

    Raises:
        BackendCommandError: If *storage_id* does not match the safe
            resource-ID pattern.
    """
    if SAFE_RESOURCE_ID_RE.fullmatch(storage_id) is None:
        raise BackendCommandError("Invalid storage ID")
    state_paths = _ensure_backup_state(create=True)
    kept_storages = [
        record
        for record in _load_json_file(state_paths["storages"], default=[])
        if record.get("id") != storage_id
    ]
    schedules = _load_json_file(state_paths["schedules"], default=[])
    for entry in schedules:
        if entry.get("storage_id") != storage_id:
            continue
        entry["storage_id"] = ""
    _write_json_file(state_paths["storages"], kept_storages)
    _write_json_file(state_paths["schedules"], schedules)
    try:
        public_key_file = state_paths["keys_dir"] / f"{storage_id}.pub.pem"
        public_key_file.unlink(missing_ok=True)
    except Exception:
        # Best effort: a leftover key file must not fail the deletion.
        pass
    _sync_legacy_storage_state()
    _sync_legacy_schedule_state()
    _sync_storage_profile()
    return ""


def _backup_consume_private_key(token: str) -> str:
    """Return the pending private key stored under *token* and delete it.

    The key file is removed after a successful read, so a token can be
    consumed only once. An expired key is deleted without being returned.

    Raises:
        BackendCommandError: If the token is malformed, resolves outside the
            pending-keys directory, does not name an existing file, or the
            key has expired.
    """
    if SAFE_PRIVATE_KEY_TOKEN_RE.fullmatch(token) is None:
        raise BackendCommandError("Invalid private key token")
    state_paths = _ensure_backup_state(create=True)
    _cleanup_expired_pending_keys(state_paths)
    pending_root = state_paths["pending_keys_dir"].resolve(strict=False)
    candidate = (pending_root / token).resolve(strict=False)
    # Confinement check: the resolved path must stay under the pending dir.
    try:
        candidate.relative_to(pending_root)
    except ValueError as exc:
        raise BackendCommandError("Invalid private key token") from exc
    if not candidate.is_file():
        raise BackendCommandError("Private key not found")
    expired = _pending_key_expired(candidate)
    if expired:
        candidate.unlink(missing_ok=True)
        raise BackendCommandError("Private key expired (download window exceeded 30 minutes)")
    private_key = candidate.read_text(encoding="utf-8")
    candidate.unlink(missing_ok=True)
    return private_key


def _backup_test_storage(storage_id: str, as_json: bool) -> str:
    """Check whether the storage identified by *storage_id* is usable.

    For ``local`` storages the target directory is created if needed and
    probed for writability. For every other type a temporary rclone config is
    written and ``rclone lsf`` is run against the remote target.

    Args:
        storage_id: ID of the storage record to test.
        as_json: When true, always return a JSON result (even on failure).

    Returns:
        With *as_json*: a JSON object with ``storage_id``, ``ok`` and
        ``detail``. Otherwise the success detail followed by a newline.

    Raises:
        BackendCommandError: If the storage does not exist, or if the test
            failed and *as_json* is false.
    """
    storage = next((item for item in _load_backup_storages_raw() if item.get("id") == storage_id), None)
    if not storage:
        raise BackendCommandError("Storage not found")
    storage_type = storage.get("type", "local")
    ok = True
    detail = "Connection successful."
    if storage_type == "local":
        target_path = Path(storage.get("path") or "/var/backups/limristem-mail")
        try:
            target_path.mkdir(parents=True, exist_ok=True)
        except OSError:
            # A directory that cannot be created (permission denied, path is a
            # regular file, ...) is a failed test result, not a crash.
            ok = False
            detail = "Local storage path is not writable."
        else:
            if not _is_writable(target_path / ".limristem-mail-backup-test"):
                ok = False
                detail = "Local storage path is not writable."
    elif not shutil.which("rclone"):
        ok = False
        detail = "rclone is not installed."
    else:
        file_descriptor, tmp_name = tempfile.mkstemp(prefix="limristem-mail-rclone-", suffix=".conf")
        os.close(file_descriptor)
        tmp_config = Path(tmp_name)
        try:
            record = dict(storage)
            if storage_type in {"sftp", "ftp", "ftps"} and record.get("password"):
                record["password_obscured"] = _obscure_rclone_password(record["password"])
            _write_rclone_profile(tmp_config, record)
            target = _storage_remote_target(record)
            result = subprocess.run(
                ["rclone", "lsf", "--config", str(tmp_config), target, "--max-depth", "1"],
                capture_output=True,
                text=True,
                check=False,
            )
            if result.returncode != 0:
                ok = False
                detail = "Remote storage test failed."
        finally:
            # The temporary config is always removed; cleanup is best effort.
            try:
                tmp_config.unlink(missing_ok=True)
            except OSError:
                pass
    payload = {"storage_id": storage_id, "ok": ok, "detail": detail}
    if as_json:
        return json.dumps(payload)
    if not ok:
        raise BackendCommandError(detail)
    return detail + "\n"


def _backup_set_pairs(pairs: Iterable[tuple[str, str]]) -> str:
    """Write backup settings to the env files and resync backup state.

    ``backup-enabled`` is written to the main env file; every other key goes
    to the backup env file.

    Args:
        pairs: ``(key, value)`` tuples; each key must exist in ``BACKUP_ENV_MAP``.

    Returns:
        Always an empty string.

    Raises:
        BackendCommandError: If any key is unknown. No value is written in
            that case.
    """
    main_env_file = _resolve_main_env_file()
    backup_env_file = _resolve_backup_env_file()
    # Validate the whole batch before touching any file, so an unknown key
    # later in the batch cannot leave earlier keys written but never synced.
    resolved: list[tuple[str, str, str]] = []
    for key, value in pairs:
        env_key = BACKUP_ENV_MAP.get(key)
        if not env_key:
            raise BackendCommandError(f"Unknown backup key: {key}")
        resolved.append((key, env_key, value))
    for key, env_key, value in resolved:
        target_file = main_env_file if key == "backup-enabled" else backup_env_file
        _upsert_env_value(target_file, env_key, value)
    _sync_storage_profile()
    _sync_legacy_storage_state()
    _sync_legacy_schedule_state()
    _sync_schedule_units()
    return ""


def _backup_run(schedule_id: str | None) -> str:
    """Run ``backup.sh`` from the scripts directory via ``bash``.

    When *schedule_id* is truthy it is forwarded as ``--schedule-id``.

    Returns:
        Always an empty string.
    """
    backup_script = _scripts_dir() / "backup.sh"
    command = ["bash", str(backup_script)]
    if schedule_id:
        command += ["--schedule-id", schedule_id]
    _run_external(command)
    return ""


def _firewall_show(as_json: bool) -> str:
    """Render the firewall state read from the main env file.

    Returns:
        With *as_json*: a JSON object holding ``firewall-enabled``, the legacy
        port payload and ``firewall-rules``. Otherwise two ``key=value`` lines
        with the rules encoded as compact JSON.
    """
    env = _load_env_file(_resolve_main_env_file())
    enabled = env.get("LIMRISTEM_MAIL_FIREWALL_ENABLED", FIREWALL_DEFAULT_MAP["firewall-enabled"])
    rules = _firewall_rules_payload(env)
    if not as_json:
        compact_rules = json.dumps(rules, separators=(",", ":"))
        return f"firewall-enabled={enabled}\nfirewall-rules-json={compact_rules}\n"
    payload = {"firewall-enabled": enabled}
    payload.update(_legacy_firewall_port_payload(rules))
    payload["firewall-rules"] = rules
    return json.dumps(payload)


def _firewall_set_pairs(pairs: Iterable[tuple[str, str]]) -> str:
    """Validate and persist firewall settings, then apply the firewall.

    ``firewall-rules-json`` replaces the rule set. The legacy
    ``firewall-allowed-tcp-ports`` / ``firewall-allowed-udp-ports`` keys are
    folded into the rule set instead of being written directly. When the rule
    set changed, the rules JSON and both legacy port values derived from it
    are written back.

    Args:
        pairs: ``(key, value)`` tuples; each key must exist in ``FIREWALL_ENV_MAP``.

    Returns:
        Always an empty string.

    Raises:
        BackendCommandError: If a key is unknown or ``firewall-enabled`` is
            neither ``yes`` nor ``no``. The env file is left untouched when
            validation fails.
    """
    env_file = _resolve_main_env_file()
    env = _load_env_file(env_file)
    rules_json = json.dumps(_firewall_rules_payload(env), separators=(",", ":"))
    rules_updated = False
    # Direct writes are deferred until the whole batch has been validated, so
    # a bad entry later in the batch cannot leave the env file half-updated
    # with the firewall never re-applied.
    pending_writes: list[tuple[str, str]] = []
    for key, value in pairs:
        env_key = FIREWALL_ENV_MAP.get(key)
        if not env_key:
            raise BackendCommandError(f"Unknown firewall key: {key}")
        if key == "firewall-rules-json":
            rules_json = _normalize_firewall_rules_json(value)
            rules_updated = True
        elif key in {"firewall-allowed-tcp-ports", "firewall-allowed-udp-ports"}:
            protocol = "udp" if key == "firewall-allowed-udp-ports" else "tcp"
            normalized_ports = _normalize_legacy_firewall_ports(value) if value else ""
            rules_json = _update_legacy_firewall_rule_json(protocol, normalized_ports, rules_json)
            rules_updated = True
        else:
            if key == "firewall-enabled" and value not in {"yes", "no"}:
                raise BackendCommandError("firewall-enabled must be yes or no")
            pending_writes.append((env_key, value))
    for env_key, value in pending_writes:
        _upsert_env_value(env_file, env_key, value)
    if rules_updated:
        _upsert_env_value(env_file, FIREWALL_ENV_MAP["firewall-rules-json"], rules_json)
        legacy_ports = _legacy_firewall_port_payload(json.loads(rules_json))
        _upsert_env_value(env_file, FIREWALL_ENV_MAP["firewall-allowed-tcp-ports"], legacy_ports["firewall-allowed-tcp-ports"])
        _upsert_env_value(env_file, FIREWALL_ENV_MAP["firewall-allowed-udp-ports"], legacy_ports["firewall-allowed-udp-ports"])
    _apply_firewall()
    return ""


def _limits_show(as_json: bool) -> str:
    """Render the current limit values, using defaults for unset keys.

    Returns a JSON document when *as_json* is true, otherwise one
    ``key=value`` line per limit, sorted by key, with a trailing newline.
    """
    env = _load_env_file(_resolve_main_env_file())
    limits = {}
    for name, env_key in LIMIT_ENV_MAP.items():
        limits[name] = env.get(env_key, LIMIT_DEFAULT_MAP[name])
    if not as_json:
        lines = [f"{name}={limits[name]}" for name in sorted(limits)]
        return "\n".join(lines) + "\n"
    return json.dumps(limits)


def _limits_set_pairs(pairs: Iterable[tuple[str, str]]) -> str:
    """Write limit settings to the main env file and apply them.

    Args:
        pairs: ``(key, value)`` tuples; each key must exist in ``LIMIT_ENV_MAP``.

    Returns:
        Always an empty string.

    Raises:
        BackendCommandError: If any key is unknown. No value is written in
            that case.
    """
    env_file = _resolve_main_env_file()
    # Validate the whole batch first so an unknown key later in the batch
    # cannot leave earlier keys written to the env file but never applied.
    resolved: list[tuple[str, str]] = []
    for key, value in pairs:
        env_key = LIMIT_ENV_MAP.get(key)
        if not env_key:
            raise BackendCommandError(f"Unknown limit key: {key}")
        resolved.append((env_key, value))
    for env_key, value in resolved:
        _upsert_env_value(env_file, env_key, value)
    _apply_limits()
    return ""


def _api_show(as_json: bool) -> str:
    """Render the API admin user, endpoint and hostname from the main env file.

    The endpoint is ``https://<hostname>/`` when ``LIMRISTEM_MAIL_ENABLE_NGINX``
    is ``yes``, otherwise ``http://<bind>:<port>/``.

    Returns:
        A JSON object when *as_json* is true, otherwise a three-line summary.
    """
    env = _load_env_file(_resolve_main_env_file())
    api_user = env.get("LIMRISTEM_MAIL_API_ADMIN_USER", "admin")
    api_bind = env.get("LIMRISTEM_MAIL_API_BIND", "127.0.0.1")
    api_port = env.get("LIMRISTEM_MAIL_API_PORT", "8080")
    hostname = env.get("LIMRISTEM_MAIL_HOSTNAME")
    if hostname is None:
        # Only probe the system when no hostname is configured; passing the
        # probe as the dict.get default would spawn `hostname -f` on every call.
        probe = subprocess.run(["hostname", "-f"], capture_output=True, text=True, check=False)
        hostname = probe.stdout.strip() or "localhost"
    endpoint = f"https://{hostname}/" if env.get("LIMRISTEM_MAIL_ENABLE_NGINX", "no") == "yes" else f"http://{api_bind}:{api_port}/"
    payload = {"api_admin_user": api_user, "api_endpoint": endpoint, "hostname": hostname}
    if as_json:
        return json.dumps(payload)
    return f"API admin user: {api_user}\nAPI endpoint  : {endpoint}\nHostname      : {hostname}\n"


def _api_regenerate(as_json: bool) -> str:
    """Generate a new API admin password, store its hash and restart the service.

    Only the hash is written to the main env file; the plain-text password
    appears solely in the return value.

    Returns:
        A JSON object (including ``api_admin_pass``) when *as_json* is true,
        otherwise a human-readable summary. The closing sentence of the
        summary is user-facing Italian text and is emitted unchanged.
    """
    env = _load_env_file(_resolve_main_env_file())
    new_pass = secrets.token_urlsafe(18)
    new_hash = API_HASH_CONTEXT.hash(new_pass)
    _upsert_env_value(_resolve_main_env_file(), "LIMRISTEM_MAIL_API_ADMIN_PASS_HASH", new_hash)
    _run_external_best_effort(["systemctl", "restart", "--no-block", "limristem-mail"])
    api_user = env.get("LIMRISTEM_MAIL_API_ADMIN_USER", "admin")
    api_bind = env.get("LIMRISTEM_MAIL_API_BIND", "127.0.0.1")
    api_port = env.get("LIMRISTEM_MAIL_API_PORT", "8080")
    hostname = env.get("LIMRISTEM_MAIL_HOSTNAME")
    if hostname is None:
        # Only probe the system when no hostname is configured; passing the
        # probe as the dict.get default would spawn `hostname -f` on every call.
        probe = subprocess.run(["hostname", "-f"], capture_output=True, text=True, check=False)
        hostname = probe.stdout.strip() or "localhost"
    endpoint = f"https://{hostname}/" if env.get("LIMRISTEM_MAIL_ENABLE_NGINX", "no") == "yes" else f"http://{api_bind}:{api_port}/"
    payload = {"api_admin_user": api_user, "api_admin_pass": new_pass, "api_endpoint": endpoint, "hostname": hostname}
    if as_json:
        return json.dumps(payload)
    return f"API admin user    : {api_user}\nAPI admin password: {new_pass}\nAPI endpoint      : {endpoint}\n\nConserva questa password: nel file env viene salvato solo l'hash.\n"


def execute_command(command_name: str, *args: str, allow_fallback: bool = True) -> str:
    """Dispatch a ``limristem-mail`` command to its Python implementation.

    Args:
        command_name: Top-level command (``add``, ``show``, ``backup``, ...).
        *args: Remaining command-line words; the first one is normally the
            target or action of the command.
        allow_fallback: When an action has no Python implementation, run the
            script fallback if true, otherwise raise.

    Returns:
        The text produced by the command (possibly empty).

    Raises:
        BackendCommandError: For unknown commands, unsupported targets or
            actions, missing required arguments, or an unimplemented action
            when *allow_fallback* is false.
    """

    def _require(count: int) -> None:
        # Fail with a CLI-level error instead of an IndexError when fewer than
        # `count` words follow the action word (args[0]).
        if len(args) - 1 < count:
            raise BackendCommandError(
                f"limristem-mail {command_name} {args[0]} requires at least {count} argument(s)"
            )

    try:
        if command_name == "add":
            target = args[0] if args else ""
            if target == "domain":
                return _add_domain(args[1:])
            if target == "account":
                return _add_account(args[1:])
            raise BackendCommandError(f"Unsupported add target: {target or '<missing>'}")
        if command_name == "show":
            target = args[0] if args else ""
            if target == "dns":
                positionals, options = _split_positionals_and_options(args[1:])
                if len(positionals) != 1:
                    raise BackendCommandError("Usage: limristem-mail show dns <domain> [--json|--zone]")
                return _show_domain_dns(positionals[0], as_json=bool(options.get("json")), as_zone=bool(options.get("zone")))
            if target == "services":
                _, options = _split_positionals_and_options(args[1:])
                return _show_services(bool(options.get("json")))
            if target == "backup":
                _, options = _split_positionals_and_options(args[1:])
                return _backup_show(bool(options.get("json")))
            if target == "api":
                _, options = _split_positionals_and_options(args[1:])
                return _api_show(bool(options.get("json")))
            if target == "limits":
                _, options = _split_positionals_and_options(args[1:])
                return _limits_show(bool(options.get("json")))
            if target == "firewall":
                _, options = _split_positionals_and_options(args[1:])
                return _firewall_show(bool(options.get("json")))
            if target == "ssl":
                _, options = _split_positionals_and_options(args[1:])
                return _ssl_show(bool(options.get("json")))
            raise BackendCommandError(f"Unsupported show target: {target or '<missing>'}")
        if command_name == "monitor":
            target = args[0] if args else ""
            if target == "services":
                _, options = _split_positionals_and_options(args[1:])
                return _show_services(bool(options.get("json")))
            raise BackendCommandError(f"Unsupported monitor target: {target or '<missing>'}")
        if command_name == "start":
            target = args[0] if args else ""
            if target in {"api", "panel"}:
                return _start_uvicorn(target, args[1:])
            raise BackendCommandError(f"Unsupported start target: {target or '<missing>'}")
        if command_name == "queue":
            action = args[0] if args else ""
            if action == "list":
                # NOTE(review): without --json this returns an empty string
                # rather than falling back to the script — confirm intended.
                return json.dumps(_queue_list_payload()) if len(args) > 1 and args[1] == "--json" else ""
            raise FallbackToScript
        if command_name == "bans":
            action = args[0] if args else ""
            if action == "list":
                # NOTE(review): same empty-output behavior as `queue list`.
                return json.dumps(_list_bans_payload()) if len(args) > 1 and args[1] == "--json" else ""
            if action == "ban":
                _require(1)
                _ban_ip(args[1], args[2] if len(args) > 2 else "manual")
                return ""
            if action == "unban":
                _require(1)
                _unban_ip(args[1])
                return ""
            raise FallbackToScript
        if command_name == "limits":
            action = args[0] if args else ""
            if action == "show":
                return _limits_show(len(args) > 1 and args[1] == "--json")
            if action == "set":
                _require(2)
                return _limits_set_pairs([(args[1], args[2])])
            if action == "set-many":
                if (len(args) - 1) % 2 != 0:
                    raise BackendCommandError("set-many requires <key> <value> pairs")
                pairs = list(zip(args[1::2], args[2::2]))
                return _limits_set_pairs(pairs)
            raise FallbackToScript
        if command_name == "firewall":
            action = args[0] if args else ""
            if action == "show":
                return _firewall_show(len(args) > 1 and args[1] == "--json")
            if action == "set":
                _require(2)
                return _firewall_set_pairs([(args[1], args[2])])
            if action == "set-many":
                if (len(args) - 1) % 2 != 0:
                    raise BackendCommandError("set-many requires <key> <value> pairs")
                pairs = list(zip(args[1::2], args[2::2]))
                return _firewall_set_pairs(pairs)
            raise FallbackToScript
        if command_name == "api":
            action = args[0] if args else ""
            if action == "show":
                return _api_show(len(args) > 1 and args[1] == "--json")
            if action == "regenerate":
                return _api_regenerate(len(args) > 1 and args[1] == "--json")
            raise FallbackToScript
        if command_name == "settings":
            action = args[0] if args else ""
            if action == "show":
                _, options = _split_positionals_and_options(args[1:])
                return _settings_show(bool(options.get("json")))
            if action == "set":
                _require(2)
                return _settings_set_pairs([(args[1], args[2])])
            if action == "set-many":
                if (len(args) - 1) % 2 != 0:
                    raise BackendCommandError("set-many requires <key> <value> pairs")
                return _settings_set_pairs(list(zip(args[1::2], args[2::2])))
            if action == "upload-favicon":
                _require(3)
                as_json = len(args) > 4 and args[4] in {"--json", "yes"}
                return _settings_upload_favicon(args[1], args[2], args[3], as_json)
            if action == "check-update":
                _, options = _split_positionals_and_options(args[1:])
                return _settings_check_update(as_json=bool(options.get("json")), apply_if_enabled=bool(options.get("apply-if-enabled")))
            if action == "apply-update":
                _, options = _split_positionals_and_options(args[1:])
                return _settings_apply_update(as_json=bool(options.get("json")))
            raise FallbackToScript
        if command_name == "dns":
            action = args[0] if args else ""
            if action == "configure-cloudflare":
                return _configure_domain_dns(args[1:])
            if action == "disable":
                return _disable_domain_dns(args[1:])
            if action == "sync-dkim":
                return _sync_domain_dkim_dns(args[1:])
            raise BackendCommandError(f"Unsupported dns action: {action or '<missing>'}")
        if command_name == "ssl":
            action = args[0] if args else ""
            if action == "show":
                return _ssl_show(len(args) > 1 and args[1] == "--json")
            raise FallbackToScript
        if command_name == "backup":
            action = args[0] if args else ""
            if action == "show":
                return _backup_show(len(args) > 1 and args[1] == "--json")
            if action == "list":
                return _backup_list(len(args) > 1 and args[1] == "--json")
            if action == "list-schedules":
                return _backup_list_schedules()
            if action == "list-storages":
                return _backup_list_storages()
            if action == "save-schedule":
                _require(1)
                return _backup_save_schedule(args[1], len(args) > 2 and args[2] in {"--json", "yes"})
            if action == "delete-schedule":
                _require(1)
                return _backup_delete_schedule(args[1])
            if action == "save-storage":
                _require(1)
                return _backup_save_storage(args[1], len(args) > 2 and args[2] in {"--json", "yes"})
            if action == "delete-storage":
                _require(1)
                return _backup_delete_storage(args[1])
            if action == "test-storage":
                _require(1)
                return _backup_test_storage(args[1], len(args) > 2 and args[2] in {"--json", "yes"})
            if action == "consume-private-key":
                _require(1)
                return _backup_consume_private_key(args[1])
            if action == "set":
                _require(2)
                return _backup_set_pairs([(args[1], args[2])])
            if action == "set-many":
                if (len(args) - 1) % 2 != 0:
                    raise BackendCommandError("set-many requires <key> <value> pairs")
                pairs = list(zip(args[1::2], args[2::2]))
                return _backup_set_pairs(pairs)
            if action == "run":
                return _backup_run(args[1] if len(args) > 1 else None)
            raise FallbackToScript
        raise BackendCommandError(f"Unknown limristem-mail command: {command_name}")
    except FallbackToScript:
        # Actions without a Python implementation are delegated to the script
        # fallback unless the caller disabled it.
        if not allow_fallback:
            raise BackendCommandError(f"Unsupported limristem-mail command: {command_name} {' '.join(args)}".strip())
        return _run_script_fallback(command_name, *args)


# Top-level help text for the CLI; main() prints it to stderr when invoked
# without any arguments.
USAGE = """Usage: limristem-mail <command> [args...]

Commands:
  add        Create domains and accounts from the unified Python CLI
  show       Display DNS plans, service health, and config summaries
  backup     Manage backup settings, schedules, storages, and runs
  queue      Manage the mail queue
  bans       Manage bans
  limits     Manage rate and security limits
  firewall   Manage firewall settings
  api        Manage API credentials
  dns        Configure DNS providers and publish DKIM records
  settings   Manage panel branding and update settings
  ssl        Manage the shared TLS certificate for mail and web services
  monitor    Monitor Limristem eMail-related services
  start      Run the API or panel server from the unified app

Examples:
  limristem-mail add domain example.com
  limristem-mail add account postmaster@example.com very secure password --quota-mb 4096
  limristem-mail show dns example.com --zone
  limristem-mail dns configure-cloudflare example.com --account-id <account> --zone-id <zone> --api-token <token> --enable-sync
  limristem-mail dns sync-dkim example.com --json
  limristem-mail ssl show --json
  limristem-mail monitor services --json
  limristem-mail start api --host 127.0.0.1 --port 8080
"""


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to use instead of ``sys.argv[1:]``.

    Returns:
        The process exit code: ``1`` when no command is given, the error's
        ``returncode`` when the command raises ``BackendCommandError``, and
        ``0`` otherwise.
    """
    cli_args = list(argv) if argv is not None else list(sys.argv[1:])
    if not cli_args:
        print(USAGE, file=sys.stderr, end="")
        return 1
    requested, command_args = cli_args[0], cli_args[1:]
    # Accepted alternative spellings mapped to their canonical command.
    alias_map = {
        "backups": "backup",
        "ban": "bans",
        "limit": "limits",
        "status": "show",
        "serve": "start",
    }
    command_name = alias_map.get(requested, requested)
    try:
        output = execute_command(command_name, *command_args)
    except BackendCommandError as exc:
        print(exc.detail, file=sys.stderr)
        return exc.returncode
    if not output:
        return 0
    # Guarantee that non-empty output ends with exactly what the command
    # produced plus a newline terminator if it lacked one.
    sys.stdout.write(output if output.endswith("\n") else output + "\n")
    return 0
