import base64
from ipaddress import ip_network
import json
import logging
import mimetypes
from pathlib import Path
import re
import secrets
from time import monotonic
from typing import Callable, Optional
from urllib.parse import urlencode

from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, Query, Request, Response, UploadFile, status
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, ValidationError
from sqlalchemy.orm import Session

from .. import models, schemas
from ..admin_ops import (
    load_api_credentials,
    load_backup_config,
    load_backup_runs,
    load_backup_schedules,
    load_backup_storages,
    load_bans,
    load_firewall_config,
    load_limits,
    load_panel_settings,
    load_queue,
    load_ssl_config,
    panel_template_dir,
    apply_update_now,
    check_for_updates,
    regenerate_api_credentials,
    run_root_script,
    save_backup_schedule,
    save_backup_storage,
    test_backup_storage,
)
from ..cache import get_cache
from ..db import get_db
from ..dns_providers import zone_relative_record_name
from ..security import (
    build_panel_login_csrf_token,
    clear_auth_failures,
    client_address,
    delete_panel_session,
    issue_panel_login_csrf,
    issue_panel_session,
    is_rate_limited,
    load_panel_session,
    panel_admin_username,
    pop_panel_flash,
    register_auth_failure,
    require_admin,
    require_panel_csrf,
    require_panel_session,
    set_panel_flash,
    validate_panel_login_csrf,
    verify_panel_secret,
)
from ..settings import get_settings
from .accounts import (
    create_account as api_create_account,
    delete_account as api_delete_account,
    serialize_account,
    update_account as api_update_account,
)
from .aliases import create_alias as api_create_alias, delete_alias as api_delete_alias, update_alias as api_update_alias
from .domains import (
    configure_dns_provider as api_configure_dns_provider,
    create_domain as api_create_domain,
    delete_domain as api_delete_domain,
    get_domain_or_404,
    rotate_dkim as api_rotate_dkim,
    suggested_dns as api_suggested_dns,
    sync_dkim_dns as api_sync_dkim_dns,
    update_domain as api_update_domain,
)
from .redirects import create_redirect as api_create_redirect, delete_redirect as api_delete_redirect, update_redirect as api_update_redirect

# Router that the route handlers in this module register on.
router = APIRouter(tags=["admin"])
# Settings are resolved once at import time and reused by the helpers below.
settings = get_settings()
# Template environment rooted at the directory returned by panel_template_dir().
templates = Jinja2Templates(directory=str(panel_template_dir()))
logger = logging.getLogger(__name__)
# Allowed tab identifiers; the normalize_*_tab helpers fall back to a default
# tab for any value outside these sets.
DOMAIN_DETAIL_TABS = {"settings", "accounts", "aliases", "redirects", "dns"}
BACKUP_PANEL_TABS = {"settings", "schedules", "storages", "runs"}
SETTINGS_PANEL_TABS = {"general", "ssl", "limits", "api"}
# Keys of the DNS record mapping that dns_export_rows() turns into export rows,
# in the order the rows are emitted.
DNS_EXPORTABLE_RECORD_KEYS = (
    "mx",
    "mail",
    "spf",
    "srs_spf",
    "dmarc",
    "dkim",
    "jmap",
    "imap",
    "imaps",
    "pop3",
    "pop3s",
    "submission",
    "submissions",
    "autoconfig",
    "autodiscover",
    "mta_sts_host",
    "mta_sts",
    "tls_rpt",
)

# The three *_FIELD_ORDER lists below serve two visible purposes: together with
# BACKUP_LEGACY_KEYS they form the set of keys normalize_backup_value() accepts,
# and panel_context() passes them to the templates.

# General backup settings.
BACKUP_FIELD_ORDER = [
    "backup-enabled",
    "backup-local-dir",
    "backup-retention-days",
]
# Fields of a single backup schedule.
BACKUP_SCHEDULE_FIELD_ORDER = [
    "name",
    "enabled",
    "cron_minute",
    "cron_hour",
    "cron_day",
    "cron_month",
    "cron_weekday",
    "backup_type",
    "full_weekday",
    "compression",
    "include_database",
    "include_redis",
    "storage_id",
]
# Fields of a single backup storage definition.
BACKUP_STORAGE_FIELD_ORDER = [
    "id",
    "name",
    "type",
    "remote_name",
    "path",
    "host",
    "port",
    "user",
    "password",
    "bucket",
    "region",
    "endpoint",
    "access_key_id",
    "secret_access_key",
    "encrypt",
]
# Display label for each backup field key; handed to the templates by
# panel_context() as "backup_field_labels".
BACKUP_FIELD_LABELS = {
    "backup-enabled": "Timer enabled",
    "backup-local-dir": "Local directory",
    "backup-retention-days": "Retention days",
    "name": "Name",
    "enabled": "Enabled",
    "oncalendar": "Schedule",
    "cron_minute": "Minute",
    "cron_hour": "Hour",
    "cron_day": "Day of month",
    "cron_month": "Month",
    "cron_weekday": "Weekday",
    "backup_type": "Backup type",
    "full_weekday": "Full backup weekday",
    "compression": "Compression",
    "include_database": "Include MariaDB dump",
    "include_redis": "Include Redis",
    "storage_id": "Target storage",
    "type": "Storage type",
    "remote_name": "Remote profile name",
    "path": "Remote path",
    "host": "Host",
    "port": "Port",
    "user": "Username",
    "password": "Password",
    "bucket": "Bucket",
    "region": "Region",
    "endpoint": "Endpoint",
    "access_key_id": "Access key ID",
    "secret_access_key": "Secret access key",
    "encrypt": "Encrypt backups",
}
# Closed choice lists as (value, label) pairs. For any key present here,
# normalize_backup_value() accepts only the listed values (compared
# case-insensitively) and returns the spelling used in this table.
# Both the current underscore keys and the "backup-*" legacy keys are listed.
BACKUP_FIELD_OPTIONS = {
    "backup-enabled": [("yes", "yes"), ("no", "no")],
    "backup_type": [("auto", "auto"), ("full", "full"), ("incremental", "incremental")],
    "backup-type": [("auto", "auto"), ("full", "full"), ("incremental", "incremental")],
    "full_weekday": [
        ("Mon", "Mon"),
        ("Tue", "Tue"),
        ("Wed", "Wed"),
        ("Thu", "Thu"),
        ("Fri", "Fri"),
        ("Sat", "Sat"),
        ("Sun", "Sun"),
    ],
    "backup-full-weekday": [
        ("Mon", "Mon"),
        ("Tue", "Tue"),
        ("Wed", "Wed"),
        ("Thu", "Thu"),
        ("Fri", "Fri"),
        ("Sat", "Sat"),
        ("Sun", "Sun"),
    ],
    "compression": [("gzip", "gzip"), ("none", "none")],
    "backup-compression": [("gzip", "gzip"), ("none", "none")],
    "include_database": [("yes", "yes"), ("no", "no")],
    "include_redis": [("yes", "yes"), ("no", "no")],
    "backup-include-redis": [("yes", "yes"), ("no", "no")],
    "enabled": [("yes", "yes"), ("no", "no")],
    "encrypt": [("yes", "yes"), ("no", "no")],
    "type": [
        ("local", "Local only"),
        ("s3", "S3"),
        ("sftp", "SFTP"),
        ("ftp", "FTP"),
        ("ftps", "FTPS"),
    ],
    "backup-storage-type": [
        ("local", "Local only"),
        ("s3", "S3"),
        ("sftp", "SFTP"),
        ("ftp", "FTP"),
        ("ftps", "FTPS"),
    ],
}
# Keys that normalize_backup_value() parses as non-negative integers.
BACKUP_NUMERIC_FIELDS = {"backup-retention-days", "backup-storage-port", "port"}
# Keys holding secrets: build_backup_storage_validation_config() keeps the
# current value when an update for one of these arrives empty.
BACKUP_SECRET_FIELDS = {"backup-db-root-password", "backup-storage-password", "backup-storage-secret-access-key", "password", "secret_access_key"}
# Numeric keys that may be left blank (normalized to an empty string).
OPTIONAL_BACKUP_NUMERIC_FIELDS = {"backup-storage-port", "port"}
# Older key names that normalize_backup_value() still accepts alongside the
# *_FIELD_ORDER lists.
BACKUP_LEGACY_KEYS = {
    "oncalendar",
    "backup-type",
    "backup-full-weekday",
    "backup-db-mode",
    "backup-db-root-password",
    "backup-remote-targets",
    "backup-oncalendar",
    "backup-compression",
    "backup-include-redis",
    "backup-storage-type",
    "backup-storage-remote-name",
    "backup-storage-path",
    "backup-storage-host",
    "backup-storage-port",
    "backup-storage-user",
    "backup-storage-password",
    "backup-storage-bucket",
    "backup-storage-region",
    "backup-storage-endpoint",
    "backup-storage-access-key-id",
    "backup-storage-secret-access-key",
}
# Notice code -> user-facing message tables, one per panel area.
# NOTE(review): presumably looked up from the "notice" query parameter set by
# panel_redirect(); the lookups are not visible in this part of the file.
BACKUP_NOTICE_MESSAGES = {
    "backup-config-updated": "Backup configuration updated.",
    "backup-schedule-updated": "Backup schedule saved.",
    "backup-schedule-deleted": "Backup schedule deleted.",
    "backup-storage-updated": "Backup storage saved.",
    "backup-storage-deleted": "Backup storage deleted.",
    "backup-storage-tested": "Backup storage test completed.",
    "backup-run-started": "Backup started.",
}
LIMIT_NOTICE_MESSAGES = {
    "limit-updated": "Service limits updated.",
}
# Keys accepted by normalize_firewall_value().
FIREWALL_FIELD_ORDER = ["firewall-enabled", "firewall-rules-json", "firewall-allowed-tcp-ports", "firewall-allowed-udp-ports"]
# (value, label) pairs passed to the templates by panel_context().
FIREWALL_STATUS_OPTIONS = [("yes", "Enabled"), ("no", "Disabled")]
FIREWALL_PROTOCOL_OPTIONS = [("tcp", "TCP"), ("udp", "UDP")]
FIREWALL_NOTICE_MESSAGES = {
    "firewall-updated": "Firewall configuration updated.",
    "ban-created": "IP banned.",
    "ban-deleted": "IP unbanned.",
}
QUEUE_NOTICE_MESSAGES = {
    "queue-hold": "Message placed on hold.",
    "queue-release": "Message released from hold.",
    "queue-requeue": "Message requeued.",
    "queue-delete": "Message deleted from queue.",
}
SSL_NOTICE_MESSAGES = {
    "ssl-selfsigned": "Self-signed certificate generated and applied.",
    "ssl-letsencrypt": "Let's Encrypt certificate requested and applied.",
    "ssl-manual": "Uploaded certificate installed and applied.",
}
# Upper bound (1 MiB) enforced by _upload_to_base64() on uploaded files.
MAX_SSL_FILE_SIZE_BYTES = 1024 * 1024
# Shape: "<lowercase name>-<24 hex chars>.pem".
# NOTE(review): not used in the visible part of this file — confirm its caller.
SAFE_PRIVATE_KEY_TOKEN_RE = re.compile(r"^[a-z0-9][a-z0-9_.-]{0,63}-[a-f0-9]{24}\.pem$")
# Queue IDs accepted by validate_queue_id(): 1-64 ASCII letters or digits.
SAFE_QUEUE_ID_RE = re.compile(r"^[A-Za-z0-9]{1,64}$")


def ensure_web_panel_enabled() -> None:
    """Raise a 404 ``HTTPException`` unless ``settings.enable_web_panel`` is truthy."""
    if settings.enable_web_panel:
        return
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Web panel is disabled")


def validate_queue_id(queue_id: str) -> str:
    """Return ``queue_id`` unchanged if it fully matches ``SAFE_QUEUE_ID_RE``.

    Raises:
        HTTPException: 400 when the ID does not match the pattern.
    """
    if SAFE_QUEUE_ID_RE.fullmatch(queue_id) is None:
        raise HTTPException(status_code=400, detail="Invalid queue ID")
    return queue_id


def _strip_matching_quotes(value: str) -> str:
    normalized = value.strip()
    if len(normalized) >= 2 and normalized[0] == normalized[-1] and normalized[0] in {'"', "'"}:
        return normalized[1:-1].strip()
    return normalized


def normalize_backup_value(key: str, value: str) -> str:
    """Validate a backup setting and return its canonical string form.

    Handling depends on the key:
      * keys in ``BACKUP_FIELD_OPTIONS`` must match one of the listed values
        (case-insensitively, surrounding quotes ignored);
      * keys in ``BACKUP_NUMERIC_FIELDS`` must be a non-negative integer, or
        blank when the key is also in ``OPTIONAL_BACKUP_NUMERIC_FIELDS``;
      * the two "oncalendar" keys are reduced to their non-empty lines;
      * every other supported key is returned untouched.

    Raises:
        HTTPException: 400 for an unsupported key or an invalid value.
    """

    def _invalid_value() -> HTTPException:
        return HTTPException(status_code=400, detail=f"Invalid value for {key}")

    supported_keys = (
        set(BACKUP_FIELD_ORDER)
        | set(BACKUP_SCHEDULE_FIELD_ORDER)
        | set(BACKUP_STORAGE_FIELD_ORDER)
        | set(BACKUP_LEGACY_KEYS)
    )
    if key not in supported_keys:
        raise HTTPException(status_code=400, detail=f"Unsupported backup setting: {key}")

    if key in BACKUP_FIELD_OPTIONS:
        candidate = _strip_matching_quotes(value)
        by_lowercase = {choice.lower(): choice for choice, _label in BACKUP_FIELD_OPTIONS[key]}
        canonical = by_lowercase.get(candidate.lower())
        if canonical is not None:
            return canonical
        raise _invalid_value()

    if key in BACKUP_NUMERIC_FIELDS:
        if not value.strip() and key in OPTIONAL_BACKUP_NUMERIC_FIELDS:
            return ""
        try:
            number = int(value)
        except ValueError as exc:
            raise _invalid_value() from exc
        if number >= 0:
            return str(number)
        raise _invalid_value()

    if key in {"backup-oncalendar", "oncalendar"}:
        kept_lines = []
        for line in value.replace("\r", "").splitlines():
            stripped = line.strip()
            if stripped:
                kept_lines.append(stripped)
        if kept_lines:
            return "\n".join(kept_lines)
        raise _invalid_value()

    return value


# Cron fields used by parse_backup_schedule_to_cron_fields() when a schedule
# cannot be parsed (03:15 every day).
BACKUP_CRON_DEFAULTS = {
    "cron_minute": "15",
    "cron_hour": "3",
    "cron_day": "*",
    "cron_month": "*",
    "cron_weekday": "*",
}
# Lower-cased weekday input -> canonical three-letter name, as looked up by
# normalize_backup_cron_field(). Both "0" and "7" map to Sunday.
BACKUP_WEEKDAY_MAP = {
    "0": "Sun",
    "1": "Mon",
    "2": "Tue",
    "3": "Wed",
    "4": "Thu",
    "5": "Fri",
    "6": "Sat",
    "7": "Sun",
    "sun": "Sun",
    "mon": "Mon",
    "tue": "Tue",
    "wed": "Wed",
    "thu": "Thu",
    "fri": "Fri",
    "sat": "Sat",
}
# The only weekday values normalize_backup_cron_field() will return.
BACKUP_WEEKDAY_NAMES = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"}


def normalize_backup_cron_field(field_name: str, value: str) -> str:
    """Validate one cron-style schedule field and return its canonical text.

    Blank input and ``"*"`` both normalize to ``"*"``. ``cron_weekday`` accepts
    a digit ``0``-``7`` or a weekday name and returns a three-letter name from
    ``BACKUP_WEEKDAY_NAMES``. Every other field must be a whole number within
    the range listed in ``limits`` and is returned without leading zeros.

    Args:
        field_name: One of ``cron_minute``, ``cron_hour``, ``cron_day``,
            ``cron_month`` or ``cron_weekday``.
        value: Raw text for that field.

    Returns:
        The normalized field value.

    Raises:
        HTTPException: 400 when the value is malformed or out of range.
        KeyError: when a concrete value is given for an unknown ``field_name``.
    """
    rendered = value.strip() or "*"
    if rendered == "*":
        return rendered

    if field_name == "cron_weekday":
        if not re.fullmatch(r"[A-Za-z0-7]+", rendered):
            raise HTTPException(status_code=400, detail=f"Invalid value for {field_name}")
        # Known spellings come from the map; anything else is title-cased and
        # truncated to three letters ("monday" -> "Mon") before being checked.
        normalized = BACKUP_WEEKDAY_MAP.get(rendered.lower(), rendered.title()[:3])
        if normalized not in BACKUP_WEEKDAY_NAMES:
            raise HTTPException(status_code=400, detail=f"Invalid value for {field_name}")
        return normalized

    if not rendered.isdigit():
        raise HTTPException(status_code=400, detail=f"Invalid value for {field_name}")
    try:
        # str.isdigit() is true for characters such as "²" that int() refuses,
        # and int() also rejects extremely long digit strings; both must be
        # reported as a 400 rather than escape as an unhandled ValueError.
        parsed = int(rendered)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid value for {field_name}") from exc

    limits = {
        "cron_minute": (0, 59),
        "cron_hour": (0, 23),
        "cron_day": (1, 31),
        "cron_month": (1, 12),
    }
    minimum, maximum = limits[field_name]
    if parsed < minimum or parsed > maximum:
        raise HTTPException(status_code=400, detail=f"Invalid value for {field_name}")
    return str(parsed)


def build_oncalendar_from_cron_fields(
    cron_minute: str,
    cron_hour: str,
    cron_day: str,
    cron_month: str,
    cron_weekday: str,
) -> str:
    """Combine five cron-style fields into one ``[Weekday ]*-M-D HH:MM:00`` string.

    Each field is first validated by ``normalize_backup_cron_field`` (which
    raises a 400 ``HTTPException`` on bad input). Hour and minute are
    zero-padded to two digits unless they are ``"*"``; the weekday prefix is
    left out when the weekday is ``"*"``.
    """

    def _two_digits(part: str) -> str:
        return part if part == "*" else f"{int(part):02d}"

    minute = normalize_backup_cron_field("cron_minute", cron_minute)
    hour = normalize_backup_cron_field("cron_hour", cron_hour)
    day = normalize_backup_cron_field("cron_day", cron_day)
    month = normalize_backup_cron_field("cron_month", cron_month)
    weekday = normalize_backup_cron_field("cron_weekday", cron_weekday)

    prefix = f"{weekday} " if weekday != "*" else ""
    return f"{prefix}*-{month}-{day} {_two_digits(hour)}:{_two_digits(minute)}:00"


def parse_backup_schedule_to_cron_fields(schedule: Optional[dict]) -> dict[str, str]:
    """Return a copy of ``schedule`` extended with cron-style fields.

    Only the first non-empty line of the schedule's ``oncalendar`` value is
    examined. When it has the ``[Weekday ]*-M-D H:M:S`` shape, the five
    ``cron_*`` keys are filled from it and ``cron_expression`` is their
    space-joined form. Otherwise the ``BACKUP_CRON_DEFAULTS`` are used and
    ``cron_expression`` is ``"custom"`` (or ``"15 3 * * *"`` when there was
    no line at all).
    """
    result = dict(schedule or {})
    oncalendar_text = str(result.get("oncalendar", "*-*-* 03:15:00")).replace("\r", "")

    first_line = ""
    for candidate in oncalendar_text.splitlines():
        candidate = candidate.strip()
        if candidate:
            first_line = candidate
            break

    parsed = re.match(
        r"^(?:(?P<weekday>[A-Za-z]+)\s+)?\*-(?P<month>\*|\d{1,2})-(?P<day>\*|\d{1,2})\s+(?P<hour>\*|\d{1,2}):(?P<minute>\*|\d{1,2}):\d{2}$",
        first_line,
    )
    if parsed is None:
        cron_fields = dict(BACKUP_CRON_DEFAULTS)
        cron_expression = "custom" if first_line else "15 3 * * *"
    else:
        # Only the weekday group is optional; the others always capture text.
        field_groups = (
            ("cron_minute", "minute"),
            ("cron_hour", "hour"),
            ("cron_day", "day"),
            ("cron_month", "month"),
            ("cron_weekday", "weekday"),
        )
        cron_fields = {}
        for field_name, group_name in field_groups:
            cron_fields[field_name] = normalize_backup_cron_field(field_name, parsed.group(group_name) or "*")
        cron_expression = " ".join(cron_fields[field_name] for field_name, _ in field_groups)

    result.update(cron_fields)
    result["cron_expression"] = cron_expression
    return result


def normalize_firewall_value(key: str, value: str) -> str:
    """Validate a firewall setting and return its canonical string form.

    * ``firewall-enabled`` must be exactly ``"yes"`` or ``"no"``.
    * ``firewall-rules-json`` must be a JSON list; it is normalized by
      ``normalize_firewall_rules`` and re-serialized compactly.
    * The remaining keys are port lists separated by commas and/or
      whitespace; the result is the de-duplicated ports joined with ``", "``.

    Raises:
        HTTPException: 400 for an unsupported key or an invalid value.
    """
    if key not in FIREWALL_FIELD_ORDER:
        raise HTTPException(status_code=400, detail=f"Unsupported firewall setting: {key}")
    if key == "firewall-enabled":
        if value not in {"yes", "no"}:
            raise HTTPException(status_code=400, detail=f"Invalid value for {key}")
        return value
    if key == "firewall-rules-json":
        try:
            rules = json.loads(value or "[]")
        except json.JSONDecodeError as exc:
            raise HTTPException(status_code=400, detail=f"Invalid value for {key}") from exc
        # Valid JSON that is not a list (an object, number, string...) would
        # otherwise crash inside the rule normalizer.
        if not isinstance(rules, list):
            raise HTTPException(status_code=400, detail=f"Invalid value for {key}")
        return json.dumps(normalize_firewall_rules(rules), separators=(",", ":"))

    normalized_ports = []
    for raw_port in value.replace(",", " ").split():
        if not raw_port.isdigit():
            raise HTTPException(status_code=400, detail=f"Invalid value for {key}")
        try:
            # str.isdigit() accepts characters such as "²" that int() rejects.
            port = int(raw_port)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=f"Invalid value for {key}") from exc
        if port < 1 or port > 65535:
            raise HTTPException(status_code=400, detail=f"Invalid value for {key}")
        normalized_ports.append(str(port))
    # dict.fromkeys removes duplicates while keeping first-seen order.
    deduplicated_ports = list(dict.fromkeys(normalized_ports))
    return ", ".join(deduplicated_ports)


def _parse_firewall_port_number(text: str) -> int:
    """Parse one port number, raising a 400 unless it is an integer in 1-65535."""
    if not text.isdigit():
        raise HTTPException(status_code=400, detail="Invalid firewall rule ports")
    try:
        # str.isdigit() accepts characters such as "²" that int() rejects.
        port = int(text)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid firewall rule ports") from exc
    if port < 1 or port > 65535:
        raise HTTPException(status_code=400, detail="Invalid firewall rule ports")
    return port


def _normalize_firewall_port_token(raw_token: str) -> str:
    """Canonicalize a single port (``"80"``) or an inclusive range (``"1000-2000"``)."""
    if "-" not in raw_token:
        return str(_parse_firewall_port_number(raw_token))
    start_text, end_text = raw_token.split("-", 1)
    start_port = _parse_firewall_port_number(start_text)
    end_port = _parse_firewall_port_number(end_text)
    if start_port > end_port:
        raise HTTPException(status_code=400, detail="Invalid firewall rule ports")
    return f"{start_port}-{end_port}"


def normalize_firewall_rules(rules: list[dict]) -> list[dict[str, str]]:
    """Validate firewall rules and return them in canonical, de-duplicated form.

    Each rule is a mapping with optional ``ports``, ``protocol`` (default
    ``tcp``), ``source`` and ``enabled`` (default ``yes``) entries. Rules that
    end up with no ports are dropped silently. A source of ``*``, ``any`` or
    ``all`` (or a blank one) becomes ``""``; any other source is parsed with
    ``ip_network(..., strict=False)``.

    Args:
        rules: Rule mappings, typically decoded from user-supplied JSON.

    Returns:
        A list of dicts with the keys ``ports``, ``protocol``, ``source`` and
        ``enabled``, in input order with exact duplicates removed.

    Raises:
        HTTPException: 400 when the container, a rule, or any rule field is
            invalid.
    """
    # The payload originates from JSON, so its shape is not guaranteed.
    if not isinstance(rules, (list, tuple)):
        raise HTTPException(status_code=400, detail="Invalid firewall rules")

    normalized: list[dict[str, str]] = []
    seen_rules: set[tuple[str, ...]] = set()
    for rule in rules:
        if not isinstance(rule, dict):
            raise HTTPException(status_code=400, detail="Invalid firewall rule")
        ports = str(rule.get("ports", "")).strip()
        protocol = str(rule.get("protocol", "tcp")).strip().lower()
        source = str(rule.get("source", "")).strip()
        enabled = str(rule.get("enabled", "yes")).strip().lower()
        # A completely blank row (as an empty form line would produce) is
        # skipped before any validation.
        if not ports and not source and not rule.get("ports"):
            continue
        if protocol not in {"tcp", "udp"}:
            raise HTTPException(status_code=400, detail="Invalid firewall rule protocol")
        if enabled not in {"yes", "no"}:
            raise HTTPException(status_code=400, detail="Invalid firewall rule status")

        normalized_tokens: list[str] = []
        for raw_token in re.split(r"[\s,]+", ports):
            if not raw_token:
                continue
            token = _normalize_firewall_port_token(raw_token)
            if token not in normalized_tokens:
                normalized_tokens.append(token)
        if not normalized_tokens:
            continue

        normalized_source = ""
        if source and source not in {"*", "any", "all"}:
            try:
                normalized_source = str(ip_network(source, strict=False))
            except ValueError as exc:
                raise HTTPException(status_code=400, detail="Invalid firewall rule source") from exc

        normalized_rule = {
            "ports": ", ".join(normalized_tokens),
            "protocol": protocol,
            "source": normalized_source,
            "enabled": enabled,
        }
        dedupe_key = tuple(normalized_rule.values())
        if dedupe_key in seen_rules:
            continue
        seen_rules.add(dedupe_key)
        normalized.append(normalized_rule)
    return normalized


def queue_item_reason(item: dict) -> str:
    """Summarize why a queue item is in its current state.

    Collects the distinct, non-blank ``reason`` / ``delay_reason`` / ``status``
    values of the item, then the ``delay_reason`` / ``status`` values of each
    dict entry under ``recipients``, and joins them with ``" | "``. Returns an
    em dash when nothing was found.
    """
    collected: list[str] = []

    def _collect(mapping: dict, keys: tuple[str, ...]) -> None:
        for key in keys:
            text = str(mapping.get(key, "")).strip()
            if text and text not in collected:
                collected.append(text)

    _collect(item, ("reason", "delay_reason", "status"))
    for recipient in item.get("recipients") or []:
        if isinstance(recipient, dict):
            _collect(recipient, ("delay_reason", "status"))

    if not collected:
        return "—"
    return " | ".join(collected)


def queue_item_state(item: dict) -> str:
    """Return the item's ``queue_name``, else its ``status``, else an em dash.

    Values are stringified and trimmed; blank ones are skipped.
    """
    candidates = (str(item.get(key, "")).strip() for key in ("queue_name", "status"))
    return next((text for text in candidates if text), "—")


def pair_panel_updates(keys: list[str], values: list[str]) -> list[tuple[str, str]]:
    """Pair each key with the value at the same position.

    Raises:
        HTTPException: 400 when the two lists differ in length.
    """
    if len(keys) == len(values):
        return [(key, value) for key, value in zip(keys, values)]
    raise HTTPException(status_code=400, detail="Mismatched configuration payload")


def build_backup_storage_validation_config(current_config: dict[str, str], updates: list[tuple[str, str]]) -> dict[str, str]:
    """Return a copy of ``current_config`` with ``updates`` applied on top.

    An empty update for a key in ``BACKUP_SECRET_FIELDS`` is ignored, so the
    existing secret stays in the merged result. The input dict is not modified.
    """
    merged = {**current_config}
    for field, new_value in updates:
        keeps_existing_secret = field in BACKUP_SECRET_FIELDS and not new_value
        if not keeps_existing_secret:
            merged[field] = new_value
    return merged


def validate_backup_storage_config(config: dict[str, str]) -> None:
    """Check that a remote storage config carries its required fields.

    The storage type is read from ``backup-storage-type`` or, failing that,
    ``type`` (default ``local``). ``local`` and unrecognized types require
    nothing. For each required field either the legacy ``backup-storage-*``
    key or the short key may hold the (non-blank) value.

    Raises:
        HTTPException: 400 naming the short key of the first missing field.
    """
    storage_type = config.get("backup-storage-type") or config.get("type", "local")
    if storage_type == "local":
        return

    remote_name = ("backup-storage-remote-name", "remote_name")
    # SFTP, FTP and FTPS all need the same host/user/password set.
    login_fields = (
        remote_name,
        ("backup-storage-host", "host"),
        ("backup-storage-user", "user"),
        ("backup-storage-password", "password"),
    )
    required_by_type = {
        "s3": (
            remote_name,
            ("backup-storage-bucket", "bucket"),
            ("backup-storage-access-key-id", "access_key_id"),
            ("backup-storage-secret-access-key", "secret_access_key"),
        ),
        "sftp": login_fields,
        "ftp": login_fields,
        "ftps": login_fields,
    }

    for aliases in required_by_type.get(storage_type, ()):
        provided = False
        for field_name in aliases:
            if str(config.get(field_name, "")).strip():
                provided = True
                break
        if not provided:
            raise HTTPException(status_code=400, detail=f"Missing value for {aliases[-1]}")


def dns_export_rows(dns_records: dict[str, str]) -> list[dict[str, object]]:
    """Split ``"<name> IN <type> <content>"`` records into structured rows.

    Only keys listed in ``DNS_EXPORTABLE_RECORD_KEYS`` are considered, in that
    order; missing values and values without `` IN `` are skipped. For MX
    records whose content starts with a numeric priority, the priority is
    split off into its own ``priority`` entry.
    """
    rows: list[dict[str, object]] = []
    for key in DNS_EXPORTABLE_RECORD_KEYS:
        raw_value = dns_records.get(key)
        if not raw_value:
            continue
        name, separator, remainder = raw_value.partition(" IN ")
        if not separator:
            continue

        record_type, _, rest = remainder.partition(" ")
        content = rest.strip()
        priority = None
        if record_type == "MX":
            priority_text, _, target = content.partition(" ")
            if priority_text.isdigit():
                priority = int(priority_text)
                content = target.strip()

        row: dict[str, object] = {
            "key": key,
            "name": name,
            "type": record_type,
            "content": content,
            "raw": raw_value,
        }
        if priority is not None:
            row["priority"] = priority
        rows.append(row)
    return rows


def dns_zone_export(domain_name: str, dns_records: dict[str, str]) -> str:
    """Render the exportable DNS records as zone-file text.

    The output starts with ``$ORIGIN`` and ``$TTL 3600`` lines, followed by
    one line per row from ``dns_export_rows`` with the owner name passed
    through ``zone_relative_record_name``. The ``arc``, ``ptr`` and
    ``mta_sts_policy`` entries, when present, are appended as ``;`` comments.
    The text always ends with a newline.
    """
    lines = [f"$ORIGIN {domain_name}.", "$TTL 3600"]
    for row in dns_export_rows(dns_records):
        owner = zone_relative_record_name(domain_name, str(row["name"]))
        has_priority = row["type"] == "MX" and "priority" in row
        rdata = f"{row['priority']} {row['content']}" if has_priority else f"{row['content']}"
        lines.append(f"{owner} IN {row['type']} {rdata}")

    comment_prefixes = (
        ("arc", "; "),
        ("ptr", "; "),
        ("mta_sts_policy", "; MTA-STS policy URL: "),
    )
    for key, prefix in comment_prefixes:
        if dns_records.get(key):
            lines.append(f"{prefix}{dns_records[key]}")
    return "\n".join(lines) + "\n"


def dns_json_export(domain: dict, dns_records: dict[str, str]) -> dict[str, object]:
    """Build the JSON-ready DNS export for a domain.

    Returns a dict with the domain name, the rows from ``dns_export_rows``,
    and a ``notes`` mapping holding whichever of ``arc``, ``ptr`` and
    ``mta_sts_policy`` have a truthy value.
    """
    export: dict[str, object] = {"domain": domain["name"]}
    export["records"] = dns_export_rows(dns_records)
    notes = {}
    for key in ("arc", "ptr", "mta_sts_policy"):
        if dns_records.get(key):
            notes[key] = dns_records[key]
    export["notes"] = notes
    return export

def build_model(model_cls: type[BaseModel], **kwargs):
    """Instantiate ``model_cls`` from ``kwargs``.

    Raises:
        HTTPException: 400 carrying the first validation error's ``msg``
            (or "Invalid input" if that entry has none).
    """
    try:
        instance = model_cls(**kwargs)
    except ValidationError as exc:
        first_error = exc.errors()[0]
        raise HTTPException(status_code=400, detail=first_error.get("msg", "Invalid input")) from exc
    return instance



def panel_redirect(path: str, **params: str | int) -> RedirectResponse:
    """Return a 303 redirect to ``path`` with ``params`` as the query string.

    Parameters whose value is ``None`` or ``""`` are left out; when none
    remain, the bare path is used.
    """
    kept = {}
    for key, value in params.items():
        if value in {None, ""}:
            continue
        kept[key] = value
    query = urlencode(kept)
    target = path if not query else f"{path}?{query}"
    return RedirectResponse(target, status_code=status.HTTP_303_SEE_OTHER)



def serialize_domain(domain: models.Domain) -> dict:
    """Flatten a domain model into a plain dict.

    Nullable DNS text fields are rendered as ``""``, ``dns_sync_enabled`` is
    coerced to ``bool``, and the account/alias/redirect counts are taken from
    the lengths of the corresponding relationships.
    """
    core = {
        "id": domain.id,
        "name": domain.name,
        "is_active": domain.is_active,
        "max_users": domain.max_users,
        "dmarc_policy": domain.dmarc_policy,
    }
    dkim = {
        "dkim_selector": domain.dkim_selector,
        "dkim_public_key": domain.dkim_public_key,
        "dkim_private_path": domain.dkim_private_path,
    }
    dns = {
        "dns_provider": domain.dns_provider or "",
        "dns_account_id": domain.dns_account_id or "",
        "dns_zone_id": domain.dns_zone_id or "",
        "dns_sync_enabled": bool(domain.dns_sync_enabled),
        "dns_last_sync_at": domain.dns_last_sync_at,
        "dns_last_sync_status": domain.dns_last_sync_status or "",
        "dns_has_api_token": domain.dns_has_api_token,
    }
    usage = {
        "created_at": domain.created_at,
        "account_count": len(domain.accounts),
        "alias_count": len(domain.aliases),
        "redirect_count": len(domain.redirects),
    }
    return {**core, **dkim, **dns, **usage}



def serialize_alias(alias: models.Alias) -> dict:
    """Flatten an alias model into a plain dict.

    ``source`` is ``local@domain`` when the alias has a domain attached and
    just the local part otherwise.
    """
    owner = alias.domain
    if owner:
        address = f"{alias.source_local}@{owner.name}"
    else:
        address = alias.source_local
    return dict(
        id=alias.id,
        source=address,
        destination=alias.destination,
        is_active=alias.is_active,
        created_at=alias.created_at,
        domain_id=alias.domain_id,
    )



def serialize_redirect(redirect: models.Redirect) -> dict:
    """Flatten a redirect model into a plain dict.

    ``source`` is ``local@domain`` when the redirect has a domain attached and
    just the local part otherwise.
    """
    owner = redirect.domain
    if owner:
        address = f"{redirect.source_local}@{owner.name}"
    else:
        address = redirect.source_local
    return dict(
        id=redirect.id,
        source=address,
        target_email=redirect.target_email,
        is_active=redirect.is_active,
        created_at=redirect.created_at,
        domain_id=redirect.domain_id,
    )



def panel_context(request: Request, session: dict, page: str, **extra) -> dict:
    """Build the template context shared by the panel pages.

    Combines branding values from settings, the session's username and CSRF
    token, and the backup/firewall field tables. Entries in ``extra`` are
    applied last and override any base key of the same name.
    """
    branding = {
        "request": request,
        "hostname": settings.hostname,
        "server_name": settings.server_name,
        "panel_title": settings.panel_title,
        "panel_favicon_url": "/panel/favicon" if settings.panel_favicon_path else "",
        "page": page,
    }
    identity = {
        "panel_user": session["username"],
        "csrf_token": session["csrf_token"],
    }
    field_tables = {
        "backup_field_order": BACKUP_FIELD_ORDER,
        "backup_schedule_field_order": BACKUP_SCHEDULE_FIELD_ORDER,
        "backup_storage_field_order": BACKUP_STORAGE_FIELD_ORDER,
        "backup_field_labels": BACKUP_FIELD_LABELS,
        "backup_field_options": BACKUP_FIELD_OPTIONS,
        "backup_numeric_fields": BACKUP_NUMERIC_FIELDS,
        "backup_secret_fields": BACKUP_SECRET_FIELDS,
        "firewall_status_options": FIREWALL_STATUS_OPTIONS,
        "firewall_protocol_options": FIREWALL_PROTOCOL_OPTIONS,
    }
    return {**branding, **identity, **field_tables, **extra}


def _upload_to_base64(upload: UploadFile, label: str) -> str:
    """Read an uploaded file and return its content base64-encoded as ASCII text.

    The size is measured by seeking to the end of the underlying file object
    before anything is read.

    Raises:
        HTTPException: 400 when the upload is larger than
            ``MAX_SSL_FILE_SIZE_BYTES`` or empty; ``label`` names the upload
            in the message.
    """
    stream = upload.file
    stream.seek(0, 2)  # whence=2: position relative to the end of the file
    size = stream.tell()
    stream.seek(0)
    if size > MAX_SSL_FILE_SIZE_BYTES:
        limit_mib = MAX_SSL_FILE_SIZE_BYTES // (1024 * 1024)
        raise HTTPException(
            status_code=400,
            detail=f"{label} upload exceeds the {limit_mib} MiB limit",
        )
    payload = stream.read()
    if payload:
        return base64.b64encode(payload).decode("ascii")
    raise HTTPException(status_code=400, detail=f"{label} upload is empty")


def _run_ssl_json(*args: str) -> dict:
    """Run ``manage-ssl.sh`` through ``run_root_script`` and decode its JSON output.

    Blank output is treated as ``{}``.

    Raises:
        HTTPException: 500 when the output is not valid JSON.
    """
    raw_output = run_root_script("manage-ssl.sh", *args).strip()
    if not raw_output:
        raw_output = "{}"
    try:
        parsed = json.loads(raw_output)
    except json.JSONDecodeError as exc:
        raise HTTPException(status_code=500, detail="manage-ssl.sh returned invalid JSON output") from exc
    return parsed


def normalize_backup_tab(tab: Optional[str]) -> str:
    """Return ``tab`` if it is a known backup panel tab, else ``"schedules"``."""
    return tab if tab in BACKUP_PANEL_TABS else "schedules"


def normalize_settings_tab(tab: Optional[str]) -> str:
    """Return ``tab`` if it is a known settings panel tab, else ``"general"``."""
    return tab if tab in SETTINGS_PANEL_TABS else "general"


def list_domains_context(db: Session) -> dict:
    """Return ``{"domains": [...]}`` with every domain serialized, ordered by name."""
    ordered_query = db.query(models.Domain).order_by(models.Domain.name.asc())
    serialized = []
    for domain in ordered_query.all():
        serialized.append(serialize_domain(domain))
    return {"domains": serialized}


def normalize_domain_tab(tab: Optional[str]) -> str:
    """Return ``tab`` if it is a known domain detail tab, else ``"settings"``."""
    return tab if tab in DOMAIN_DETAIL_TABS else "settings"


def panel_domain_path(domain_id: int) -> str:
    """Return the panel URL path of the detail page for ``domain_id``."""
    return "/panel/domains/{}".format(domain_id)


def panel_domain_redirect(
    domain_id: int,
    *,
    notice: Optional[str] = None,
    error: Optional[str] = None,
    tab: str = "settings",
    show_create: bool = False,
) -> RedirectResponse:
    """Redirect to a domain's detail page on the requested tab.

    ``tab`` goes through ``normalize_domain_tab``. ``notice`` and ``error``
    are added to the query string only when truthy, and ``show_create=1`` only
    when ``show_create`` is set.
    """
    optional_params = (
        ("notice", notice),
        ("error", error),
        ("show_create", "1" if show_create else None),
    )
    params: dict[str, str | int] = {"tab": normalize_domain_tab(tab)}
    for key, value in optional_params:
        if value:
            params[key] = value
    return panel_redirect(panel_domain_path(domain_id), **params)



def selected_domain_context(db: Session, domain_id: Optional[int]) -> dict:
    """Build the template context for the domain detail view.

    With ``domain_id=None`` the first domain by name is selected. When no
    domains exist, an all-empty context is returned. Otherwise the context
    holds every serialized domain, the selected domain (extended with
    account usage figures), its accounts, aliases and redirects ordered
    newest first, and its suggested DNS records.

    Raises:
        HTTPException: 404 when ``domain_id`` matches no existing domain.
    """
    all_domains = db.query(models.Domain).order_by(models.Domain.name.asc()).all()

    selected = None
    if all_domains and domain_id is None:
        selected = all_domains[0]
    elif all_domains:
        for candidate in all_domains:
            if candidate.id == domain_id:
                selected = candidate
                break
        if not selected:
            raise HTTPException(status_code=404, detail="Domain not found")

    if not selected:
        return {
            "domains": [],
            "selected_domain": None,
            "domain_accounts": [],
            "domain_aliases": [],
            "domain_redirects": [],
            "dns_records": {},
        }

    def _newest_first(model):
        # Rows of `model` that belong to the selected domain, newest first.
        return db.query(model).filter(model.domain_id == selected.id).order_by(model.created_at.desc()).all()

    accounts = _newest_first(models.Account)
    aliases = _newest_first(models.Alias)
    redirects = _newest_first(models.Redirect)
    dns_records = api_suggested_dns(selected.id, db, _="panel")

    selected_domain = serialize_domain(selected)
    used_accounts = len(accounts)
    # max_users == 0 is displayed as "Unlimited".
    unlimited = selected.max_users == 0
    selected_domain["used_accounts"] = used_accounts
    if unlimited:
        selected_domain["available_accounts"] = "Unlimited"
        selected_domain["max_accounts_display"] = "Unlimited"
    else:
        selected_domain["available_accounts"] = max(selected.max_users - used_accounts, 0)
        selected_domain["max_accounts_display"] = selected.max_users

    return {
        "domains": [serialize_domain(domain) for domain in all_domains],
        "selected_domain": selected_domain,
        "domain_accounts": [serialize_account(account) for account in accounts],
        "domain_aliases": [serialize_alias(alias) for alias in aliases],
        "domain_redirects": [serialize_redirect(redirect) for redirect in redirects],
        "dns_records": dns_records,
    }



def dashboard_data(db: Session) -> dict:
    """Collect the counters and recent items shown on the dashboard.

    Database counts are queried directly. The queue, bans and backup runs are
    loaded through ``optional_dashboard_list``; every section that could not
    be loaded is named in an ``error`` message added to the result.
    """
    stats = {
        "domains": db.query(models.Domain).count(),
        "active_domains": db.query(models.Domain).filter(models.Domain.is_active.is_(True)).count(),
        "accounts": db.query(models.Account).count(),
        "aliases": db.query(models.Alias).count(),
        "redirects": db.query(models.Redirect).count(),
    }

    queue, queue_available = optional_dashboard_list(load_queue)
    bans, bans_available = optional_dashboard_list(load_bans)
    backup_runs, backup_runs_available = optional_dashboard_list(load_backup_runs)
    stats["queue_items"] = len(queue)
    stats["bans"] = len(bans)
    stats["backup_runs"] = len(backup_runs)

    data = {
        "stats": stats,
        "recent_queue": queue[:10],
        "recent_backups": backup_runs[:5],
    }

    availability = (
        ("queue", queue_available),
        ("bans", bans_available),
        ("backups", backup_runs_available),
    )
    unavailable_sections = [label for label, available in availability if not available]
    if unavailable_sections:
        data["error"] = (
            "Some system data is unavailable in the current environment: "
            + ", ".join(unavailable_sections)
            + "."
        )
    return data


def optional_dashboard_list(loader: Callable[[], list[dict]]) -> tuple[list[dict], bool]:
    """Run *loader* and report whether its data was available.

    Returns ``(items, True)`` when the loader succeeds.  Any exception raised
    by the loader is logged as a warning and converted into ``([], False)``
    so the dashboard can still render.  This matches ``optional_panel_value``,
    which likewise tolerates failures other than ``HTTPException``.
    """
    try:
        return loader(), True
    except Exception as exc:
        # HTTPException keeps its message in ``detail``; anything else is logged as-is.
        reason = exc.detail if isinstance(exc, HTTPException) else exc
        loader_name = getattr(loader, "__name__", loader.__class__.__name__)
        logger.warning("Unable to load dashboard system data via %s: %s", loader_name, reason)
        return [], False


def optional_panel_value(loader: Callable[[], object], fallback, section: str) -> tuple[object, Optional[str]]:
    """Run *loader*, substituting *fallback* when it raises.

    Returns ``(value, None)`` on success.  On any exception the failure is
    logged as a warning and ``(fallback, message)`` is returned, where
    *message* names *section* -- except for the ``"SSL configuration"``
    section, for which the message is ``None``.
    """

    def _degraded(reason: object) -> tuple[object, Optional[str]]:
        # Shared failure path: log the reason and build the user-facing message.
        name = getattr(loader, "__name__", loader.__class__.__name__)
        logger.warning("Unable to load panel system data via %s: %s", name, reason)
        if section == "SSL configuration":
            return fallback, None
        return fallback, f"Some system data is unavailable in the current environment: {section}."

    try:
        return loader(), None
    except HTTPException as exc:
        return _degraded(exc.detail)
    except Exception as exc:
        return _degraded(exc)


def render_panel_login(request: Request, error: Optional[str] = None, status_code: int = 200):
    """Render ``panel_login.html`` and attach the login CSRF token to the response.

    The token is built once and used both in the template context and in
    the ``issue_panel_login_csrf`` call on the rendered response.
    """
    token = build_panel_login_csrf_token(request)
    favicon_url = "/panel/favicon" if settings.panel_favicon_path else ""
    page = templates.TemplateResponse(
        request,
        "panel_login.html",
        {
            "request": request,
            "hostname": settings.hostname,
            "panel_title": settings.panel_title,
            "panel_favicon_url": favicon_url,
            "csrf_token": token,
            "error": error,
        },
        status_code=status_code,
    )
    issue_panel_login_csrf(page, request, token=token)
    return page


@router.get("/admin/queue", response_model=list[dict])
def admin_queue(_: str = Depends(require_admin)):
    """Return the list produced by ``load_queue()`` (route depends on ``require_admin``)."""
    queue_entries = load_queue()
    return queue_entries


@router.post("/admin/queue/{queue_id}/{action}", response_model=dict)
def admin_queue_action(queue_id: str, action: str, _: str = Depends(require_admin)):
    """Run ``manage-queue.sh <action> <queue_id>`` for a hold, release or requeue request.

    Any other action is rejected with HTTP 400 before the queue id is passed
    through ``validate_queue_id``.
    """
    supported_actions = ("hold", "release", "requeue")
    if action not in supported_actions:
        raise HTTPException(status_code=400, detail="Unsupported queue action")
    checked_id = validate_queue_id(queue_id)
    run_root_script("manage-queue.sh", action, checked_id)
    return dict(ok=True, queue_id=checked_id, action=action)


@router.delete("/admin/queue/{queue_id}", response_model=dict)
def admin_queue_delete(queue_id: str, _: str = Depends(require_admin)):
    """Run ``manage-queue.sh delete`` for the id returned by ``validate_queue_id``."""
    checked_id = validate_queue_id(queue_id)
    run_root_script("manage-queue.sh", "delete", checked_id)
    return dict(ok=True, queue_id=checked_id, action="delete")


@router.get("/admin/bans", response_model=list[dict])
def admin_bans(_: str = Depends(require_admin)):
    """Return the list produced by ``load_bans()`` (route depends on ``require_admin``)."""
    ban_entries = load_bans()
    return ban_entries


@router.post("/admin/bans", response_model=dict)
def admin_ban_create(payload: schemas.AdminBanCreate, _: str = Depends(require_admin)):
    """Run ``manage-bans.sh ban`` with the IP and reason taken from *payload*."""
    banned_ip = payload.ip
    run_root_script("manage-bans.sh", "ban", banned_ip, payload.reason)
    return dict(ok=True, ip=banned_ip)


@router.delete("/admin/bans", response_model=dict)
def admin_ban_delete(ip: str = Query(...), _: str = Depends(require_admin)):
    """Run ``manage-bans.sh unban`` for the ``ip`` query parameter."""
    script_args = ("unban", ip)
    run_root_script("manage-bans.sh", *script_args)
    return dict(ok=True, ip=ip)


@router.get("/admin/limits", response_model=dict)
def admin_limits(_: str = Depends(require_admin)):
    """Return the mapping produced by ``load_limits()`` (route depends on ``require_admin``)."""
    current_limits = load_limits()
    return current_limits


@router.patch("/admin/limits", response_model=dict)
def admin_limit_update(payload: schemas.AdminLimitUpdate, _: str = Depends(require_admin)):
    """Run ``manage-limits.sh set <key> <value>`` and echo the key/value back."""
    limit_key, limit_value = payload.key, payload.value
    run_root_script("manage-limits.sh", "set", limit_key, limit_value)
    return dict(ok=True, key=limit_key, value=limit_value)


@router.get("/admin/backups", response_model=dict)
def admin_backups(_: str = Depends(require_admin)):
    """Return backup config, runs, schedules and storages in one response."""
    config = load_backup_config()
    runs = load_backup_runs()
    schedules = load_backup_schedules()
    storages = load_backup_storages()
    return dict(config=config, runs=runs, schedules=schedules, storages=storages)


@router.patch("/admin/backups", response_model=dict)
def admin_backup_update(payload: schemas.AdminBackupUpdate, _: str = Depends(require_admin)):
    """Set one backup option via ``manage-backups.sh set``.

    The value is first passed through ``normalize_backup_value``; the
    normalized form is what gets sent to the script and returned.
    """
    option_key = payload.key
    value = normalize_backup_value(option_key, payload.value)
    run_root_script("manage-backups.sh", "set", option_key, value)
    return dict(ok=True, key=option_key, value=value)


@router.get("/admin/backups/schedules", response_model=list[dict])
def admin_backup_schedules(_: str = Depends(require_admin)):
    """Return the list produced by ``load_backup_schedules()``."""
    schedules = load_backup_schedules()
    return schedules


@router.post("/admin/backups/schedules", response_model=dict)
def admin_backup_schedule_save_api(payload: dict, _: str = Depends(require_admin)):
    """Serialize the request body to JSON and hand it to ``save_backup_schedule``."""
    serialized = json.dumps(payload)
    return save_backup_schedule(serialized)


@router.delete("/admin/backups/schedules/{schedule_id}", response_model=dict)
def admin_backup_schedule_delete_api(schedule_id: str, _: str = Depends(require_admin)):
    """Run ``manage-backups.sh delete-schedule`` for *schedule_id*."""
    script_args = ("delete-schedule", schedule_id)
    run_root_script("manage-backups.sh", *script_args)
    return dict(ok=True, id=schedule_id)


@router.get("/admin/backups/storages", response_model=list[dict])
def admin_backup_storages(_: str = Depends(require_admin)):
    """Return the list produced by ``load_backup_storages()``."""
    storages = load_backup_storages()
    return storages


@router.post("/admin/backups/storages", response_model=dict)
def admin_backup_storage_save_api(payload: dict, _: str = Depends(require_admin)):
    """Serialize the request body to JSON and hand it to ``save_backup_storage``."""
    serialized = json.dumps(payload)
    return save_backup_storage(serialized)


@router.delete("/admin/backups/storages/{storage_id}", response_model=dict)
def admin_backup_storage_delete_api(storage_id: str, _: str = Depends(require_admin)):
    """Run ``manage-backups.sh delete-storage`` for *storage_id*."""
    script_args = ("delete-storage", storage_id)
    run_root_script("manage-backups.sh", *script_args)
    return dict(ok=True, id=storage_id)


@router.post("/admin/backups/storages/{storage_id}/test", response_model=dict)
def admin_backup_storage_test_api(storage_id: str, _: str = Depends(require_admin)):
    """Return the result of ``test_backup_storage`` for *storage_id*."""
    test_result = test_backup_storage(storage_id)
    return test_result


@router.get("/admin/settings", response_model=dict)
def admin_settings(_: str = Depends(require_admin)):
    """Return the mapping produced by ``load_panel_settings()``."""
    current_settings = load_panel_settings()
    return current_settings


@router.patch("/admin/settings", response_model=dict)
def admin_settings_update(payload: schemas.AdminSettingsUpdate, _: str = Depends(require_admin)):
    """Apply the fields set on *payload* through ``manage-settings.sh set-many``.

    Only fields that are not ``None`` are forwarded.  When nothing is set the
    current settings are returned unchanged; otherwise the script's output is
    parsed as JSON (empty output yields ``{}``).
    """
    command: list[str] = ["set-many"]
    if payload.server_name is not None:
        command += ["server-name", payload.server_name]
    if payload.panel_title is not None:
        command += ["panel-title", payload.panel_title]
    if payload.auto_updates_enabled is not None:
        command += ["auto-updates-enabled", "yes" if payload.auto_updates_enabled else "no"]

    # Only the sub-command is present: there is nothing to change.
    if len(command) == 1:
        return load_panel_settings()

    script_output = run_root_script("manage-settings.sh", *command)
    return json.loads(script_output or "{}")


@router.post("/admin/settings/favicon", response_model=dict)
def admin_settings_favicon_upload(
    favicon_file: UploadFile = File(...),
    _: str = Depends(require_admin),
):
    """Upload a favicon through ``manage-settings.sh upload-favicon``.

    The upload is base64-encoded by ``_upload_to_base64`` and written to a
    temporary file whose path is passed to the script.  The temporary file is
    removed afterwards, including when writing it or running the script
    fails.  Returns the script output parsed as JSON (``{}`` when empty).
    """
    import tempfile

    encoded = _upload_to_base64(favicon_file, "favicon")
    tmp_path: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(mode="w", suffix=".b64", delete=False) as tmp:
            # Record the path before writing so a failed write is still cleaned up.
            tmp_path = tmp.name
            tmp.write(encoded)
        output = run_root_script(
            "manage-settings.sh",
            "upload-favicon",
            favicon_file.filename or "favicon.ico",
            favicon_file.content_type or "image/x-icon",
            tmp_path,
            "--json",
        )
    finally:
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
    return json.loads(output or "{}")


@router.post("/admin/settings/check-update", response_model=dict)
def admin_settings_check_update(_: str = Depends(require_admin)):
    """Return the result of ``check_for_updates()``."""
    update_status = check_for_updates()
    return update_status


@router.post("/admin/settings/apply-update", response_model=dict)
def admin_settings_apply_update(_: str = Depends(require_admin)):
    """Return the result of ``apply_update_now()``."""
    update_result = apply_update_now()
    return update_result


@router.get("/admin/firewall", response_model=dict)
def admin_firewall(_: str = Depends(require_admin)):
    """Return the mapping produced by ``load_firewall_config()``."""
    firewall_config = load_firewall_config()
    return firewall_config


@router.patch("/admin/firewall", response_model=dict)
def admin_firewall_update(payload: schemas.AdminFirewallUpdate, _: str = Depends(require_admin)):
    """Set one firewall option via ``manage-firewall.sh set``.

    The value is first passed through ``normalize_firewall_value``; the
    normalized form is what gets sent to the script and returned.
    """
    option_key = payload.key
    value = normalize_firewall_value(option_key, payload.value)
    run_root_script("manage-firewall.sh", "set", option_key, value)
    return dict(ok=True, key=option_key, value=value)


@router.post("/admin/backups/run", response_model=dict)
def admin_backup_run(_: str = Depends(require_admin)):
    """Invoke ``manage-backups.sh run`` and acknowledge with ``{"ok": True}``."""
    script_args = ("manage-backups.sh", "run")
    run_root_script(*script_args)
    return dict(ok=True)


@router.get("/panel/login")
def panel_login_form(request: Request):
    """Show the login page, or redirect to ``/panel`` when a panel session loads."""
    ensure_web_panel_enabled()
    existing_session = load_panel_session(request)
    if not existing_session:
        return render_panel_login(request)
    return panel_redirect("/panel")


@router.post("/panel/login")
def panel_login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    csrf_token: str = Form(...),
):
    """Handle a panel login form submission.

    After the login CSRF token is validated, a rate-limited client address
    gets the login page again with HTTP 429.  Wrong credentials register an
    authentication failure for the address and re-render the page with HTTP
    401.  On success the failure record is cleared and a redirect to
    ``/panel`` carrying a new panel session is returned.
    """
    ensure_web_panel_enabled()
    validate_panel_login_csrf(request, csrf_token)
    address = client_address(request)
    if is_rate_limited(address):
        return render_panel_login(request, error="Too many authentication attempts", status_code=status.HTTP_429_TOO_MANY_REQUESTS)

    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, which would surface as a 500 and skip the
    # failure accounting below.  Comparing UTF-8 bytes avoids that.
    correct_user = secrets.compare_digest(
        username.encode("utf-8"),
        panel_admin_username().encode("utf-8"),
    )
    # Both checks are always evaluated before being combined.
    correct_pass = verify_panel_secret(password)
    if not (correct_user and correct_pass):
        register_auth_failure(address)
        return render_panel_login(request, error="Invalid username or password", status_code=status.HTTP_401_UNAUTHORIZED)

    clear_auth_failures(address)
    response = panel_redirect("/panel")
    issue_panel_session(response, request, username)
    return response


@router.post("/panel/logout")
def panel_logout(
    request: Request,
    response: Response,
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """End the panel session and redirect to the login page.

    The CSRF token is checked against the session first.  The session is
    deleted on the redirect response that is returned; the injected
    ``response`` parameter is not used by the body.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    login_redirect = panel_redirect("/panel/login", notice="logged-out")
    delete_panel_session(login_redirect, request)
    return login_redirect


@router.get("/panel")
def panel_dashboard(
    request: Request,
    notice: Optional[str] = Query(default=None),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
):
    """Render the dashboard view of ``panel.html``.

    The optional ``"error"`` entry of ``dashboard_data`` is removed from the
    data and passed to the template context as the page error.
    """
    ensure_web_panel_enabled()
    page_data = dashboard_data(db)
    load_error = page_data.pop("error", None)
    context = panel_context(
        request,
        session,
        "dashboard",
        notice=notice,
        error=load_error,
        **page_data,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/domains")
def panel_domains(
    request: Request,
    domain_id: Optional[int] = Query(default=None),
    notice: Optional[str] = Query(default=None),
    show_create: Optional[str] = Query(default=None),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
):
    """Render the domain list, or redirect when ``domain_id`` is supplied.

    The create form is flagged as visible whenever the ``show_create`` query
    parameter is present, whatever its value.
    """
    ensure_web_panel_enabled()
    if domain_id is not None:
        # A domain id in the query string is redirected via panel_domain_redirect.
        return panel_domain_redirect(domain_id, notice=notice)

    create_form_visible = show_create is not None
    listing = list_domains_context(db)
    context = panel_context(
        request,
        session,
        "domains",
        notice=notice,
        show_create_form=create_form_visible,
        **listing,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/domains/{domain_id}")
def panel_domain_detail(
    request: Request,
    domain_id: int,
    tab: Optional[str] = Query(default="settings"),
    notice: Optional[str] = Query(default=None),
    show_create: Optional[str] = Query(default=None),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    error: Optional[str] = Query(default=None),
):
    """Render the detail view for one domain.

    The requested tab is passed through ``normalize_domain_tab`` and the
    domain-specific context comes from ``selected_domain_context``.
    """
    ensure_web_panel_enabled()
    active_tab = normalize_domain_tab(tab)
    create_form_visible = show_create is not None
    domain_view = selected_domain_context(db, domain_id)
    context = panel_context(
        request,
        session,
        "domain-detail",
        notice=notice,
        error=error,
        active_domain_tab=active_tab,
        show_create_form=create_form_visible,
        **domain_view,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/domains/{domain_id}/dns/export")
def panel_domain_dns_export(
    request: Request,
    domain_id: int,
    format: str = Query(default="json"),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
):
    """Download a domain's DNS records as a zone file or as JSON.

    Raises HTTP 404 when no domain is selected for *domain_id* and HTTP 400
    for any ``format`` other than ``"zone"`` or ``"json"``.  ``request`` and
    ``session`` are accepted for their dependency side effects only.
    """
    ensure_web_panel_enabled()
    domain_view = selected_domain_context(db, domain_id)
    domain = domain_view["selected_domain"]
    if not domain:
        raise HTTPException(status_code=404, detail="Domain not found")

    records = domain_view["dns_records"]
    # Collapse every run of characters outside [A-Za-z0-9._-] into one dash for the filename.
    safe_name = re.sub(r"[^A-Za-z0-9._-]+", "-", domain["name"])

    if format == "zone":
        zone_text = dns_zone_export(domain["name"], records)
        disposition = {"Content-Disposition": f'attachment; filename="{safe_name}.zone"'}
        return PlainTextResponse(zone_text, media_type="text/dns", headers=disposition)
    elif format == "json":
        json_body = dns_json_export(domain, records)
        disposition = {"Content-Disposition": f'attachment; filename="{safe_name}.json"'}
        return JSONResponse(json_body, headers=disposition)
    else:
        raise HTTPException(status_code=400, detail="Unsupported DNS export format")


@router.get("/panel/queue")
def panel_queue(
    request: Request,
    notice: Optional[str] = None,
    session: dict = Depends(require_panel_session),
):
    """Render the queue view.

    Every dict entry of the loaded queue is copied with extra
    ``display_reason`` and ``display_state`` keys; non-dict entries are kept
    unchanged.  The notice code is mapped through ``QUEUE_NOTICE_MESSAGES``,
    falling back to the raw value.
    """
    ensure_web_panel_enabled()
    loaded_queue, error = optional_panel_value(load_queue, [], "queue")

    display_queue = []
    for entry in loaded_queue:
        if isinstance(entry, dict):
            display_queue.append(
                dict(
                    entry,
                    display_reason=queue_item_reason(entry),
                    display_state=queue_item_state(entry),
                )
            )
        else:
            display_queue.append(entry)

    notice_message = QUEUE_NOTICE_MESSAGES.get(notice, notice)
    context = panel_context(
        request,
        session,
        "queue",
        notice=notice_message,
        error=error,
        queue=display_queue,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/backups")
def panel_backups(
    request: Request,
    notice: Optional[str] = None,
    error: Optional[str] = None,
    backup_tab: Optional[str] = None,
    schedule_id: Optional[str] = None,
    storage_id: Optional[str] = None,
    session: dict = Depends(require_panel_session),
):
    """Render the backups view.

    Config, runs, schedules and storages are each loaded through
    ``optional_panel_value`` so a failing loader falls back to an empty
    value.  The page error is the first non-empty value among the ``error``
    query parameter and the loader messages, in loading order.
    """
    ensure_web_panel_enabled()
    backup_config, config_error = optional_panel_value(load_backup_config, {}, "backup configuration")
    backup_runs, runs_error = optional_panel_value(load_backup_runs, [], "backups")
    raw_schedules, schedules_error = optional_panel_value(load_backup_schedules, [], "backup schedules")
    schedules = [parse_backup_schedule_to_cron_fields(entry) for entry in raw_schedules]
    storages, storages_error = optional_panel_value(load_backup_storages, [], "backup storages")

    notice_message = BACKUP_NOTICE_MESSAGES.get(notice, notice)
    active_tab = normalize_backup_tab(backup_tab)
    key_token = pop_panel_flash(request, session, "backup_private_key_token")

    # Schedule preselected for a manual run: the requested id, else the first schedule's id.
    if schedule_id:
        run_schedule_id = schedule_id
    elif schedules:
        run_schedule_id = schedules[0].get("id", "")
    else:
        run_schedule_id = ""

    chosen_schedule = {}
    if schedule_id:
        for entry in schedules:
            if entry.get("id") == schedule_id:
                chosen_schedule = entry
                break
    chosen_schedule = parse_backup_schedule_to_cron_fields(chosen_schedule)

    chosen_storage = {}
    if storage_id:
        for entry in storages:
            if entry.get("id") == storage_id:
                chosen_storage = entry
                break

    storage_options = [("", "Local only")]
    for entry in storages:
        option_value = entry.get("id", "")
        option_label = f"{entry.get('name', entry.get('id', 'storage'))} ({entry.get('type', 'local')})"
        storage_options.append((option_value, option_label))

    page_error = error or config_error or runs_error or schedules_error or storages_error
    context = panel_context(
        request,
        session,
        "backups",
        notice=notice_message,
        error=page_error,
        backup_config=backup_config,
        backup_runs=backup_runs,
        backup_schedules=schedules,
        backup_storages=storages,
        active_backup_tab=active_tab,
        private_key_download_token=key_token,
        selected_run_schedule_id=run_schedule_id,
        selected_backup_schedule=chosen_schedule,
        selected_backup_storage=chosen_storage,
        backup_storage_options=storage_options,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/security")
def panel_security(
    request: Request,
    notice: Optional[str] = None,
    session: dict = Depends(require_panel_session),
):
    """Render the security view with bans and firewall settings.

    The firewall rule list is padded with blank rule entries until it holds
    at least four rows.
    """
    ensure_web_panel_enabled()
    bans, bans_error = optional_panel_value(load_bans, [], "bans")
    firewall_config, firewall_error = optional_panel_value(load_firewall_config, {}, "firewall")

    rules = list(firewall_config.get("firewall-rules", []))
    # A fresh dict per padded row, so the rows never share one object.
    rules.extend(
        {"ports": "", "protocol": "tcp", "source": "", "enabled": "yes"}
        for _ in range(4 - len(rules))
    )

    notice_message = FIREWALL_NOTICE_MESSAGES.get(notice, notice)
    context = panel_context(
        request,
        session,
        "security",
        notice=notice_message,
        error=bans_error or firewall_error,
        bans=bans,
        firewall_config=firewall_config,
        firewall_rules=rules,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/settings")
def panel_settings(
    request: Request,
    notice: Optional[str] = None,
    error: Optional[str] = None,
    settings_tab: Optional[str] = None,
    session: dict = Depends(require_panel_session),
):
    """Render the settings view with panel settings, SSL, limits and API info.

    Each data set is loaded through ``optional_panel_value`` with ``{}`` as
    fallback; the page error is the first non-empty value among the ``error``
    query parameter and the loader messages.
    """
    ensure_web_panel_enabled()
    settings_info, settings_error = optional_panel_value(load_panel_settings, {}, "settings")
    ssl_info, ssl_error = optional_panel_value(load_ssl_config, {}, "SSL configuration")
    limits, limits_error = optional_panel_value(load_limits, {}, "limits")
    api_info, api_error = optional_panel_value(load_api_credentials, {}, "API credentials")

    active_tab = normalize_settings_tab(settings_tab)
    page_error = error or settings_error or ssl_error or limits_error or api_error
    context = panel_context(
        request,
        session,
        "settings",
        notice=notice,
        error=page_error,
        settings_info=settings_info,
        ssl_info=ssl_info,
        limits=limits,
        api_info=api_info,
        active_settings_tab=active_tab,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/limits")
def panel_limits(
    request: Request,
    notice: Optional[str] = None,
    session: dict = Depends(require_panel_session),
):
    """Render the limits view; the notice code is mapped via ``LIMIT_NOTICE_MESSAGES``."""
    ensure_web_panel_enabled()
    limits, load_error = optional_panel_value(load_limits, {}, "limits")
    notice_message = LIMIT_NOTICE_MESSAGES.get(notice, notice)
    context = panel_context(
        request,
        session,
        "limits",
        notice=notice_message,
        error=load_error,
        limits=limits,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/ssl")
def panel_ssl(
    request: Request,
    notice: Optional[str] = None,
    error: Optional[str] = None,
    session: dict = Depends(require_panel_session),
):
    """Render the SSL view; the notice code is mapped via ``SSL_NOTICE_MESSAGES``."""
    ensure_web_panel_enabled()
    ssl_info, load_error = optional_panel_value(load_ssl_config, {}, "SSL configuration")
    notice_message = SSL_NOTICE_MESSAGES.get(notice, notice)
    page_error = error or load_error
    context = panel_context(
        request,
        session,
        "ssl",
        notice=notice_message,
        error=page_error,
        ssl_info=ssl_info,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.get("/panel/api")
def panel_api(
    request: Request,
    notice: Optional[str] = None,
    session: dict = Depends(require_panel_session),
):
    """Render the API credentials view.

    The ``generated_api_password`` flash value is popped from the session
    and handed to the template.
    """
    ensure_web_panel_enabled()
    api_info, load_error = optional_panel_value(load_api_credentials, {}, "API credentials")
    fresh_password = pop_panel_flash(request, session, "generated_api_password")
    context = panel_context(
        request,
        session,
        "api",
        notice=notice,
        error=load_error,
        api_info=api_info,
        generated_api_password=fresh_password,
    )
    return templates.TemplateResponse(request, "panel.html", context)


@router.post("/panel/settings/general")
def panel_settings_general_save(
    request: Request,
    server_name: str = Form(""),
    panel_title: str = Form(""),
    auto_updates_enabled: str = Form("no"),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Save the general settings form and refresh the in-memory settings.

    The form values are validated by building ``schemas.AdminSettingsUpdate``
    and then sent to ``manage-settings.sh set-many``.  When the script prints
    a JSON object, the module-level ``settings`` are updated from it; output
    that is not a JSON object is logged and otherwise ignored, because the
    script call itself has already completed.  Always redirects back to the
    general settings tab.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    auto_updates_on = auto_updates_enabled == "yes"
    update = build_model(
        schemas.AdminSettingsUpdate,
        server_name=server_name or None,
        panel_title=panel_title or None,
        auto_updates_enabled=auto_updates_on,
    )
    output = run_root_script(
        "manage-settings.sh",
        "set-many",
        "server-name",
        update.server_name or "",
        "panel-title",
        update.panel_title or "",
        "auto-updates-enabled",
        "yes" if auto_updates_on else "no",
    )
    if output:
        try:
            applied = json.loads(output)
        except json.JSONDecodeError:
            applied = None
        if isinstance(applied, dict):
            settings.server_name = applied.get("server_name", settings.server_name)
            settings.panel_title = applied.get("panel_title", settings.panel_title)
            settings.panel_favicon_path = applied.get("panel_favicon_path", settings.panel_favicon_path)
            settings.enable_auto_updates = applied.get("auto_updates_enabled", "no") == "yes"
        else:
            # Best effort: the redirect below still reports success.
            logger.warning(
                "manage-settings.sh set-many returned unexpected output; in-memory panel settings were not refreshed"
            )
    return panel_redirect("/panel/settings", settings_tab="general", notice="Settings updated.")


@router.post("/panel/settings/favicon")
def panel_settings_favicon_save(
    request: Request,
    favicon_file: UploadFile = File(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Save an uploaded favicon from the panel settings form.

    The upload is base64-encoded and written to a temporary file whose path
    is given to ``manage-settings.sh upload-favicon``.  The temporary file is
    removed afterwards, including when writing it or running the script
    fails.  ``HTTPException`` and invalid JSON output are turned into an
    error redirect to the general settings tab.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    import tempfile

    tmp_path: Optional[str] = None
    try:
        encoded = _upload_to_base64(favicon_file, "favicon")
        try:
            # Write base64 to a temp file to avoid CLI argument length limits
            with tempfile.NamedTemporaryFile(mode="w", suffix=".b64", delete=False) as tmp:
                # Record the path before writing so a failed write is still cleaned up.
                tmp_path = tmp.name
                tmp.write(encoded)
            output = run_root_script(
                "manage-settings.sh",
                "upload-favicon",
                favicon_file.filename or "favicon.ico",
                favicon_file.content_type or "image/x-icon",
                tmp_path,
                "--json",
            )
        finally:
            if tmp_path is not None:
                Path(tmp_path).unlink(missing_ok=True)
        payload = json.loads(output or "{}")
        settings.panel_favicon_path = payload.get("panel_favicon_path", settings.panel_favicon_path)
    except HTTPException as exc:
        return panel_redirect("/panel/settings", settings_tab="general", error=exc.detail)
    except json.JSONDecodeError:
        return panel_redirect("/panel/settings", settings_tab="general", error="Unable to save favicon.")
    return panel_redirect("/panel/settings", settings_tab="general", notice="Favicon updated.")


@router.post("/panel/settings/check-update")
def panel_settings_check_update(
    request: Request,
    background_tasks: BackgroundTasks,
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Queue ``check_for_updates`` as a background task and redirect to settings."""
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    background_tasks.add_task(check_for_updates)
    started_notice = "Update check started in the background. Refresh the page in a few seconds."
    return panel_redirect("/panel/settings", settings_tab="general", notice=started_notice)


@router.post("/panel/settings/apply-update")
def panel_settings_apply_update_now(
    request: Request,
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Call ``apply_update_now`` and redirect to the general settings tab.

    An ``HTTPException`` from the call becomes the redirect's error message.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    try:
        apply_update_now()
    except HTTPException as exc:
        return panel_redirect("/panel/settings", settings_tab="general", error=exc.detail)
    else:
        return panel_redirect("/panel/settings", settings_tab="general", notice="Update requested.")


@router.get("/panel/favicon")
def panel_favicon():
    """Serve the configured favicon file.

    Responds with HTTP 404 when no favicon path is configured, when the
    resolved path lies outside both allowed asset directories, when it is
    not a regular file, or when its suffix is not an accepted image
    extension.
    """
    configured = settings.panel_favicon_path
    if not configured:
        raise HTTPException(status_code=404, detail="Favicon not configured")

    candidate = Path(configured).resolve(strict=False)
    allowed_roots = (
        (settings.base_dir / "config" / "panel-assets").resolve(strict=False),
        Path("/etc/limristem-mail.d/panel-assets").resolve(strict=False),
    )
    image_suffixes = {".ico", ".png", ".jpg", ".jpeg", ".gif", ".webp"}

    # Checks run in this order and short-circuit: location, file type, extension.
    servable = (
        any(candidate == root or candidate.is_relative_to(root) for root in allowed_roots)
        and candidate.is_file()
        and candidate.suffix.lower() in image_suffixes
    )
    if not servable:
        raise HTTPException(status_code=404, detail="Favicon not found")

    guessed_type, _encoding = mimetypes.guess_type(candidate.name)
    return FileResponse(candidate, media_type=guessed_type or "image/x-icon")


@router.post("/panel/api/regenerate")
def panel_api_regenerate(
    request: Request,
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Regenerate the API credentials and redirect to the API view.

    A non-empty ``api_admin_pass`` in the result is stored as the
    ``generated_api_password`` flash value on the session.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    credentials = regenerate_api_credentials()
    generated = credentials.get("api_admin_pass", "")
    if generated:
        set_panel_flash(request, session, "generated_api_password", generated)
    return panel_redirect("/panel/api", notice="api-regenerated")


@router.get("/admin/ssl", response_model=dict)
def admin_ssl(_: str = Depends(require_admin)):
    """Return the mapping produced by ``load_ssl_config()``."""
    ssl_config = load_ssl_config()
    return ssl_config


@router.post("/admin/ssl/selfsigned", response_model=dict)
def admin_ssl_selfsigned(_: str = Depends(require_admin)):
    """Return the result of ``_run_ssl_json("selfsigned", "--json")``."""
    command = ("selfsigned", "--json")
    return _run_ssl_json(*command)


@router.post("/admin/ssl/letsencrypt", response_model=dict)
def admin_ssl_letsencrypt(email: Optional[str] = Query(default=None), _: str = Depends(require_admin)):
    """Run the ``letsencrypt`` SSL command, adding the e-mail argument only when given."""
    email_args = [email] if email else []
    return _run_ssl_json("letsencrypt", *email_args, "--json")


@router.post("/admin/ssl/manual", response_model=dict)
def admin_ssl_manual(
    certificate_file: UploadFile = File(...),
    private_key_file: UploadFile = File(...),
    _: str = Depends(require_admin),
):
    """Import an uploaded certificate and private key via ``import-pem``.

    Both uploads are base64-encoded and written to temporary files whose
    paths are passed to the SSL command.  Every temporary file that was
    created is removed again, including when writing one of them or running
    the command fails.
    """
    import tempfile

    cert_b64 = _upload_to_base64(certificate_file, "certificate")
    key_b64 = _upload_to_base64(private_key_file, "private key")
    temp_names: list[str] = []
    try:
        # Write base64 to temp files to avoid CLI argument length limits
        for suffix, encoded in ((".cert.b64", cert_b64), (".key.b64", key_b64)):
            with tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) as tmp:
                # Track the path before writing so a failed write is still cleaned up.
                temp_names.append(tmp.name)
                tmp.write(encoded)
        cert_path, key_path = temp_names
        return _run_ssl_json("import-pem", cert_path, key_path, "--json")
    finally:
        for name in temp_names:
            Path(name).unlink(missing_ok=True)


@router.post("/panel/domains/create")
def panel_create_domain(
    request: Request,
    name: str = Form(...),
    max_users: int = Form(0),
    dmarc_policy: str = Form("reject"),
    dkim_selector: str = Form("default"),
    generate_dkim: Optional[str] = Form(None),
    is_active: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Create a domain from the panel form and redirect to its detail page.

    The ``generate_dkim`` and ``is_active`` fields count as true whenever
    they are present in the submitted form, whatever their value.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    form_fields = {
        "name": name,
        "max_users": max_users,
        "dmarc_policy": dmarc_policy,
        "dkim_selector": dkim_selector,
        "generate_dkim": generate_dkim is not None,
        "is_active": is_active is not None,
    }
    payload = build_model(schemas.DomainCreate, **form_fields)
    created = api_create_domain(payload, db, cache, _="panel")
    return panel_domain_redirect(created.id, notice="domain-created")


@router.post("/panel/ssl/selfsigned")
def panel_ssl_selfsigned(
    request: Request,
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Run the ``selfsigned`` SSL command and redirect to the SSL view.

    An ``HTTPException`` from the command becomes the redirect's error message.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    try:
        _run_ssl_json("selfsigned", "--json")
    except HTTPException as exc:
        return panel_redirect("/panel/ssl", error=exc.detail)
    else:
        return panel_redirect("/panel/ssl", notice="ssl-selfsigned")


@router.post("/panel/ssl/letsencrypt")
def panel_ssl_letsencrypt(
    request: Request,
    email: str = Form(""),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Run the ``letsencrypt`` SSL command and redirect to the SSL view.

    The e-mail is stripped of surrounding whitespace and only passed on when
    something remains.  An ``HTTPException`` from the command becomes the
    redirect's error message.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    contact = email.strip()
    email_args = [contact] if contact else []
    try:
        _run_ssl_json("letsencrypt", *email_args, "--json")
    except HTTPException as exc:
        return panel_redirect("/panel/ssl", error=exc.detail)
    else:
        return panel_redirect("/panel/ssl", notice="ssl-letsencrypt")


@router.post("/panel/ssl/manual")
def panel_ssl_manual(
    request: Request,
    certificate_file: UploadFile = File(...),
    private_key_file: UploadFile = File(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Import an uploaded certificate and private key from the panel form.

    Like ``admin_ssl_manual``, the base64-encoded uploads are written to
    temporary files and the file paths -- not the encoded contents -- are
    passed to ``import-pem``.  This keeps the private key out of the command
    line and avoids CLI argument length limits.  The temporary files are
    always removed.  An ``HTTPException`` becomes the redirect's error
    message.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    import tempfile

    temp_names: list[str] = []
    try:
        cert_b64 = _upload_to_base64(certificate_file, "certificate")
        key_b64 = _upload_to_base64(private_key_file, "private key")
        for suffix, encoded in ((".cert.b64", cert_b64), (".key.b64", key_b64)):
            with tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) as tmp:
                # Track the path before writing so a failed write is still cleaned up.
                temp_names.append(tmp.name)
                tmp.write(encoded)
        cert_path, key_path = temp_names
        _run_ssl_json("import-pem", cert_path, key_path, "--json")
    except HTTPException as exc:
        return panel_redirect("/panel/ssl", error=exc.detail)
    finally:
        for name in temp_names:
            Path(name).unlink(missing_ok=True)
    return panel_redirect("/panel/ssl", notice="ssl-manual")


@router.post("/panel/domains/{domain_id}")
def panel_update_domain(
    request: Request,
    domain_id: int,
    action: str = Form("update"),
    max_users: int = Form(0),
    dmarc_policy: str = Form("reject"),
    is_active: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Update or delete a domain from the panel form.

    ``action == "delete"`` removes the domain and redirects to the domain
    list; any other action is treated as an update and redirects back to the
    domain's detail page.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    if action == "delete":
        api_delete_domain(domain_id, db, cache, _="panel")
        return panel_redirect("/panel/domains", notice="domain-deleted")

    form_fields = {
        "max_users": max_users,
        "dmarc_policy": dmarc_policy,
        "is_active": is_active is not None,
    }
    changes = build_model(schemas.DomainUpdate, **form_fields)
    api_update_domain(domain_id, changes, db, cache, _="panel")
    return panel_domain_redirect(domain_id, notice="domain-updated")


@router.post("/panel/domains/{domain_id}/dkim")
def panel_rotate_dkim(
    request: Request,
    domain_id: int,
    selector: str = Form("default"),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Rotate the DKIM key of a domain and redirect to its detail page.

    When the rotation result is a dict whose ``dns_sync`` entry reports
    ``status == "error"``, the redirect carries that entry's detail as the
    error message instead of the success notice.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    outcome = api_rotate_dkim(domain_id, selector, db, cache, _="panel")

    sync_report = None
    if isinstance(outcome, dict):
        sync_report = outcome.get("dns_sync")

    sync_failed = isinstance(sync_report, dict) and sync_report.get("status") == "error"
    if not sync_failed:
        return panel_domain_redirect(domain_id, notice="dkim-rotated")

    failure_detail = str(sync_report.get("detail", "DNS sync failed"))
    return panel_domain_redirect(domain_id, error=failure_detail, tab="settings")


@router.post("/panel/domains/{domain_id}/dns/provider")
def panel_update_domain_dns_provider(
    request: Request,
    domain_id: int,
    dns_provider: str = Form("cloudflare"),
    dns_account_id: str = Form(""),
    dns_zone_id: str = Form(""),
    dns_api_token: str = Form(""),
    dns_sync_enabled: Optional[str] = Form(None),
    clear_api_token: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Save the DNS provider settings of a domain from the panel form.

    Empty text fields are sent as ``None``; the two checkbox-style fields
    count as true whenever they are present in the form.  An
    ``HTTPException`` from the update becomes the redirect's error message.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    form_fields = {
        "dns_provider": dns_provider,
        "dns_account_id": dns_account_id or None,
        "dns_zone_id": dns_zone_id or None,
        "dns_api_token": dns_api_token or None,
        "dns_sync_enabled": dns_sync_enabled is not None,
        "clear_api_token": clear_api_token is not None,
    }
    provider_update = build_model(schemas.DomainDnsProviderUpdate, **form_fields)
    try:
        api_configure_dns_provider(domain_id, provider_update, db, cache, _="panel")
    except HTTPException as exc:
        return panel_domain_redirect(domain_id, error=exc.detail, tab="settings")
    else:
        return panel_domain_redirect(domain_id, notice="dns-provider-updated", tab="settings")


@router.post("/panel/domains/{domain_id}/dns/sync-dkim")
def panel_sync_dkim_dns(
    request: Request,
    domain_id: int,
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Trigger ``api_sync_dkim_dns`` for a domain and redirect to its settings tab.

    The redirect carries the ``dkim-dns-synced`` notice on success, or the
    ``HTTPException`` detail as an error when the call fails.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    outcome = {"notice": "dkim-dns-synced"}
    try:
        api_sync_dkim_dns(domain_id, db, cache, _="panel")
    except HTTPException as exc:
        outcome = {"error": exc.detail}
    return panel_domain_redirect(domain_id, **outcome, tab="settings")


@router.post("/panel/accounts/create")
def panel_create_account(
    request: Request,
    domain_id: int = Form(...),
    local_part: str = Form(...),
    password: str = Form(...),
    quota_mb: int = Form(2048),
    is_active: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Create a mailbox account under the domain identified by ``domain_id``.

    The domain is resolved with ``get_domain_or_404`` and its name goes into
    the ``schemas.AccountCreate`` payload. ``is_active`` is true whenever the
    form field was submitted. Redirects to the domain's ``accounts`` tab.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    parent_domain = get_domain_or_404(db, domain_id)
    account_fields = {
        "domain": parent_domain.name,
        "local_part": local_part,
        "password": password,
        "quota_mb": quota_mb,
        "is_active": is_active is not None,
    }
    new_account = build_model(schemas.AccountCreate, **account_fields)
    api_create_account(new_account, db, cache, _="panel")
    return panel_domain_redirect(domain_id, notice="account-created", tab="accounts")


@router.post("/panel/accounts/{account_id}")
def panel_update_account(
    request: Request,
    account_id: int,
    action: str = Form("update"),
    quota_mb: Optional[int] = Form(None),
    password: str = Form(""),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Apply one panel action to an existing account.

    Supported ``action`` values:

    * ``delete`` - remove the account.
    * ``update-quota`` / ``update-password`` - change only that field.
    * ``suspend`` / ``restore`` - set ``is_active`` to False / True.
    * anything else - update quota and password together, keeping the
      account's current ``is_active`` value.

    An empty ``password`` is sent as ``None``.

    Raises:
        HTTPException: 404 when no account has ``account_id``.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    lookup = db.query(models.Account).filter(models.Account.id == account_id)
    account = lookup.first()
    if not account:
        raise HTTPException(status_code=404, detail="Account not found")

    # Read the owning domain before any deletion; the redirect needs it.
    owner_domain_id = account.domain_id

    if action == "delete":
        api_delete_account(account_id, db, cache, _="panel")
        return panel_domain_redirect(owner_domain_id, notice="account-deleted", tab="accounts")

    # action -> (fields for AccountUpdate, notice shown after the redirect)
    single_field_actions = {
        "update-quota": ({"quota_mb": quota_mb}, "account-updated"),
        "update-password": ({"password": password or None}, "account-updated"),
        "suspend": ({"is_active": False}, "account-suspended"),
        "restore": ({"is_active": True}, "account-restored"),
    }
    if action in single_field_actions:
        fields, notice = single_field_actions[action]
    else:
        fields = {
            "quota_mb": quota_mb,
            "password": password or None,
            "is_active": account.is_active,
        }
        notice = "account-updated"

    changes = build_model(schemas.AccountUpdate, **fields)
    api_update_account(account_id, changes, db, cache, _="panel")
    return panel_domain_redirect(owner_domain_id, notice=notice, tab="accounts")


@router.post("/panel/aliases/create")
def panel_create_alias(
    request: Request,
    domain_id: int = Form(...),
    source_local: str = Form(...),
    destination: str = Form(...),
    is_active: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Create an alias under the domain identified by ``domain_id``.

    The domain is resolved with ``get_domain_or_404``; its name and the form
    fields make up the ``schemas.AliasCreate`` payload. ``is_active`` is true
    whenever the form field was submitted. Redirects to the ``aliases`` tab.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    parent_domain = get_domain_or_404(db, domain_id)
    alias_fields = {
        "domain": parent_domain.name,
        "source_local": source_local,
        "destination": destination,
        "is_active": is_active is not None,
    }
    new_alias = build_model(schemas.AliasCreate, **alias_fields)
    api_create_alias(new_alias, db, cache, _="panel")
    return panel_domain_redirect(domain_id, notice="alias-created", tab="aliases")


@router.post("/panel/aliases/{alias_id}")
def panel_update_alias(
    request: Request,
    alias_id: int,
    action: str = Form("update"),
    is_active: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Delete an alias or update its active flag.

    ``action == "delete"`` removes the alias. Any other action sets
    ``is_active`` to whether the form field was submitted. Both paths
    redirect to the owning domain's ``aliases`` tab.

    Raises:
        HTTPException: 404 when no alias has ``alias_id``.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    lookup = db.query(models.Alias).filter(models.Alias.id == alias_id)
    alias = lookup.first()
    if not alias:
        raise HTTPException(status_code=404, detail="Alias not found")

    # Read the owning domain before any deletion; the redirect needs it.
    owner_domain_id = alias.domain_id

    if action != "delete":
        changes = build_model(schemas.AliasUpdate, is_active=is_active is not None)
        api_update_alias(alias_id, changes, db, cache, _="panel")
        return panel_domain_redirect(owner_domain_id, notice="alias-updated", tab="aliases")

    api_delete_alias(alias_id, db, cache, _="panel")
    return panel_domain_redirect(owner_domain_id, notice="alias-deleted", tab="aliases")


@router.post("/panel/redirects/create")
def panel_create_redirect(
    request: Request,
    domain_id: int = Form(...),
    source_local: str = Form(...),
    target_email: str = Form(...),
    is_active: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Create a mail redirect under the domain identified by ``domain_id``.

    The domain is resolved with ``get_domain_or_404``; its name and the form
    fields make up the ``schemas.RedirectCreate`` payload. ``is_active`` is
    true whenever the form field was submitted. Redirects to the
    ``redirects`` tab.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    parent_domain = get_domain_or_404(db, domain_id)
    redirect_fields = {
        "domain": parent_domain.name,
        "source_local": source_local,
        "target_email": target_email,
        "is_active": is_active is not None,
    }
    new_redirect = build_model(schemas.RedirectCreate, **redirect_fields)
    api_create_redirect(new_redirect, db, cache, _="panel")
    return panel_domain_redirect(domain_id, notice="redirect-created", tab="redirects")


@router.post("/panel/redirects/{redirect_id}")
def panel_update_redirect(
    request: Request,
    redirect_id: int,
    action: str = Form("update"),
    is_active: Optional[str] = Form(None),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
    db: Session = Depends(get_db),
    cache=Depends(get_cache),
):
    """Delete a mail redirect or update its active flag.

    ``action == "delete"`` removes the redirect. Any other action sets
    ``is_active`` to whether the form field was submitted. Both paths
    redirect to the owning domain's ``redirects`` tab.

    Raises:
        HTTPException: 404 when no redirect has ``redirect_id``.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    lookup = db.query(models.Redirect).filter(models.Redirect.id == redirect_id)
    record = lookup.first()
    if not record:
        raise HTTPException(status_code=404, detail="Redirect not found")

    # Read the owning domain before any deletion; the response needs it.
    owner_domain_id = record.domain_id

    if action != "delete":
        changes = build_model(schemas.RedirectUpdate, is_active=is_active is not None)
        api_update_redirect(redirect_id, changes, db, cache, _="panel")
        return panel_domain_redirect(owner_domain_id, notice="redirect-updated", tab="redirects")

    api_delete_redirect(redirect_id, db, cache, _="panel")
    return panel_domain_redirect(owner_domain_id, notice="redirect-deleted", tab="redirects")


@router.post("/panel/queue/{queue_id}/{action}")
def panel_queue_action(
    request: Request,
    queue_id: str,
    action: str,
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Run a ``manage-queue.sh`` action on a single mail queue entry.

    ``queue_id`` is passed through ``validate_queue_id`` first. Supported
    actions are ``delete``, ``hold``, ``release`` and ``requeue``.

    An ``HTTPException`` with status >= 500 from the script call becomes an
    error redirect to ``/panel/queue``; any other ``HTTPException``
    propagates.

    Raises:
        HTTPException: 400 for an unsupported ``action``.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)
    queue_id = validate_queue_id(queue_id)

    if action not in {"delete", "hold", "release", "requeue"}:
        raise HTTPException(status_code=400, detail="Unsupported queue action")

    try:
        run_root_script("manage-queue.sh", action, queue_id)
    except HTTPException as exc:
        if exc.status_code < 500:
            raise
        return panel_redirect("/panel/queue", error=exc.detail)
    return panel_redirect("/panel/queue", notice=f"queue-{action}")


@router.post("/panel/bans")
def panel_ban_create(
    request: Request,
    ip: str = Form(...),
    reason: str = Form("manual"),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Ban an address via ``manage-bans.sh ban`` and redirect to the security page.

    The form values go through ``schemas.AdminBanCreate``; the script
    receives the model's ``ip`` and ``reason`` rather than the raw input.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    ban = build_model(schemas.AdminBanCreate, ip=ip, reason=reason)
    script_args = ("ban", ban.ip, ban.reason)
    run_root_script("manage-bans.sh", *script_args)
    return panel_redirect("/panel/security", notice="ban-created")


@router.post("/panel/bans/delete")
def panel_ban_delete(
    request: Request,
    ip: str = Form(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Lift a ban via ``manage-bans.sh unban`` and redirect to the security page.

    The submitted ``ip`` is stripped of surrounding whitespace and must parse
    as an IP address or CIDR network before it is passed to the root script.
    Arbitrary form text (for example something shaped like a command-line
    option) therefore never reaches the script.

    Raises:
        HTTPException: 400 when ``ip`` is not a valid address or network.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    target = ip.strip()
    try:
        # strict=False also accepts a host address written with a prefix
        # length (e.g. 10.0.0.5/24), not only true network addresses.
        ip_network(target, strict=False)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid IP address") from None

    run_root_script("manage-bans.sh", "unban", target)
    return panel_redirect("/panel/security", notice="ban-deleted")


@router.post("/panel/limits")
def panel_limit_update(
    request: Request,
    key: list[str] = Form(...),
    value: list[str] = Form(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Apply a batch of limit settings through ``manage-limits.sh set-many``.

    The ``key`` / ``value`` lists are paired by ``pair_panel_updates`` and each
    pair goes through ``schemas.AdminLimitUpdate``. All pairs are built before
    the script is invoked once with the flattened key/value arguments.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    validated = []
    for item_key, item_value in pair_panel_updates(key, value):
        validated.append(build_model(schemas.AdminLimitUpdate, key=item_key, value=item_value))

    # Flatten to: set-many key1 value1 key2 value2 ...
    script_args = ["set-many"]
    script_args += [part for entry in validated for part in (entry.key, entry.value)]
    run_root_script("manage-limits.sh", *script_args)
    return panel_redirect("/panel/limits", notice="limit-updated")


@router.post("/panel/backups")
def panel_backup_update(
    request: Request,
    key: list[str] = Form(...),
    value: list[str] = Form(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Apply a batch of backup settings through ``manage-backups.sh set-many``.

    Each key/value pair goes through ``schemas.AdminBackupUpdate``. Entries
    whose key is in ``BACKUP_SECRET_FIELDS`` and whose value is empty are
    skipped; the rest are normalized with ``normalize_backup_value``.

    The current config combined with the pending changes is validated even
    when nothing is left to apply. With no changes left, the script is not
    run and the notice says whether a blank secret was skipped.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    submitted = []
    for item_key, item_value in pair_panel_updates(key, value):
        submitted.append(build_model(schemas.AdminBackupUpdate, key=item_key, value=item_value))

    current_config = load_backup_config()
    pending: list[tuple[str, str]] = []
    blank_secret_seen = False
    for entry in submitted:
        is_blank_secret = entry.key in BACKUP_SECRET_FIELDS and not entry.value
        if is_blank_secret:
            blank_secret_seen = True
        else:
            pending.append((entry.key, normalize_backup_value(entry.key, entry.value)))

    merged_config = build_backup_storage_validation_config(current_config, pending)
    validate_backup_storage_config(merged_config)

    if pending:
        # Flatten to: set-many key1 value1 key2 value2 ...
        script_args = ["set-many"]
        for setting_key, setting_value in pending:
            script_args.extend((setting_key, setting_value))
        run_root_script("manage-backups.sh", *script_args)
        return panel_redirect("/panel/backups", notice="backup-config-updated")

    if blank_secret_seen:
        notice = "backup-db-password-unchanged"
    else:
        notice = "backup-config-updated"
    return panel_redirect("/panel/backups", notice=notice)


@router.post("/panel/backups/schedules")
def panel_backup_schedule_save(
    request: Request,
    schedule_id: str = Form(""),
    name: str = Form(...),
    enabled: str = Form("yes"),
    cron_minute: str = Form(""),
    cron_hour: str = Form(""),
    cron_day: str = Form(""),
    cron_month: str = Form(""),
    cron_weekday: str = Form(""),
    oncalendar: str = Form(""),
    backup_type: str = Form("auto"),
    full_weekday: str = Form("Sun"),
    compression: str = Form("gzip"),
    include_database: str = Form("yes"),
    include_redis: str = Form("yes"),
    storage_id: str = Form(""),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Create or update a backup schedule from the panel form.

    The schedule time comes from the cron-style fields when any of them is
    non-blank. Otherwise it comes from ``oncalendar``, falling back to
    ``*-*-* 03:15:00`` when that is empty. The remaining options are
    normalized with ``normalize_backup_value`` and the payload is passed to
    ``save_backup_schedule`` as JSON.

    An ``HTTPException`` raised while normalizing those options or saving
    becomes an error redirect to the ``schedules`` tab.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    # The time expression is resolved outside the try: its errors propagate.
    cron_fields = (cron_minute, cron_hour, cron_day, cron_month, cron_weekday)
    if any(field.strip() for field in cron_fields):
        normalized_oncalendar = build_oncalendar_from_cron_fields(*cron_fields)
    else:
        normalized_oncalendar = normalize_backup_value("oncalendar", oncalendar or "*-*-* 03:15:00")

    try:
        schedule_doc = {"id": schedule_id, "name": name}
        schedule_doc["enabled"] = normalize_backup_value("enabled", enabled)
        schedule_doc["oncalendar"] = normalized_oncalendar
        normalized_options = (
            ("backup_type", backup_type),
            ("full_weekday", full_weekday),
            ("compression", compression),
            ("include_database", include_database),
            ("include_redis", include_redis),
        )
        for option_name, raw_value in normalized_options:
            schedule_doc[option_name] = normalize_backup_value(option_name, raw_value)
        schedule_doc["storage_id"] = storage_id.strip()
        result = save_backup_schedule(json.dumps(schedule_doc))
    except HTTPException as exc:
        return panel_redirect("/panel/backups", backup_tab="schedules", schedule_id=schedule_id, error=exc.detail)

    # Use the id from the save result when present (falls back to the form value).
    saved_id = result.get("id", schedule_id)
    return panel_redirect("/panel/backups", backup_tab="schedules", schedule_id=saved_id, notice="backup-schedule-updated")


@router.post("/panel/backups/schedules/delete")
def panel_backup_schedule_delete(
    request: Request,
    schedule_id: str = Form(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Delete a backup schedule via ``manage-backups.sh delete-schedule``.

    Redirects to the ``schedules`` tab of ``/panel/backups`` afterwards.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    # NOTE(review): schedule_id reaches the root script as submitted; confirm
    # the script (or run_root_script) validates its format.
    script_args = ("delete-schedule", schedule_id)
    run_root_script("manage-backups.sh", *script_args)
    return panel_redirect("/panel/backups", backup_tab="schedules", notice="backup-schedule-deleted")


@router.post("/panel/backups/storages")
def panel_backup_storage_save(
    request: Request,
    storage_id: str = Form(""),
    name: str = Form(...),
    type: str = Form("local"),
    remote_name: str = Form(""),
    path: str = Form(""),
    host: str = Form(""),
    port: str = Form(""),
    user: str = Form(""),
    password: str = Form(""),
    bucket: str = Form(""),
    region: str = Form(""),
    endpoint: str = Form(""),
    access_key_id: str = Form(""),
    secret_access_key: str = Form(""),
    encrypt: str = Form("no"),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Create or update a backup storage definition from the panel form.

    ``type`` and ``encrypt`` are normalized with ``normalize_backup_value``;
    ``port`` is normalized only when non-blank and is otherwise sent as an
    empty string. The payload is passed to ``save_backup_storage`` as JSON.

    An ``HTTPException`` raised while normalizing or saving becomes an error
    redirect to the ``storages`` tab. If the result has a non-empty
    ``private_key_token``, it is stored as a panel flash value under
    ``backup_private_key_token`` before redirecting.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    try:
        normalized_type = normalize_backup_value("type", type)
        normalized_port = ""
        if port.strip():
            normalized_port = normalize_backup_value("port", port)
        normalized_encrypt = normalize_backup_value("encrypt", encrypt)

        storage_doc = {
            "id": storage_id,
            "name": name,
            "type": normalized_type,
            "remote_name": remote_name,
            "path": path,
            "host": host,
            "port": normalized_port,
            "user": user,
            "password": password,
            "bucket": bucket,
            "region": region,
            "endpoint": endpoint,
            "access_key_id": access_key_id,
            "secret_access_key": secret_access_key,
            "encrypt": normalized_encrypt,
        }
        result = save_backup_storage(json.dumps(storage_doc))
    except HTTPException as exc:
        return panel_redirect("/panel/backups", backup_tab="storages", storage_id=storage_id, error=exc.detail)

    stored = result.get("storage", {})
    key_token = result.get("private_key_token", "")
    if key_token:
        set_panel_flash(request, session, "backup_private_key_token", key_token)

    # Use the id from the save result when present (falls back to the form value).
    saved_id = stored.get("id", storage_id)
    return panel_redirect(
        "/panel/backups",
        backup_tab="storages",
        storage_id=saved_id,
        notice="backup-storage-updated",
    )


@router.post("/panel/backups/storages/delete")
def panel_backup_storage_delete(
    request: Request,
    storage_id: str = Form(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Delete a backup storage via ``manage-backups.sh delete-storage``.

    Redirects to the ``storages`` tab of ``/panel/backups`` afterwards.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    # NOTE(review): storage_id reaches the root script as submitted; confirm
    # the script (or run_root_script) validates its format.
    script_args = ("delete-storage", storage_id)
    run_root_script("manage-backups.sh", *script_args)
    return panel_redirect("/panel/backups", backup_tab="storages", notice="backup-storage-deleted")


@router.post("/panel/backups/storages/test")
def panel_backup_storage_test(
    request: Request,
    storage_id: str = Form(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Run ``test_backup_storage`` for a storage and report the outcome as a notice.

    The notice is the result's ``detail`` entry, or ``backup-storage-tested``
    when the result has none. Redirects to the ``storages`` tab.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    outcome = test_backup_storage(storage_id)
    return panel_redirect(
        "/panel/backups",
        backup_tab="storages",
        storage_id=storage_id,
        notice=outcome.get("detail", "backup-storage-tested"),
    )


@router.post("/panel/backups/private-key")
def panel_backup_private_key_download(
    request: Request,
    token: str = Form(...),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Return a backup private key as a plain-text attachment.

    ``token`` must fully match ``SAFE_PRIVATE_KEY_TOKEN_RE``. The output of
    ``manage-backups.sh consume-private-key <token>`` is returned as the
    response body, with a ``Content-Disposition`` filename that embeds the
    token.

    Raises:
        HTTPException: 400 when ``token`` does not match the expected pattern.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    token_match = SAFE_PRIVATE_KEY_TOKEN_RE.fullmatch(token)
    if not token_match:
        raise HTTPException(status_code=400, detail="Invalid private key token")

    key_material = run_root_script("manage-backups.sh", "consume-private-key", token)
    disposition = f'attachment; filename="limristem-mail-backup-{token}"'
    return PlainTextResponse(key_material, headers={"Content-Disposition": disposition})


@router.post("/panel/firewall")
def panel_firewall_update(
    request: Request,
    firewall_enabled: str = Form(...),
    rule_ports: list[str] = Form(default=[]),
    rule_protocol: list[str] = Form(default=[]),
    rule_source: list[str] = Form(default=[]),
    rule_enabled: list[str] = Form(default=[]),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Save the firewall switch and rule list through ``manage-firewall.sh set-many``.

    The four ``rule_*`` lists are parallel columns: index *i* of each list
    describes rule *i*. The rules go through ``normalize_firewall_rules`` and
    are passed to the script as JSON.

    Raises:
        HTTPException: 400 when the rule columns differ in length.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    columns = (rule_ports, rule_protocol, rule_source, rule_enabled)
    if len({len(column) for column in columns}) != 1:
        raise HTTPException(status_code=400, detail="Mismatched firewall rule payload")

    submitted_rules = []
    for ports, protocol, source, enabled in zip(*columns):
        submitted_rules.append({"ports": ports, "protocol": protocol, "source": source, "enabled": enabled})
    rules = normalize_firewall_rules(submitted_rules)

    enabled_value = normalize_firewall_value("firewall-enabled", firewall_enabled)
    rules_value = normalize_firewall_value("firewall-rules-json", json.dumps(rules))
    run_root_script(
        "manage-firewall.sh",
        "set-many",
        "firewall-enabled",
        enabled_value,
        "firewall-rules-json",
        rules_value,
    )
    return panel_redirect("/panel/security", notice="firewall-updated")


@router.post("/panel/backups/run")
def panel_backup_run(
    request: Request,
    schedule_id: str = Form(""),
    csrf_token: str = Form(...),
    session: dict = Depends(require_panel_session),
):
    """Start a backup via ``manage-backups.sh run``.

    A non-empty ``schedule_id`` is passed as an extra argument to the script.

    An ``HTTPException`` with status >= 500 becomes an error redirect to the
    ``runs`` tab; any other ``HTTPException`` propagates.
    """
    ensure_web_panel_enabled()
    require_panel_csrf(session, csrf_token)

    script_args = ["run"]
    if schedule_id:
        script_args.append(schedule_id)

    try:
        run_root_script("manage-backups.sh", *script_args)
    except HTTPException as exc:
        if exc.status_code < 500:
            raise
        return panel_redirect("/panel/backups", backup_tab="runs", error=exc.detail)
    return panel_redirect("/panel/backups", backup_tab="runs", notice="backup-run-started")
