from __future__ import annotations import json import logging import threading import time from contextlib import asynccontextmanager from pathlib import Path from typing import Any, Literal from urllib.parse import urlencode from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel, Field from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.export_bundles import ( bundle_download_filename, create_multi_playlist_bundle, create_single_playlist_bundle, default_bundle_root, resolve_bundle_download_path, ) from musicdl.catalogsync.playlist_artifacts import locate_playlist_dir from musicdl.catalogsync.repository import CatalogRepository from musicdl.catalogsync.resolver_stats import ( default_resolver_stats_db_path, initialize_resolver_stats_database, ) from musicdl.catalogsync.services import CatalogSyncService from .config import CatalogsyncEnvManager from .jobdefs import job_lane_type from .maintenance import LocalDedupeBlockedError, LocalMaintenanceService from .repository import OpsRepository from .runner import JOB_STAGE_SEQUENCES, OpsRunner JobType = Literal[ "catalog_sync", "collect_only", "sync_only", "sync_download", "download_only", "upload_only", "download_upload", ] CommandType = Literal["pause", "resume", "cancel", "retry_item", "force_retry_item"] ALLOWED_COLLECT_SOURCES = {"netease", "qq", "kuwo"} ALLOWED_DOWNLOAD_SOURCES = {"qq", "kuwo", "migu", "qianqian", "kugou", "netease"} RETRY_COMMAND_TYPES = {"retry_item", "force_retry_item"} LOGGER = logging.getLogger(__name__) DOWNLOAD_JOB_TYPES = tuple( job_type for job_type, stages in JOB_STAGE_SEQUENCES.items() if "download" in stages ) LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS = 0.5 PLAYLIST_SORT_FIELDS = {"id", "platform", "name", "play_count"} PLAYLIST_SORT_DIRECTIONS = {"asc", "desc"} PLAYLIST_SORT_DEFAULTS = { "id": "desc", "platform": "asc", "name": "asc", "play_count": "desc", } def _format_speed_text(speed_bytes_per_sec: int | None) -> str: speed_value = int(speed_bytes_per_sec or 0) if speed_value <= 0: return "0 B/s" if speed_value >= 1024 * 1024: return f"{speed_value / (1024 * 1024):.1f} MB/s" if speed_value >= 1024: return f"{speed_value / 1024:.1f} KB/s" return f"{speed_value} B/s" def _normalize_allowed_list(value: Any, allowed: set[str]) -> list[str]: if not value: return [] parts: list[str] = [] if isinstance(value, str): parts = [part.strip() for part in value.split(",") if part and part.strip()] elif isinstance(value, list): parts = [str(item).strip() for item in value if str(item).strip()] else: return [] normalized: list[str] = [] seen: set[str] = set() for item in parts: if item in allowed and item not in seen: normalized.append(item) seen.add(item) return normalized def _read_env_text(env_path: Path) -> str: if not env_path.exists(): return "" return env_path.read_text(encoding="utf-8") def _resolve_playlists_root(env_values: dict[str, str], catalog_repo: CatalogRepository) -> Path | None: root_dir = str(env_values.get("ROOT_DIR") or "").strip() if root_dir: path = Path(root_dir).resolve() / "playlists" path.mkdir(parents=True, exist_ok=True) return path library_dir = str(env_values.get("LIBRARY_DIR") or "").strip() if library_dir: path = Path(library_dir).resolve().parent / "playlists" path.mkdir(parents=True, exist_ok=True) return path library_root = catalog_repo.get_default_local_library_root() if library_root is None: return None path = library_root.parent / "playlists" path.mkdir(parents=True, exist_ok=True) return path def _collect_folder_listing(folder_path: Path) -> list[dict[str, Any]]: if not folder_path.exists(): return [] items: list[dict[str, Any]] = [] for child in sorted(folder_path.rglob("*")): if not child.is_file(): continue try: stat = child.stat() size_bytes = int(stat.st_size) except OSError: size_bytes = 0 items.append( { "relative_path": child.relative_to(folder_path).as_posix(), "absolute_path": str(child.resolve()), "size_bytes": size_bytes, } ) return items def _serialize_job(job: Any) -> dict[str, Any]: return { "id": int(job.id), "job_type": str(job.job_type), "status": getattr(job.status, "value", str(job.status)), "priority": int(job.priority), "requested_by": job.requested_by, "config_snapshot": dict(job.config_snapshot or {}), "sources": list(job.sources or []), "download_sources": list(job.download_sources or []), "playlist_scope": dict(job.playlist_scope or {}), "created_at": job.created_at, "started_at": job.started_at, "ended_at": job.ended_at, "last_error": job.last_error, "resume_token": job.resume_token, } def _display_from_payload(payload: dict[str, Any]) -> str | None: if not isinstance(payload, dict): return None for key in ("row", "playlist_row", "upload_row"): row = payload.get(key) if isinstance(row, dict): name = str(row.get("name") or row.get("song_name") or "").strip() if name: return name return str(payload.get("display_name") or payload.get("name") or "").strip() or None def _list_jobs(repo: OpsRepository, limit: int = 100) -> list[dict[str, Any]]: rows = repo._fetchall("SELECT * FROM job_runs ORDER BY id DESC LIMIT ?", (int(limit),)) return [_serialize_job(repo._row_to_job(row)) for row in rows] def _job_stages(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]: return [ { "id": int(stage.id), "job_run_id": int(stage.job_run_id), "stage_type": str(stage.stage_type), "status": stage.status.value, "seq_no": int(stage.seq_no), "total_items": int(stage.total_items), "pending_items": int(stage.pending_items), "running_items": int(stage.running_items), "success_items": int(stage.success_items), "failed_items": int(stage.failed_items), "skipped_items": int(stage.skipped_items), "started_at": stage.started_at, "ended_at": stage.ended_at, "last_error": stage.last_error, } for stage in repo.list_job_stages(job_id) ] def _job_commands(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]: rows = repo._fetchall( """ SELECT * FROM job_commands WHERE job_run_id = ? ORDER BY id DESC LIMIT 100 """, (int(job_id),), ) return [dict(row) for row in rows] def _queued_download_job_count(repo: OpsRepository) -> int: if not DOWNLOAD_JOB_TYPES: return 0 placeholders = ", ".join("?" for _ in DOWNLOAD_JOB_TYPES) row = repo._fetchone( f""" SELECT COUNT(*) AS count_value FROM job_runs WHERE status = ? AND job_type IN ({placeholders}) """, ("queued", *DOWNLOAD_JOB_TYPES), ) return int(row["count_value"]) if row else 0 def _dashboard_summary( repo: OpsRepository, *, task_rows: list[dict[str, Any]] | None = None, ) -> dict[str, int]: rows = repo._fetchall( """ SELECT status, COUNT(*) AS count_value FROM job_runs GROUP BY status """ ) counts = {str(row["status"]): int(row["count_value"]) for row in rows} if task_rows is None: queued_download_jobs = _queued_download_job_count(repo) else: queued_download_jobs = sum( 1 for row in task_rows if str(row.get("status")) == "queued" and str(row.get("lane_type")) == "download" ) return { "total_jobs": int(sum(counts.values())), "queued_jobs": int(counts.get("queued", 0)), "queued_download_jobs": int(queued_download_jobs), "running_jobs": int(counts.get("running", 0)), "paused_jobs": int(counts.get("paused", 0) + counts.get("pause_requested", 0)), "failed_jobs": int(counts.get("failed", 0) + counts.get("completed_with_errors", 0)), } def _serialize_worker_row(row: dict[str, Any]) -> dict[str, Any]: payload_text = str(row.get("current_display_text") or "").strip() fallback_text = str(row.get("song_name") or row.get("playlist_name") or row.get("item_key") or "").strip() speed_bytes_per_sec = int(row.get("speed_bytes_per_sec") or 0) return { "id": int(row["id"]), "job_run_id": row["job_run_id"], "job_stage_id": row["job_stage_id"], "worker_name": row["worker_name"], "status": row["status"], "current_job_item_id": row["current_job_item_id"], "current_song_id": row["current_song_id"], "current_playlist_id": row["current_playlist_id"], "display_text": payload_text or fallback_text, "stage_type": row.get("stage_type"), "item_type": row.get("item_type"), "last_progress_text": row.get("last_progress_text"), "downloaded_bytes": int(row.get("downloaded_bytes") or 0), "total_bytes": int(row.get("total_bytes") or 0), "speed_bytes_per_sec": speed_bytes_per_sec, "speed_text": _format_speed_text(speed_bytes_per_sec), "processed_count": int(row["processed_count"] or 0), "error_count": int(row["error_count"] or 0), "heartbeat_at": row.get("heartbeat_at"), } def _playlist_source_stats(repo: OpsRepository) -> list[dict[str, Any]]: rows = repo._fetchall( """ SELECT pp.platform, pp.pool_kind, pp.name AS pool_name, COUNT(DISTINCT pl.id) AS playlist_count FROM playlist_pools AS pp LEFT JOIN pool_playlists AS rel ON rel.pool_id = pp.id LEFT JOIN playlists AS pl ON pl.id = rel.playlist_id GROUP BY pp.id, pp.platform, pp.pool_kind, pp.name ORDER BY pp.platform ASC, pp.pool_kind ASC, pp.id ASC """ ) return [ { "platform": str(row["platform"]), "pool_kind": str(row["pool_kind"]), "pool_name": str(row["pool_name"]), "playlist_count": int(row["playlist_count"] or 0), } for row in rows ] def _playlist_filter_options(catalog_repo: CatalogRepository) -> dict[str, list[str]]: platform_rows = catalog_repo._fetchall( "SELECT DISTINCT platform FROM playlists WHERE platform IS NOT NULL AND TRIM(platform) != '' ORDER BY platform ASC" ) pool_kind_rows = catalog_repo._fetchall( "SELECT DISTINCT pool_kind FROM playlist_pools WHERE pool_kind IS NOT NULL AND TRIM(pool_kind) != '' ORDER BY pool_kind ASC" ) return { "platforms": [str(row["platform"]) for row in platform_rows], "pool_kinds": [str(row["pool_kind"]) for row in pool_kind_rows], } def _normalize_playlist_ids(values: list[Any]) -> list[int]: normalized: list[int] = [] seen: set[int] = set() for raw in values: try: playlist_id = int(raw) except (TypeError, ValueError): continue if playlist_id <= 0 or playlist_id in seen: continue normalized.append(playlist_id) seen.add(playlist_id) return normalized def _validate_playlist_page_size(page_size: int) -> int: if page_size not in {20, 50, 100}: raise HTTPException(status_code=422, detail="page_size must be one of 20, 50, 100") return page_size def _normalize_optional_filter(value: str | None) -> str | None: if value is None: return None normalized = value.strip() return normalized or None def _normalize_playlist_sort( sort_by: str | None, sort_dir: str | None, ) -> tuple[str | None, str | None]: normalized_sort_by = _normalize_optional_filter(sort_by) normalized_sort_dir = _normalize_optional_filter(sort_dir) if normalized_sort_by is None: return None, None if normalized_sort_by not in PLAYLIST_SORT_FIELDS: raise HTTPException(status_code=422, detail="sort_by must be one of id, platform, name, play_count") if normalized_sort_dir is None: normalized_sort_dir = PLAYLIST_SORT_DEFAULTS[normalized_sort_by] normalized_sort_dir = normalized_sort_dir.lower() if normalized_sort_dir not in PLAYLIST_SORT_DIRECTIONS: raise HTTPException(status_code=422, detail="sort_dir must be one of asc, desc") return normalized_sort_by, normalized_sort_dir def _playlist_query_string( filters: dict[str, Any], *, overrides: dict[str, Any] | None = None, ) -> str: payload = dict(filters or {}) payload.update(overrides or {}) params: list[tuple[str, str]] = [ ("page", str(int(payload.get("page") or 1))), ("page_size", str(int(payload.get("page_size") or 50))), ] for key in ("keyword", "platform", "pool_kind", "status", "sort_by", "sort_dir"): value = payload.get(key) if value in (None, ""): continue params.append((key, str(value))) if payload.get("wanted_only"): params.append(("wanted_only", "1")) return urlencode(params) def _build_playlist_sort_links(filters: dict[str, Any]) -> dict[str, dict[str, str]]: links: dict[str, dict[str, str]] = {} current_sort_by = str(filters.get("sort_by") or "") current_sort_dir = str(filters.get("sort_dir") or "") for field in PLAYLIST_SORT_FIELDS: is_current = current_sort_by == field if is_current: next_dir = "desc" if current_sort_dir == "asc" else "asc" indicator = "^" if current_sort_dir == "asc" else "v" else: next_dir = PLAYLIST_SORT_DEFAULTS[field] indicator = "" links[field] = { "href": "?" + _playlist_query_string( filters, overrides={"page": 1, "sort_by": field, "sort_dir": next_dir}, ), "indicator": indicator, } return links def _parse_optional_bool_query(value: str | bool | None, *, field_name: str) -> bool: if value is None: return False if isinstance(value, bool): return value normalized = str(value).strip().lower() if not normalized: return False if normalized in {"1", "true", "t", "yes", "y", "on"}: return True if normalized in {"0", "false", "f", "no", "n", "off"}: return False raise HTTPException(status_code=422, detail=f"{field_name} must be a valid boolean value") def _list_playlist_page_or_422( catalog_repo: CatalogRepository, *, page: int, page_size: int, platform: str | None, pool_kind: str | None, status: str | None, keyword: str | None, wanted_only: bool, sort_by: str | None, sort_dir: str | None, ) -> dict[str, Any]: try: return catalog_repo.list_playlist_page( page=page, page_size=page_size, platform=platform, pool_kind=pool_kind, status=status, keyword=keyword, wanted_only=wanted_only, sort_by=sort_by, sort_dir=sort_dir, ) except ValueError as exc: raise HTTPException(status_code=422, detail=str(exc)) from exc def _job_workers(repo: OpsRepository, job_id: int, *, active_only: bool = False) -> list[dict[str, Any]]: return [ _serialize_worker_row(row) for row in repo.list_job_workers(job_id, active_only=active_only, limit=50) ] def _job_running_items(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]: workers = { int(worker["current_job_item_id"]): worker for worker in _job_workers(repo, job_id, active_only=False) if worker.get("current_job_item_id") is not None } items = repo.list_job_items_with_context(job_id, statuses=["running"], limit=100) serialized: list[dict[str, Any]] = [] for item in items: payload = item.get("payload") or {} worker = workers.get(int(item["id"])) serialized.append( { "id": int(item["id"]), "stage_type": item.get("stage_type"), "item_type": item.get("item_type"), "status": item.get("status"), "song_id": item.get("song_id"), "playlist_id": item.get("playlist_id"), "worker_name": worker.get("worker_name") if worker else None, "display_name": ( (worker.get("display_text") if worker else None) or item.get("song_name") or item.get("playlist_name") or _display_from_payload(payload) or str(item.get("item_key") or "") ), "started_at": item.get("started_at"), } ) return serialized def _dashboard_workers(repo: OpsRepository) -> list[dict[str, Any]]: rows = repo._fetchall( """ SELECT jw.*, js.stage_type, ji.item_type, ji.item_key, s.name AS song_name, p.name AS playlist_name FROM job_workers AS jw JOIN job_items AS ji ON ji.id = jw.current_job_item_id AND ji.status = 'running' JOIN job_stages AS js ON js.id = ji.job_stage_id JOIN job_runs AS jr ON jr.id = js.job_run_id AND jr.status IN ('running', 'pause_requested') LEFT JOIN songs AS s ON s.id = COALESCE(jw.current_song_id, ji.song_id) LEFT JOIN playlists AS p ON p.id = COALESCE(jw.current_playlist_id, ji.playlist_id) ORDER BY CASE WHEN jw.status = 'running' THEN 0 WHEN jw.status = 'paused' THEN 1 ELSE 2 END, COALESCE(jw.heartbeat_at, '') DESC, jw.id DESC LIMIT 50 """ ) return [_serialize_worker_row(dict(row)) for row in rows] def _dashboard_running_items(repo: OpsRepository) -> list[dict[str, Any]]: rows = repo._fetchall( """ SELECT i.*, s.stage_type, s.job_run_id, w.worker_name, w.current_display_text, p.name AS playlist_name, sg.name AS song_name FROM job_items AS i JOIN job_stages AS s ON s.id = i.job_stage_id LEFT JOIN job_workers AS w ON w.current_job_item_id = i.id LEFT JOIN playlists AS p ON p.id = i.playlist_id LEFT JOIN songs AS sg ON sg.id = i.song_id WHERE i.status = 'running' ORDER BY i.id DESC LIMIT 100 """ ) items: list[dict[str, Any]] = [] for row in rows: payload = json.loads(row["payload_json"] or "{}") items.append( { "id": int(row["id"]), "job_run_id": row["job_run_id"], "stage_type": row["stage_type"], "item_type": row["item_type"], "status": row["status"], "song_id": row["song_id"], "playlist_id": row["playlist_id"], "worker_name": row["worker_name"], "display_name": ( str(row["current_display_text"] or "").strip() or str(row["song_name"] or "").strip() or str(row["playlist_name"] or "").strip() or _display_from_payload(payload) or str(row["item_key"]) ), "started_at": row["started_at"], } ) return items def _download_stats(repo: OpsRepository) -> dict[str, int]: stats = repo.get_download_stats() playlist_row = repo._fetchone("SELECT COUNT(*) AS count_value FROM playlists") pool_row = repo._fetchone("SELECT COUNT(*) AS count_value FROM playlist_pools") running_song_row = repo._fetchone( """ SELECT COUNT(*) AS count_value FROM job_items WHERE status = 'running' AND song_id IS NOT NULL """ ) stats["total_playlists"] = int(playlist_row["count_value"]) if playlist_row else 0 stats["total_playlist_pools"] = int(pool_row["count_value"]) if pool_row else 0 stats["running_song_items"] = int(running_song_row["count_value"]) if running_song_row else 0 return stats def _dashboard_transfer_stats(workers: list[dict[str, Any]]) -> dict[str, Any]: download_speed_bytes_per_sec = sum( int(worker.get("speed_bytes_per_sec") or 0) for worker in workers if str(worker.get("stage_type") or "").lower() == "download" ) return { "download_speed_bytes_per_sec": int(download_speed_bytes_per_sec), "download_speed_text": _format_speed_text(download_speed_bytes_per_sec), "upload_speed_bytes_per_sec": 0, "upload_speed_text": "-", } def _dashboard_payload(repo: OpsRepository, *, include_task_rows: bool = False) -> dict[str, Any]: active_job = repo.get_active_job() task_rows = repo.list_task_center_rows(limit=50) if include_task_rows else None workers = _dashboard_workers(repo) payload = { "summary": _dashboard_summary(repo, task_rows=task_rows), "download_stats": _download_stats(repo), "transfer_stats": _dashboard_transfer_stats(workers), "workers": workers, "running_items": _dashboard_running_items(repo), "playlist_sources": _playlist_source_stats(repo), "active_job": _serialize_job(active_job) if active_job is not None else None, "jobs": _list_jobs(repo, limit=10), } if task_rows is not None: payload["task_rows"] = task_rows recent_events = repo._fetchall( """ SELECT id, job_run_id, event_type, message, created_at FROM job_events ORDER BY id DESC LIMIT 5 """ ) payload["recent_events"] = [dict(row) for row in recent_events] return payload def _split_dashboard_task_rows(task_rows: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: done_statuses = {"completed", "completed_with_errors", "failed", "canceled"} doing_rows: list[dict[str, Any]] = [] done_rows: list[dict[str, Any]] = [] for row in task_rows or []: if str(row.get("status") or "").lower() in done_statuses: done_rows.append(row) else: doing_rows.append(row) done_rows.sort( key=lambda row: ( str(row.get("ended_at") or row.get("started_at") or row.get("created_at") or ""), int(row.get("id") or 0), ), reverse=True, ) done_rows = done_rows[:10] return doing_rows, done_rows def _job_detail_payload(repo: OpsRepository, job_id: int) -> dict[str, Any]: job = repo.get_job(job_id) if job is None: raise HTTPException(status_code=404, detail="job not found") return { "job": _serialize_job(job), "stages": _job_stages(repo, job_id), "commands": _job_commands(repo, job_id), "workers": _job_workers(repo, job_id, active_only=False), "running_items": _job_running_items(repo, job_id), "download_stats": _download_stats(repo), "playlist_progress": repo.list_job_playlist_progress(job_id), } def _ensure_job_stages(repo: OpsRepository, job_id: int, job_type: str) -> None: if repo.list_job_stages(job_id): return for seq_no, stage_type in enumerate(JOB_STAGE_SEQUENCES.get(job_type, []), start=1): repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=seq_no) class JobCreateRequest(BaseModel): job_type: JobType = Field(default="catalog_sync") requested_by: str | None = None sources: list[Literal["netease", "qq", "kuwo"]] = Field(default_factory=list) download_sources: list[Literal["qq", "kuwo", "migu", "qianqian", "kugou", "netease"]] = Field( default_factory=list ) playlist_scope: dict[str, Any] = Field(default_factory=dict) config_snapshot: dict[str, Any] | None = None class JobCommandRequest(BaseModel): command_type: CommandType target_item_id: int | None = Field(default=None, ge=1) payload: dict[str, Any] = Field(default_factory=dict) class EnvUpdateRequest(BaseModel): content: str = "" note: str | None = None class PlaylistBulkRequest(BaseModel): playlist_ids: list[int] = Field(default_factory=list) requested_by: str | None = None marked_by: str | None = None def _create_scoped_playlist_job( repo: OpsRepository, env_manager: CatalogsyncEnvManager, *, job_type: Literal["download_only", "sync_download", "sync_only"], playlist_ids: list[int], requested_by: str | None = None, ): normalized_ids = _normalize_playlist_ids(playlist_ids) if not normalized_ids: return None snapshot = env_manager.build_job_snapshot() sources = _normalize_allowed_list( snapshot.get("sources") or snapshot.get("SOURCES"), ALLOWED_COLLECT_SOURCES, ) download_sources = _normalize_allowed_list( snapshot.get("download_sources") or snapshot.get("DOWNLOAD_SOURCES"), ALLOWED_DOWNLOAD_SOURCES, ) job_id = repo.create_job( job_type=job_type, requested_by=requested_by, config_snapshot=dict(snapshot), sources=sources, download_sources=download_sources, playlist_scope={"playlist_ids": normalized_ids}, ) _ensure_job_stages(repo, job_id, job_type) job = repo.get_job(job_id) if job is None: raise HTTPException(status_code=500, detail="job create failed") return _serialize_job(job) def _build_playlist_job( repo: OpsRepository, env_manager: CatalogsyncEnvManager, *, job_type: Literal["download_only", "sync_download", "sync_only"], payload: PlaylistBulkRequest, ) -> dict[str, Any]: playlist_ids = _normalize_playlist_ids(payload.playlist_ids) if not playlist_ids: raise HTTPException(status_code=422, detail="playlist_ids is required") job = _create_scoped_playlist_job( repo, env_manager, job_type=job_type, playlist_ids=playlist_ids, requested_by=payload.requested_by, ) if job is None: raise HTTPException(status_code=500, detail="job create failed") return {"job": job} def _build_adaptive_download_jobs( repo: OpsRepository, catalog_repo: CatalogRepository, env_manager: CatalogsyncEnvManager, *, payload: PlaylistBulkRequest, ) -> dict[str, Any]: playlist_ids = _normalize_playlist_ids(payload.playlist_ids) if not playlist_ids: raise HTTPException(status_code=422, detail="playlist_ids is required") rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids) download_playlist_ids: list[int] = [] sync_download_playlist_ids: list[int] = [] skipped_downloaded_playlist_ids: list[int] = [] for playlist_id in playlist_ids: row = rows_by_id.get(int(playlist_id)) if row is None: continue state_code = str(row.get("state_code") or "") if state_code == "unsynced": sync_download_playlist_ids.append(int(playlist_id)) continue if state_code == "downloaded": skipped_downloaded_playlist_ids.append(int(playlist_id)) continue download_playlist_ids.append(int(playlist_id)) download_job = _create_scoped_playlist_job( repo, env_manager, job_type="download_only", playlist_ids=download_playlist_ids, requested_by=payload.requested_by, ) sync_download_job = _create_scoped_playlist_job( repo, env_manager, job_type="sync_download", playlist_ids=sync_download_playlist_ids, requested_by=payload.requested_by, ) jobs = [job for job in (download_job, sync_download_job) if job is not None] response: dict[str, Any] = { "playlist_ids": playlist_ids, "download_playlist_ids": download_playlist_ids, "sync_download_playlist_ids": sync_download_playlist_ids, "skipped_downloaded_playlist_ids": skipped_downloaded_playlist_ids, "download_job": download_job, "sync_download_job": sync_download_job, "job": jobs[0] if len(jobs) == 1 else None, } if not jobs: response["message"] = "no runnable playlist download jobs were created" return response def _playlist_export_rows_by_id_or_404( catalog_repo: CatalogRepository, playlist_ids: list[int], ) -> dict[int, dict[str, Any]]: playlist_rows = catalog_repo.list_playlist_export_state_rows(playlist_ids) if not playlist_rows: raise HTTPException(status_code=404, detail="playlist not found") rows_by_id = {int(row["id"]): dict(row) for row in playlist_rows} missing_playlist_ids = [ int(playlist_id) for playlist_id in playlist_ids if int(playlist_id) not in rows_by_id ] if missing_playlist_ids: raise HTTPException( status_code=404, detail={ "message": "playlist not found", "missing_playlist_ids": missing_playlist_ids, }, ) return rows_by_id def create_app( db_path: str | Path, env_path: str | Path, *, start_runner: bool = False, runner_sleep_seconds: float = 1.0, ) -> FastAPI: db_file = Path(db_path) env_file = Path(env_path) initialize_database(db_file).close() initialize_resolver_stats_database(default_resolver_stats_db_path(db_file)).close() repo = OpsRepository(db_file) catalog_repo = CatalogRepository(db_file) maintenance_service = LocalMaintenanceService(db_file) env_manager = CatalogsyncEnvManager( db_path=db_file, env_file_path=env_file, repository=repo, ) templates_dir = Path(__file__).resolve().parents[1] / "templates" static_dir = Path(__file__).resolve().parents[1] / "static" templates = Jinja2Templates(directory=str(templates_dir)) runner_stop_event: threading.Event | None = None runner_thread: threading.Thread | None = None if start_runner: runner = OpsRunner(repository=repo, sleep_seconds=runner_sleep_seconds) runner_restart_delay = max(min(float(runner_sleep_seconds), 1.0), 0.1) def _run_runner_with_supervision(stop_event: threading.Event) -> None: while not stop_event.is_set(): try: LOGGER.info("Starting embedded ops runner") runner.run_forever(stop_event) LOGGER.info("Embedded ops runner stopped") return except Exception: LOGGER.exception( "Embedded ops runner crashed; restarting in %.2f seconds", runner_restart_delay, ) if stop_event.wait(runner_restart_delay): return @asynccontextmanager async def lifespan(_: FastAPI): nonlocal runner_stop_event, runner_thread if start_runner: if runner_thread is None or not runner_thread.is_alive(): runner_stop_event = threading.Event() runner_thread = threading.Thread( target=_run_runner_with_supervision, args=(runner_stop_event,), daemon=True, name="catalogsync-ops-runner", ) runner_thread.start() try: yield finally: if runner_stop_event is not None: runner_stop_event.set() if runner_thread is not None: runner_thread.join(timeout=5.0) app = FastAPI(title="Catalogsync Operations Console", lifespan=lifespan) app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") @app.get("/", include_in_schema=False) def index_redirect(): return RedirectResponse(url="/dashboard", status_code=307) @app.get("/dashboard", response_class=HTMLResponse) def dashboard(request: Request): dashboard_payload = _dashboard_payload(repo, include_task_rows=True) task_rows = dashboard_payload.get("task_rows", []) doing_task_rows, done_task_rows = _split_dashboard_task_rows(task_rows) env_values = env_manager.load_current() return templates.TemplateResponse( name="ops/dashboard.html", request=request, context={ "title": "Dashboard", "sse_url": "/api/events/stream", "dashboard_api_url": "/api/dashboard", "summary": dashboard_payload["summary"], "download_stats": dashboard_payload["download_stats"], "transfer_stats": dashboard_payload["transfer_stats"], "task_rows": task_rows, "doing_task_rows": doing_task_rows, "done_task_rows": done_task_rows, "workers": dashboard_payload["workers"], "running_items": dashboard_payload["running_items"], "playlist_sources": dashboard_payload["playlist_sources"], "active_job": dashboard_payload["active_job"], "jobs": dashboard_payload["jobs"], "maintenance_local_duplicates_scan_api": "/api/maintenance/local-duplicates", "maintenance_local_duplicates_dedupe_api": "/api/maintenance/local-duplicates/dedupe", "job_type_options": [ ("catalog_sync", "Collect + Sync + Download"), ("collect_only", "Collect Only"), ("sync_only", "Sync Only"), ("sync_download", "Sync + Download"), ("download_only", "Download Only"), ("upload_only", "Upload Only"), ("download_upload", "Download + Upload"), ], "default_sources": ",".join( _normalize_allowed_list( env_values.get("SOURCES") or "netease,qq,kuwo", ALLOWED_COLLECT_SOURCES, ) or ["netease", "qq", "kuwo"] ), "default_download_sources": ",".join( _normalize_allowed_list( env_values.get("DOWNLOAD_SOURCES") or env_values.get("download_sources"), ALLOWED_DOWNLOAD_SOURCES, ) or ["qq", "kuwo", "migu", "qianqian", "kugou", "netease"] ), }, ) @app.get("/jobs", response_class=HTMLResponse) def jobs_page(request: Request): return templates.TemplateResponse( name="ops/jobs.html", request=request, context={"title": "Jobs", "jobs": _list_jobs(repo, limit=200)}, ) @app.get("/jobs/{job_id}", response_class=HTMLResponse) def job_detail_page(request: Request, job_id: int): payload = _job_detail_payload(repo, job_id) return templates.TemplateResponse( name="ops/job_detail.html", request=request, context={ "title": f"Job {job_id}", "job": payload["job"], "stages": payload["stages"], "commands": payload["commands"], "workers": payload["workers"], "running_items": payload["running_items"], "download_stats": payload["download_stats"], "playlist_progress": payload["playlist_progress"], "command_endpoint": f"/api/jobs/{job_id}/commands", }, ) @app.get("/playlists", response_class=HTMLResponse) def playlists_page( request: Request, page: int = Query(default=1, ge=1), page_size: int = Query(default=50), platform: str | None = Query(default=None), pool_kind: str | None = Query(default=None), status: str | None = Query(default=None), keyword: str | None = Query(default=None), wanted_only: str | None = Query(default=None), sort_by: str | None = Query(default=None), sort_dir: str | None = Query(default=None), ): normalized_page_size = _validate_playlist_page_size(int(page_size)) normalized_platform = _normalize_optional_filter(platform) normalized_pool_kind = _normalize_optional_filter(pool_kind) normalized_status = _normalize_optional_filter(status) normalized_keyword = _normalize_optional_filter(keyword) normalized_wanted_only = _parse_optional_bool_query(wanted_only, field_name="wanted_only") normalized_sort_by, normalized_sort_dir = _normalize_playlist_sort(sort_by, sort_dir) playlist_page = _list_playlist_page_or_422( catalog_repo, page=page, page_size=normalized_page_size, platform=normalized_platform, pool_kind=normalized_pool_kind, status=normalized_status, keyword=normalized_keyword, wanted_only=normalized_wanted_only, sort_by=normalized_sort_by, sort_dir=normalized_sort_dir, ) filters = { "page": int(page), "page_size": normalized_page_size, "platform": normalized_platform or "", "pool_kind": normalized_pool_kind or "", "status": normalized_status or "", "keyword": normalized_keyword or "", "wanted_only": normalized_wanted_only, "sort_by": normalized_sort_by or "", "sort_dir": normalized_sort_dir or "", } return templates.TemplateResponse( name="ops/playlists.html", request=request, context={ "title": "Playlists", "playlist_page": playlist_page, "playlists": playlist_page["items"], "filters": filters, "sort_links": _build_playlist_sort_links(filters), "previous_page_url": ( "?" + _playlist_query_string(filters, overrides={"page": playlist_page["page"] - 1}) if playlist_page["page"] > 1 else None ), "next_page_url": ( "?" + _playlist_query_string(filters, overrides={"page": playlist_page["page"] + 1}) if playlist_page["page"] < playlist_page["total_pages"] else None ), "filter_options": _playlist_filter_options(catalog_repo), "playlist_sources": _playlist_source_stats(repo), "default_sources": ",".join( _normalize_allowed_list( env_manager.load_current().get("SOURCES") or "netease,qq,kuwo", ALLOWED_COLLECT_SOURCES, ) or ["netease", "qq", "kuwo"] ), }, ) @app.get("/songs", response_class=HTMLResponse) def songs_page(request: Request): rows = repo._fetchall( """ SELECT id, platform, remote_song_id, name, singers, updated_at FROM songs ORDER BY id DESC LIMIT 200 """ ) return templates.TemplateResponse( name="ops/songs.html", request=request, context={ "title": "Songs", "songs": [dict(row) for row in rows], "download_stats": _download_stats(repo), "workers": _dashboard_workers(repo), "running_items": _dashboard_running_items(repo), }, ) @app.get("/logs", response_class=HTMLResponse) def logs_page(request: Request): events = repo._fetchall( """ SELECT id, job_run_id, event_type, message, created_at FROM job_events ORDER BY id DESC LIMIT 200 """ ) return templates.TemplateResponse( name="ops/logs.html", request=request, context={"title": "Logs", "events": [dict(row) for row in events]}, ) @app.get("/config", response_class=HTMLResponse) def config_page(request: Request): return templates.TemplateResponse( name="ops/config.html", request=request, context={ "title": "Config", "env_content": _read_env_text(env_file), "env_values": env_manager.load_current(), "revisions": env_manager.list_revisions(limit=50), }, ) @app.get("/api/dashboard") def api_dashboard(include_task_rows: bool = Query(default=True)): return _dashboard_payload(repo, include_task_rows=include_task_rows) @app.get("/api/maintenance/local-duplicates") def api_local_duplicates(sample_limit: int = Query(default=20, ge=1, le=200)): return maintenance_service.scan_local_duplicates(sample_limit=sample_limit) @app.post("/api/maintenance/local-duplicates/dedupe") def api_local_duplicates_dedupe(sample_limit: int = Query(default=20, ge=1, le=200)): try: return maintenance_service.dedupe_local_duplicates(sample_limit=sample_limit) except LocalDedupeBlockedError as exc: raise HTTPException(status_code=409, detail=str(exc)) from exc @app.get("/api/jobs") def api_jobs(limit: int = Query(default=100, ge=1, le=500)): return {"items": _list_jobs(repo, limit=limit)} @app.get("/api/jobs/{job_id}") def api_job_detail(job_id: int): return _job_detail_payload(repo, job_id) @app.get("/api/jobs/{job_id}/playlists/{playlist_id}/songs") def api_job_playlist_songs( job_id: int, playlist_id: int, limit: int = Query(default=500, ge=1, le=2000), ): job = repo.get_job(job_id) if job is None: raise HTTPException(status_code=404, detail="job not found") return { "items": repo.list_job_playlist_song_progress( job_id=job_id, playlist_id=playlist_id, limit=limit, ) } @app.get("/api/playlists/{playlist_id}/songs") def api_playlist_songs( playlist_id: int, limit: int = Query(default=2000, ge=1, le=5000), ): playlist_rows = catalog_repo.list_playlists_by_ids([playlist_id]) if not playlist_rows: raise HTTPException(status_code=404, detail="playlist not found") playlist_row = dict(playlist_rows[0]) return { "playlist": { "id": int(playlist_row["id"]), "platform": str(playlist_row["platform"]), "remote_playlist_id": str(playlist_row["remote_playlist_id"]), "name": str(playlist_row["name"]), "play_count": ( int(playlist_row["play_count"]) if playlist_row["play_count"] is not None else None ), }, "items": catalog_repo.list_playlist_song_details(playlist_id, limit=limit), } @app.get("/api/playlists/{playlist_id}/export-folder") def api_playlist_export_folder(playlist_id: int): playlist_rows = catalog_repo.list_playlists_by_ids([playlist_id]) if not playlist_rows: raise HTTPException(status_code=404, detail="playlist not found") playlist_row = dict(playlist_rows[0]) playlist_payload = { "id": int(playlist_row["id"]), "platform": str(playlist_row["platform"]), "remote_playlist_id": str(playlist_row["remote_playlist_id"]), "name": str(playlist_row["name"]), "play_count": ( int(playlist_row["play_count"]) if playlist_row["play_count"] is not None else None ), } env_values = env_manager.load_current() playlists_root = _resolve_playlists_root(env_values, catalog_repo) if playlists_root is None: raise HTTPException(status_code=500, detail="playlists root is not configured") folder_path = CatalogSyncService( repository=catalog_repo, playlists_root=playlists_root, ).ensure_playlist_artifacts_for_playlist(playlist_id) if folder_path is None or not folder_path.exists(): folder_path = locate_playlist_dir(playlists_root, playlist_payload) if folder_path is None or not folder_path.exists(): raise HTTPException(status_code=404, detail="playlist export folder not found") return { "playlist": playlist_payload, "folder": { "path": str(folder_path.resolve()), "exists": True, "files": _collect_folder_listing(folder_path), }, } @app.get("/api/playlists/{playlist_id}/export.zip") def api_playlist_export_zip(playlist_id: int): playlist_rows = catalog_repo.list_playlist_export_state_rows([playlist_id]) if not playlist_rows: raise HTTPException(status_code=404, detail="playlist not found") playlist_row = dict(playlist_rows[0]) state_code = str(playlist_row.get("state_code") or "") if state_code != "downloaded": return JSONResponse( status_code=409, content={ "playlist_id": int(playlist_row["id"]), "state_code": state_code, "message": "playlist is not ready for zip export", }, ) env_values = env_manager.load_current() playlists_root = _resolve_playlists_root(env_values, catalog_repo) if playlists_root is None: raise HTTPException(status_code=500, detail="playlists root is not configured") export_service = CatalogSyncService(repository=catalog_repo, playlists_root=playlists_root) playlist_payload = { "id": int(playlist_row["id"]), "platform": str(playlist_row.get("platform") or ""), "remote_playlist_id": str(playlist_row.get("remote_playlist_id") or ""), "name": str(playlist_row.get("name") or ""), } folder_path = export_service.ensure_playlist_artifacts_for_playlist(int(playlist_row["id"])) if folder_path is None or not folder_path.exists(): folder_path = locate_playlist_dir(playlists_root, playlist_payload) if folder_path is None or not folder_path.exists(): raise HTTPException(status_code=404, detail="playlist export folder not found") bundle_path = create_single_playlist_bundle( playlist_dir=folder_path, bundle_root=default_bundle_root(), platform=str(playlist_row.get("platform") or ""), playlist_id=int(playlist_row["id"]), playlist_name=str(playlist_row.get("name") or ""), ) return FileResponse( path=str(bundle_path), media_type="application/zip", filename=bundle_download_filename(bundle_path), ) @app.get("/api/exports/bundles/{bundle_name}.zip") def api_export_bundle_download(bundle_name: str): bundle_path = resolve_bundle_download_path(default_bundle_root(), bundle_name) if bundle_path is None or not bundle_path.exists() or not bundle_path.is_file(): raise HTTPException(status_code=404, detail="bundle not found") return FileResponse( path=str(bundle_path), media_type="application/zip", filename=bundle_download_filename(bundle_path), ) @app.get("/api/playlists") def api_playlists( page: int = Query(default=1, ge=1), page_size: int = Query(default=50), platform: str | None = Query(default=None), pool_kind: str | None = Query(default=None), status: str | None = Query(default=None), keyword: str | None = Query(default=None), wanted_only: str | None = Query(default=None), sort_by: str | None = Query(default=None), sort_dir: str | None = Query(default=None), ): normalized_page_size = _validate_playlist_page_size(int(page_size)) normalized_platform = _normalize_optional_filter(platform) normalized_pool_kind = _normalize_optional_filter(pool_kind) normalized_status = _normalize_optional_filter(status) normalized_keyword = _normalize_optional_filter(keyword) normalized_wanted_only = _parse_optional_bool_query(wanted_only, field_name="wanted_only") normalized_sort_by, normalized_sort_dir = _normalize_playlist_sort(sort_by, sort_dir) return _list_playlist_page_or_422( catalog_repo, page=page, page_size=normalized_page_size, platform=normalized_platform, pool_kind=normalized_pool_kind, status=normalized_status, keyword=normalized_keyword, wanted_only=normalized_wanted_only, sort_by=normalized_sort_by, sort_dir=normalized_sort_dir, ) @app.post("/api/playlists/mark-wanted") def api_mark_playlists_wanted(payload: PlaylistBulkRequest): playlist_ids = _normalize_playlist_ids(payload.playlist_ids) if not playlist_ids: raise HTTPException(status_code=422, detail="playlist_ids is required") catalog_repo.mark_playlists_wanted(playlist_ids, marked_by=payload.marked_by or payload.requested_by) return {"playlist_ids": playlist_ids, "marked_count": len(playlist_ids)} @app.post("/api/playlists/unmark-wanted") def api_unmark_playlists_wanted(payload: PlaylistBulkRequest): playlist_ids = _normalize_playlist_ids(payload.playlist_ids) if not playlist_ids: raise HTTPException(status_code=422, detail="playlist_ids is required") catalog_repo.unmark_playlists_wanted(playlist_ids) return {"playlist_ids": playlist_ids, "unmarked_count": len(playlist_ids)} @app.post("/api/playlists/download", status_code=201) def api_download_selected_playlists(payload: PlaylistBulkRequest): return _build_adaptive_download_jobs( repo, catalog_repo, env_manager, payload=payload, ) @app.post("/api/playlists/sync", status_code=201) def api_sync_selected_playlists(payload: PlaylistBulkRequest): return _build_playlist_job(repo, env_manager, job_type="sync_only", payload=payload) @app.post("/api/playlists/sync-download", status_code=201) def api_sync_download_selected_playlists(payload: PlaylistBulkRequest): return _build_playlist_job(repo, env_manager, job_type="sync_download", payload=payload) @app.post("/api/playlists/export") def api_export_selected_playlists(payload: PlaylistBulkRequest): playlist_ids = _normalize_playlist_ids(payload.playlist_ids) if not playlist_ids: raise HTTPException(status_code=422, detail="playlist_ids is required") env_values = env_manager.load_current() playlists_root = _resolve_playlists_root(env_values, catalog_repo) if playlists_root is None: raise HTTPException(status_code=500, detail="playlists root is not configured") export_service = CatalogSyncService( repository=catalog_repo, playlists_root=playlists_root, ) rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids) exported_playlist_ids: list[int] = [] download_playlist_ids: list[int] = [] sync_download_playlist_ids: list[int] = [] for playlist_id in playlist_ids: row = rows_by_id.get(int(playlist_id)) if row is None: continue state_code = str(row.get("state_code") or "") if state_code == "downloaded": folder_path = export_service.ensure_playlist_artifacts_for_playlist(int(playlist_id)) if folder_path is not None: exported_playlist_ids.append(int(playlist_id)) continue if state_code == "unsynced": sync_download_playlist_ids.append(int(playlist_id)) continue download_playlist_ids.append(int(playlist_id)) download_job = _create_scoped_playlist_job( repo, env_manager, job_type="download_only", playlist_ids=download_playlist_ids, requested_by=payload.requested_by, ) sync_download_job = _create_scoped_playlist_job( repo, env_manager, job_type="sync_download", playlist_ids=sync_download_playlist_ids, requested_by=payload.requested_by, ) return { "playlist_ids": playlist_ids, "exported_playlist_ids": exported_playlist_ids, "exported_count": len(exported_playlist_ids), "download_job": download_job, "sync_download_job": sync_download_job, } @app.post("/api/playlists/export-zip") def api_export_selected_playlists_zip(payload: PlaylistBulkRequest): playlist_ids = _normalize_playlist_ids(payload.playlist_ids) if not playlist_ids: raise HTTPException(status_code=422, detail="playlist_ids is required") rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids) ready_playlist_ids: list[int] = [] blocked_playlist_ids: list[int] = [] download_playlist_ids: list[int] = [] sync_download_playlist_ids: list[int] = [] for playlist_id in playlist_ids: row = rows_by_id.get(int(playlist_id)) if row is None: continue state_code = str(row.get("state_code") or "") if state_code == "downloaded": ready_playlist_ids.append(int(playlist_id)) continue blocked_playlist_ids.append(int(playlist_id)) if state_code == "unsynced": sync_download_playlist_ids.append(int(playlist_id)) continue download_playlist_ids.append(int(playlist_id)) if blocked_playlist_ids: return { "status": "queued", "message": "selected playlists include non-ready items; export zip queued", "playlist_ids": playlist_ids, "ready_playlist_ids": ready_playlist_ids, "blocked_playlist_ids": blocked_playlist_ids, "download_job": _create_scoped_playlist_job( repo, env_manager, job_type="download_only", playlist_ids=download_playlist_ids, requested_by=payload.requested_by, ), "sync_download_job": _create_scoped_playlist_job( repo, env_manager, job_type="sync_download", playlist_ids=sync_download_playlist_ids, requested_by=payload.requested_by, ), } env_values = env_manager.load_current() playlists_root = _resolve_playlists_root(env_values, catalog_repo) if playlists_root is None: raise HTTPException(status_code=500, detail="playlists root is not configured") export_service = CatalogSyncService(repository=catalog_repo, playlists_root=playlists_root) playlist_dirs: list[Path] = [] for playlist_id in ready_playlist_ids: row = rows_by_id.get(int(playlist_id)) if row is None: continue playlist_payload = { "id": int(row["id"]), "platform": str(row.get("platform") or ""), "remote_playlist_id": str(row.get("remote_playlist_id") or ""), "name": str(row.get("name") or ""), } folder_path = export_service.ensure_playlist_artifacts_for_playlist(playlist_id) if folder_path is None or not folder_path.exists(): folder_path = locate_playlist_dir(playlists_root, playlist_payload) if folder_path is None or not folder_path.exists(): raise HTTPException(status_code=404, detail="playlist export folder not found") playlist_dirs.append(folder_path) bundle_path = create_multi_playlist_bundle( playlist_dirs=playlist_dirs, bundle_root=default_bundle_root(), ) return { "status": "ready", "playlist_ids": ready_playlist_ids, "download_url": f"/api/exports/bundles/{bundle_path.stem}.zip", } @app.post("/api/jobs", status_code=201) def api_create_job(payload: JobCreateRequest): snapshot = payload.config_snapshot or env_manager.build_job_snapshot() sources = _normalize_allowed_list(list(payload.sources), ALLOWED_COLLECT_SOURCES) or _normalize_allowed_list( snapshot.get("sources") or snapshot.get("SOURCES"), ALLOWED_COLLECT_SOURCES, ) download_sources = _normalize_allowed_list( list(payload.download_sources), ALLOWED_DOWNLOAD_SOURCES, ) or _normalize_allowed_list( snapshot.get("download_sources") or snapshot.get("DOWNLOAD_SOURCES"), ALLOWED_DOWNLOAD_SOURCES, ) job_id = repo.create_job( job_type=payload.job_type, requested_by=payload.requested_by, config_snapshot=dict(snapshot), sources=sources, download_sources=download_sources, playlist_scope=payload.playlist_scope or {}, ) _ensure_job_stages(repo, job_id, payload.job_type) job = repo.get_job(job_id) if job is None: raise HTTPException(status_code=500, detail="job create failed") return {"job": _serialize_job(job)} @app.post("/api/jobs/{job_id}/commands", status_code=201) def api_create_job_command(job_id: int, payload: JobCommandRequest): job = repo.get_job(job_id) if job is None: raise HTTPException(status_code=404, detail="job not found") command_type = payload.command_type if command_type in RETRY_COMMAND_TYPES and payload.target_item_id is None: raise HTTPException( status_code=422, detail="target_item_id is required for retry commands", ) command_id = repo.create_command( job_run_id=job_id, command_type=command_type, target_item_id=payload.target_item_id, payload=payload.payload, ) return {"job_id": int(job_id), "command_id": int(command_id), "command_type": command_type} @app.get("/api/config/env") def api_get_env(): return { "path": str(env_file.resolve()), "content": _read_env_text(env_file), "parsed": env_manager.load_current(), } @app.put("/api/config/env") def api_put_env(payload: EnvUpdateRequest): env_file.parent.mkdir(parents=True, exist_ok=True) env_file.write_text(payload.content, encoding="utf-8") revision_id = env_manager.save_revision(note=payload.note) revision = repo.get_config_revision(revision_id) return { "saved": True, "revision": revision, "content": _read_env_text(env_file), "parsed": env_manager.load_current(), } @app.get("/api/config/revisions") def api_get_revisions(limit: int = Query(default=50, ge=1, le=500)): return {"items": env_manager.list_revisions(limit=limit)} @app.post("/api/config/revisions/{revision_id}/apply") def api_apply_revision(revision_id: int): try: env_manager.apply_revision(revision_id) except ValueError: raise HTTPException(status_code=404, detail="revision not found") from None revision = repo.get_config_revision(revision_id) return { "applied": True, "revision": revision, "content": _read_env_text(env_file), "parsed": env_manager.load_current(), } @app.get("/api/events/stream") def api_events_stream(once: bool = Query(default=False)): def event_stream(): while True: snapshot = _dashboard_payload(repo) payload = json.dumps(snapshot, ensure_ascii=False) yield f"event: snapshot\ndata: {payload}\n\n" if once: break yield ": heartbeat\n\n" time.sleep(LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS) return StreamingResponse( event_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) return app