Files

1608 lines
61 KiB
Python

from __future__ import annotations
import json
import logging
import threading
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Literal
from urllib.parse import urlencode
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.export_bundles import (
bundle_download_filename,
create_multi_playlist_bundle,
create_single_playlist_bundle,
default_bundle_root,
resolve_bundle_download_path,
)
from musicdl.catalogsync.playlist_artifacts import locate_playlist_dir
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.resolver_stats import (
default_resolver_stats_db_path,
initialize_resolver_stats_database,
)
from musicdl.catalogsync.services import CatalogSyncService
from .config import CatalogsyncEnvManager
from .jobdefs import job_lane_type
from .maintenance import LocalDedupeBlockedError, LocalMaintenanceService
from .repository import OpsRepository
from .runner import JOB_STAGE_SEQUENCES, OpsRunner
JobType = Literal[
"catalog_sync",
"collect_only",
"sync_only",
"sync_download",
"download_only",
"upload_only",
"download_upload",
]
CommandType = Literal["pause", "resume", "cancel", "retry_item", "force_retry_item"]
ALLOWED_COLLECT_SOURCES = {"netease", "qq", "kuwo"}
ALLOWED_DOWNLOAD_SOURCES = {"qq", "kuwo", "migu", "qianqian", "kugou", "netease"}
RETRY_COMMAND_TYPES = {"retry_item", "force_retry_item"}
LOGGER = logging.getLogger(__name__)
DOWNLOAD_JOB_TYPES = tuple(
job_type for job_type, stages in JOB_STAGE_SEQUENCES.items() if "download" in stages
)
LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS = 0.5
PLAYLIST_SORT_FIELDS = {"id", "platform", "name", "play_count"}
PLAYLIST_SORT_DIRECTIONS = {"asc", "desc"}
PLAYLIST_SORT_DEFAULTS = {
"id": "desc",
"platform": "asc",
"name": "asc",
"play_count": "desc",
}
def _format_speed_text(speed_bytes_per_sec: int | None) -> str:
speed_value = int(speed_bytes_per_sec or 0)
if speed_value <= 0:
return "0 B/s"
if speed_value >= 1024 * 1024:
return f"{speed_value / (1024 * 1024):.1f} MB/s"
if speed_value >= 1024:
return f"{speed_value / 1024:.1f} KB/s"
return f"{speed_value} B/s"
def _normalize_allowed_list(value: Any, allowed: set[str]) -> list[str]:
if not value:
return []
parts: list[str] = []
if isinstance(value, str):
parts = [part.strip() for part in value.split(",") if part and part.strip()]
elif isinstance(value, list):
parts = [str(item).strip() for item in value if str(item).strip()]
else:
return []
normalized: list[str] = []
seen: set[str] = set()
for item in parts:
if item in allowed and item not in seen:
normalized.append(item)
seen.add(item)
return normalized
def _read_env_text(env_path: Path) -> str:
if not env_path.exists():
return ""
return env_path.read_text(encoding="utf-8")
def _resolve_playlists_root(env_values: dict[str, str], catalog_repo: CatalogRepository) -> Path | None:
root_dir = str(env_values.get("ROOT_DIR") or "").strip()
if root_dir:
path = Path(root_dir).resolve() / "playlists"
path.mkdir(parents=True, exist_ok=True)
return path
library_dir = str(env_values.get("LIBRARY_DIR") or "").strip()
if library_dir:
path = Path(library_dir).resolve().parent / "playlists"
path.mkdir(parents=True, exist_ok=True)
return path
library_root = catalog_repo.get_default_local_library_root()
if library_root is None:
return None
path = library_root.parent / "playlists"
path.mkdir(parents=True, exist_ok=True)
return path
def _collect_folder_listing(folder_path: Path) -> list[dict[str, Any]]:
if not folder_path.exists():
return []
items: list[dict[str, Any]] = []
for child in sorted(folder_path.rglob("*")):
if not child.is_file():
continue
try:
stat = child.stat()
size_bytes = int(stat.st_size)
except OSError:
size_bytes = 0
items.append(
{
"relative_path": child.relative_to(folder_path).as_posix(),
"absolute_path": str(child.resolve()),
"size_bytes": size_bytes,
}
)
return items
def _serialize_job(job: Any) -> dict[str, Any]:
return {
"id": int(job.id),
"job_type": str(job.job_type),
"status": getattr(job.status, "value", str(job.status)),
"priority": int(job.priority),
"requested_by": job.requested_by,
"config_snapshot": dict(job.config_snapshot or {}),
"sources": list(job.sources or []),
"download_sources": list(job.download_sources or []),
"playlist_scope": dict(job.playlist_scope or {}),
"created_at": job.created_at,
"started_at": job.started_at,
"ended_at": job.ended_at,
"last_error": job.last_error,
"resume_token": job.resume_token,
}
def _display_from_payload(payload: dict[str, Any]) -> str | None:
if not isinstance(payload, dict):
return None
for key in ("row", "playlist_row", "upload_row"):
row = payload.get(key)
if isinstance(row, dict):
name = str(row.get("name") or row.get("song_name") or "").strip()
if name:
return name
return str(payload.get("display_name") or payload.get("name") or "").strip() or None
def _list_jobs(repo: OpsRepository, limit: int = 100) -> list[dict[str, Any]]:
rows = repo._fetchall("SELECT * FROM job_runs ORDER BY id DESC LIMIT ?", (int(limit),))
return [_serialize_job(repo._row_to_job(row)) for row in rows]
def _job_stages(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]:
return [
{
"id": int(stage.id),
"job_run_id": int(stage.job_run_id),
"stage_type": str(stage.stage_type),
"status": stage.status.value,
"seq_no": int(stage.seq_no),
"total_items": int(stage.total_items),
"pending_items": int(stage.pending_items),
"running_items": int(stage.running_items),
"success_items": int(stage.success_items),
"failed_items": int(stage.failed_items),
"skipped_items": int(stage.skipped_items),
"started_at": stage.started_at,
"ended_at": stage.ended_at,
"last_error": stage.last_error,
}
for stage in repo.list_job_stages(job_id)
]
def _job_commands(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]:
rows = repo._fetchall(
"""
SELECT *
FROM job_commands
WHERE job_run_id = ?
ORDER BY id DESC
LIMIT 100
""",
(int(job_id),),
)
return [dict(row) for row in rows]
def _queued_download_job_count(repo: OpsRepository) -> int:
if not DOWNLOAD_JOB_TYPES:
return 0
placeholders = ", ".join("?" for _ in DOWNLOAD_JOB_TYPES)
row = repo._fetchone(
f"""
SELECT COUNT(*) AS count_value
FROM job_runs
WHERE status = ? AND job_type IN ({placeholders})
""",
("queued", *DOWNLOAD_JOB_TYPES),
)
return int(row["count_value"]) if row else 0
def _dashboard_summary(
repo: OpsRepository,
*,
task_rows: list[dict[str, Any]] | None = None,
) -> dict[str, int]:
rows = repo._fetchall(
"""
SELECT status, COUNT(*) AS count_value
FROM job_runs
GROUP BY status
"""
)
counts = {str(row["status"]): int(row["count_value"]) for row in rows}
if task_rows is None:
queued_download_jobs = _queued_download_job_count(repo)
else:
queued_download_jobs = sum(
1
for row in task_rows
if str(row.get("status")) == "queued" and str(row.get("lane_type")) == "download"
)
return {
"total_jobs": int(sum(counts.values())),
"queued_jobs": int(counts.get("queued", 0)),
"queued_download_jobs": int(queued_download_jobs),
"running_jobs": int(counts.get("running", 0)),
"paused_jobs": int(counts.get("paused", 0) + counts.get("pause_requested", 0)),
"failed_jobs": int(counts.get("failed", 0) + counts.get("completed_with_errors", 0)),
}
def _serialize_worker_row(row: dict[str, Any]) -> dict[str, Any]:
payload_text = str(row.get("current_display_text") or "").strip()
fallback_text = str(row.get("song_name") or row.get("playlist_name") or row.get("item_key") or "").strip()
speed_bytes_per_sec = int(row.get("speed_bytes_per_sec") or 0)
return {
"id": int(row["id"]),
"job_run_id": row["job_run_id"],
"job_stage_id": row["job_stage_id"],
"worker_name": row["worker_name"],
"status": row["status"],
"current_job_item_id": row["current_job_item_id"],
"current_song_id": row["current_song_id"],
"current_playlist_id": row["current_playlist_id"],
"display_text": payload_text or fallback_text,
"stage_type": row.get("stage_type"),
"item_type": row.get("item_type"),
"last_progress_text": row.get("last_progress_text"),
"downloaded_bytes": int(row.get("downloaded_bytes") or 0),
"total_bytes": int(row.get("total_bytes") or 0),
"speed_bytes_per_sec": speed_bytes_per_sec,
"speed_text": _format_speed_text(speed_bytes_per_sec),
"processed_count": int(row["processed_count"] or 0),
"error_count": int(row["error_count"] or 0),
"heartbeat_at": row.get("heartbeat_at"),
}
def _playlist_source_stats(repo: OpsRepository) -> list[dict[str, Any]]:
rows = repo._fetchall(
"""
SELECT
pp.platform,
pp.pool_kind,
pp.name AS pool_name,
COUNT(DISTINCT pl.id) AS playlist_count
FROM playlist_pools AS pp
LEFT JOIN pool_playlists AS rel ON rel.pool_id = pp.id
LEFT JOIN playlists AS pl ON pl.id = rel.playlist_id
GROUP BY pp.id, pp.platform, pp.pool_kind, pp.name
ORDER BY pp.platform ASC, pp.pool_kind ASC, pp.id ASC
"""
)
return [
{
"platform": str(row["platform"]),
"pool_kind": str(row["pool_kind"]),
"pool_name": str(row["pool_name"]),
"playlist_count": int(row["playlist_count"] or 0),
}
for row in rows
]
def _playlist_filter_options(catalog_repo: CatalogRepository) -> dict[str, list[str]]:
platform_rows = catalog_repo._fetchall(
"SELECT DISTINCT platform FROM playlists WHERE platform IS NOT NULL AND TRIM(platform) != '' ORDER BY platform ASC"
)
pool_kind_rows = catalog_repo._fetchall(
"SELECT DISTINCT pool_kind FROM playlist_pools WHERE pool_kind IS NOT NULL AND TRIM(pool_kind) != '' ORDER BY pool_kind ASC"
)
return {
"platforms": [str(row["platform"]) for row in platform_rows],
"pool_kinds": [str(row["pool_kind"]) for row in pool_kind_rows],
}
def _normalize_playlist_ids(values: list[Any]) -> list[int]:
normalized: list[int] = []
seen: set[int] = set()
for raw in values:
try:
playlist_id = int(raw)
except (TypeError, ValueError):
continue
if playlist_id <= 0 or playlist_id in seen:
continue
normalized.append(playlist_id)
seen.add(playlist_id)
return normalized
def _validate_playlist_page_size(page_size: int) -> int:
if page_size not in {20, 50, 100}:
raise HTTPException(status_code=422, detail="page_size must be one of 20, 50, 100")
return page_size
def _normalize_optional_filter(value: str | None) -> str | None:
if value is None:
return None
normalized = value.strip()
return normalized or None
def _normalize_playlist_sort(
sort_by: str | None,
sort_dir: str | None,
) -> tuple[str | None, str | None]:
normalized_sort_by = _normalize_optional_filter(sort_by)
normalized_sort_dir = _normalize_optional_filter(sort_dir)
if normalized_sort_by is None:
return None, None
if normalized_sort_by not in PLAYLIST_SORT_FIELDS:
raise HTTPException(status_code=422, detail="sort_by must be one of id, platform, name, play_count")
if normalized_sort_dir is None:
normalized_sort_dir = PLAYLIST_SORT_DEFAULTS[normalized_sort_by]
normalized_sort_dir = normalized_sort_dir.lower()
if normalized_sort_dir not in PLAYLIST_SORT_DIRECTIONS:
raise HTTPException(status_code=422, detail="sort_dir must be one of asc, desc")
return normalized_sort_by, normalized_sort_dir
def _playlist_query_string(
filters: dict[str, Any],
*,
overrides: dict[str, Any] | None = None,
) -> str:
payload = dict(filters or {})
payload.update(overrides or {})
params: list[tuple[str, str]] = [
("page", str(int(payload.get("page") or 1))),
("page_size", str(int(payload.get("page_size") or 50))),
]
for key in ("keyword", "platform", "pool_kind", "status", "sort_by", "sort_dir"):
value = payload.get(key)
if value in (None, ""):
continue
params.append((key, str(value)))
if payload.get("wanted_only"):
params.append(("wanted_only", "1"))
return urlencode(params)
def _build_playlist_sort_links(filters: dict[str, Any]) -> dict[str, dict[str, str]]:
links: dict[str, dict[str, str]] = {}
current_sort_by = str(filters.get("sort_by") or "")
current_sort_dir = str(filters.get("sort_dir") or "")
for field in PLAYLIST_SORT_FIELDS:
is_current = current_sort_by == field
if is_current:
next_dir = "desc" if current_sort_dir == "asc" else "asc"
indicator = "^" if current_sort_dir == "asc" else "v"
else:
next_dir = PLAYLIST_SORT_DEFAULTS[field]
indicator = ""
links[field] = {
"href": "?" + _playlist_query_string(
filters,
overrides={"page": 1, "sort_by": field, "sort_dir": next_dir},
),
"indicator": indicator,
}
return links
def _parse_optional_bool_query(value: str | bool | None, *, field_name: str) -> bool:
if value is None:
return False
if isinstance(value, bool):
return value
normalized = str(value).strip().lower()
if not normalized:
return False
if normalized in {"1", "true", "t", "yes", "y", "on"}:
return True
if normalized in {"0", "false", "f", "no", "n", "off"}:
return False
raise HTTPException(status_code=422, detail=f"{field_name} must be a valid boolean value")
def _list_playlist_page_or_422(
catalog_repo: CatalogRepository,
*,
page: int,
page_size: int,
platform: str | None,
pool_kind: str | None,
status: str | None,
keyword: str | None,
wanted_only: bool,
sort_by: str | None,
sort_dir: str | None,
) -> dict[str, Any]:
try:
return catalog_repo.list_playlist_page(
page=page,
page_size=page_size,
platform=platform,
pool_kind=pool_kind,
status=status,
keyword=keyword,
wanted_only=wanted_only,
sort_by=sort_by,
sort_dir=sort_dir,
)
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
def _job_workers(repo: OpsRepository, job_id: int, *, active_only: bool = False) -> list[dict[str, Any]]:
return [
_serialize_worker_row(row)
for row in repo.list_job_workers(job_id, active_only=active_only, limit=50)
]
def _job_running_items(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]:
workers = {
int(worker["current_job_item_id"]): worker
for worker in _job_workers(repo, job_id, active_only=False)
if worker.get("current_job_item_id") is not None
}
items = repo.list_job_items_with_context(job_id, statuses=["running"], limit=100)
serialized: list[dict[str, Any]] = []
for item in items:
payload = item.get("payload") or {}
worker = workers.get(int(item["id"]))
serialized.append(
{
"id": int(item["id"]),
"stage_type": item.get("stage_type"),
"item_type": item.get("item_type"),
"status": item.get("status"),
"song_id": item.get("song_id"),
"playlist_id": item.get("playlist_id"),
"worker_name": worker.get("worker_name") if worker else None,
"display_name": (
(worker.get("display_text") if worker else None)
or item.get("song_name")
or item.get("playlist_name")
or _display_from_payload(payload)
or str(item.get("item_key") or "")
),
"started_at": item.get("started_at"),
}
)
return serialized
def _dashboard_workers(repo: OpsRepository) -> list[dict[str, Any]]:
rows = repo._fetchall(
"""
SELECT
jw.*,
js.stage_type,
ji.item_type,
ji.item_key,
s.name AS song_name,
p.name AS playlist_name
FROM job_workers AS jw
JOIN job_items AS ji
ON ji.id = jw.current_job_item_id
AND ji.status = 'running'
JOIN job_stages AS js ON js.id = ji.job_stage_id
JOIN job_runs AS jr
ON jr.id = js.job_run_id
AND jr.status IN ('running', 'pause_requested')
LEFT JOIN songs AS s ON s.id = COALESCE(jw.current_song_id, ji.song_id)
LEFT JOIN playlists AS p ON p.id = COALESCE(jw.current_playlist_id, ji.playlist_id)
ORDER BY
CASE
WHEN jw.status = 'running' THEN 0
WHEN jw.status = 'paused' THEN 1
ELSE 2
END,
COALESCE(jw.heartbeat_at, '') DESC,
jw.id DESC
LIMIT 50
"""
)
return [_serialize_worker_row(dict(row)) for row in rows]
def _dashboard_running_items(repo: OpsRepository) -> list[dict[str, Any]]:
rows = repo._fetchall(
"""
SELECT
i.*,
s.stage_type,
s.job_run_id,
w.worker_name,
w.current_display_text,
p.name AS playlist_name,
sg.name AS song_name
FROM job_items AS i
JOIN job_stages AS s ON s.id = i.job_stage_id
LEFT JOIN job_workers AS w ON w.current_job_item_id = i.id
LEFT JOIN playlists AS p ON p.id = i.playlist_id
LEFT JOIN songs AS sg ON sg.id = i.song_id
WHERE i.status = 'running'
ORDER BY i.id DESC
LIMIT 100
"""
)
items: list[dict[str, Any]] = []
for row in rows:
payload = json.loads(row["payload_json"] or "{}")
items.append(
{
"id": int(row["id"]),
"job_run_id": row["job_run_id"],
"stage_type": row["stage_type"],
"item_type": row["item_type"],
"status": row["status"],
"song_id": row["song_id"],
"playlist_id": row["playlist_id"],
"worker_name": row["worker_name"],
"display_name": (
str(row["current_display_text"] or "").strip()
or str(row["song_name"] or "").strip()
or str(row["playlist_name"] or "").strip()
or _display_from_payload(payload)
or str(row["item_key"])
),
"started_at": row["started_at"],
}
)
return items
def _download_stats(repo: OpsRepository) -> dict[str, int]:
stats = repo.get_download_stats()
playlist_row = repo._fetchone("SELECT COUNT(*) AS count_value FROM playlists")
pool_row = repo._fetchone("SELECT COUNT(*) AS count_value FROM playlist_pools")
running_song_row = repo._fetchone(
"""
SELECT COUNT(*) AS count_value
FROM job_items
WHERE status = 'running' AND song_id IS NOT NULL
"""
)
stats["total_playlists"] = int(playlist_row["count_value"]) if playlist_row else 0
stats["total_playlist_pools"] = int(pool_row["count_value"]) if pool_row else 0
stats["running_song_items"] = int(running_song_row["count_value"]) if running_song_row else 0
return stats
def _dashboard_transfer_stats(workers: list[dict[str, Any]]) -> dict[str, Any]:
download_speed_bytes_per_sec = sum(
int(worker.get("speed_bytes_per_sec") or 0)
for worker in workers
if str(worker.get("stage_type") or "").lower() == "download"
)
return {
"download_speed_bytes_per_sec": int(download_speed_bytes_per_sec),
"download_speed_text": _format_speed_text(download_speed_bytes_per_sec),
"upload_speed_bytes_per_sec": 0,
"upload_speed_text": "-",
}
def _dashboard_payload(repo: OpsRepository, *, include_task_rows: bool = False) -> dict[str, Any]:
active_job = repo.get_active_job()
task_rows = repo.list_task_center_rows(limit=50) if include_task_rows else None
workers = _dashboard_workers(repo)
payload = {
"summary": _dashboard_summary(repo, task_rows=task_rows),
"download_stats": _download_stats(repo),
"transfer_stats": _dashboard_transfer_stats(workers),
"workers": workers,
"running_items": _dashboard_running_items(repo),
"playlist_sources": _playlist_source_stats(repo),
"active_job": _serialize_job(active_job) if active_job is not None else None,
"jobs": _list_jobs(repo, limit=10),
}
if task_rows is not None:
payload["task_rows"] = task_rows
recent_events = repo._fetchall(
"""
SELECT id, job_run_id, event_type, message, created_at
FROM job_events
ORDER BY id DESC
LIMIT 5
"""
)
payload["recent_events"] = [dict(row) for row in recent_events]
return payload
def _split_dashboard_task_rows(task_rows: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
done_statuses = {"completed", "completed_with_errors", "failed", "canceled"}
doing_rows: list[dict[str, Any]] = []
done_rows: list[dict[str, Any]] = []
for row in task_rows or []:
if str(row.get("status") or "").lower() in done_statuses:
done_rows.append(row)
else:
doing_rows.append(row)
done_rows.sort(
key=lambda row: (
str(row.get("ended_at") or row.get("started_at") or row.get("created_at") or ""),
int(row.get("id") or 0),
),
reverse=True,
)
done_rows = done_rows[:10]
return doing_rows, done_rows
def _job_detail_payload(repo: OpsRepository, job_id: int) -> dict[str, Any]:
job = repo.get_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
return {
"job": _serialize_job(job),
"stages": _job_stages(repo, job_id),
"commands": _job_commands(repo, job_id),
"workers": _job_workers(repo, job_id, active_only=False),
"running_items": _job_running_items(repo, job_id),
"download_stats": _download_stats(repo),
"playlist_progress": repo.list_job_playlist_progress(job_id),
}
def _ensure_job_stages(repo: OpsRepository, job_id: int, job_type: str) -> None:
if repo.list_job_stages(job_id):
return
for seq_no, stage_type in enumerate(JOB_STAGE_SEQUENCES.get(job_type, []), start=1):
repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=seq_no)
class JobCreateRequest(BaseModel):
job_type: JobType = Field(default="catalog_sync")
requested_by: str | None = None
sources: list[Literal["netease", "qq", "kuwo"]] = Field(default_factory=list)
download_sources: list[Literal["qq", "kuwo", "migu", "qianqian", "kugou", "netease"]] = Field(
default_factory=list
)
playlist_scope: dict[str, Any] = Field(default_factory=dict)
config_snapshot: dict[str, Any] | None = None
class JobCommandRequest(BaseModel):
command_type: CommandType
target_item_id: int | None = Field(default=None, ge=1)
payload: dict[str, Any] = Field(default_factory=dict)
class EnvUpdateRequest(BaseModel):
content: str = ""
note: str | None = None
class PlaylistBulkRequest(BaseModel):
playlist_ids: list[int] = Field(default_factory=list)
requested_by: str | None = None
marked_by: str | None = None
def _create_scoped_playlist_job(
repo: OpsRepository,
env_manager: CatalogsyncEnvManager,
*,
job_type: Literal["download_only", "sync_download", "sync_only"],
playlist_ids: list[int],
requested_by: str | None = None,
):
normalized_ids = _normalize_playlist_ids(playlist_ids)
if not normalized_ids:
return None
snapshot = env_manager.build_job_snapshot()
sources = _normalize_allowed_list(
snapshot.get("sources") or snapshot.get("SOURCES"),
ALLOWED_COLLECT_SOURCES,
)
download_sources = _normalize_allowed_list(
snapshot.get("download_sources") or snapshot.get("DOWNLOAD_SOURCES"),
ALLOWED_DOWNLOAD_SOURCES,
)
job_id = repo.create_job(
job_type=job_type,
requested_by=requested_by,
config_snapshot=dict(snapshot),
sources=sources,
download_sources=download_sources,
playlist_scope={"playlist_ids": normalized_ids},
)
_ensure_job_stages(repo, job_id, job_type)
job = repo.get_job(job_id)
if job is None:
raise HTTPException(status_code=500, detail="job create failed")
return _serialize_job(job)
def _build_playlist_job(
repo: OpsRepository,
env_manager: CatalogsyncEnvManager,
*,
job_type: Literal["download_only", "sync_download", "sync_only"],
payload: PlaylistBulkRequest,
) -> dict[str, Any]:
playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
if not playlist_ids:
raise HTTPException(status_code=422, detail="playlist_ids is required")
job = _create_scoped_playlist_job(
repo,
env_manager,
job_type=job_type,
playlist_ids=playlist_ids,
requested_by=payload.requested_by,
)
if job is None:
raise HTTPException(status_code=500, detail="job create failed")
return {"job": job}
def _build_adaptive_download_jobs(
repo: OpsRepository,
catalog_repo: CatalogRepository,
env_manager: CatalogsyncEnvManager,
*,
payload: PlaylistBulkRequest,
) -> dict[str, Any]:
playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
if not playlist_ids:
raise HTTPException(status_code=422, detail="playlist_ids is required")
rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids)
download_playlist_ids: list[int] = []
sync_download_playlist_ids: list[int] = []
skipped_downloaded_playlist_ids: list[int] = []
for playlist_id in playlist_ids:
row = rows_by_id.get(int(playlist_id))
if row is None:
continue
state_code = str(row.get("state_code") or "")
if state_code == "unsynced":
sync_download_playlist_ids.append(int(playlist_id))
continue
if state_code == "downloaded":
skipped_downloaded_playlist_ids.append(int(playlist_id))
continue
download_playlist_ids.append(int(playlist_id))
download_job = _create_scoped_playlist_job(
repo,
env_manager,
job_type="download_only",
playlist_ids=download_playlist_ids,
requested_by=payload.requested_by,
)
sync_download_job = _create_scoped_playlist_job(
repo,
env_manager,
job_type="sync_download",
playlist_ids=sync_download_playlist_ids,
requested_by=payload.requested_by,
)
jobs = [job for job in (download_job, sync_download_job) if job is not None]
response: dict[str, Any] = {
"playlist_ids": playlist_ids,
"download_playlist_ids": download_playlist_ids,
"sync_download_playlist_ids": sync_download_playlist_ids,
"skipped_downloaded_playlist_ids": skipped_downloaded_playlist_ids,
"download_job": download_job,
"sync_download_job": sync_download_job,
"job": jobs[0] if len(jobs) == 1 else None,
}
if not jobs:
response["message"] = "no runnable playlist download jobs were created"
return response
def _playlist_export_rows_by_id_or_404(
catalog_repo: CatalogRepository,
playlist_ids: list[int],
) -> dict[int, dict[str, Any]]:
playlist_rows = catalog_repo.list_playlist_export_state_rows(playlist_ids)
if not playlist_rows:
raise HTTPException(status_code=404, detail="playlist not found")
rows_by_id = {int(row["id"]): dict(row) for row in playlist_rows}
missing_playlist_ids = [
int(playlist_id) for playlist_id in playlist_ids if int(playlist_id) not in rows_by_id
]
if missing_playlist_ids:
raise HTTPException(
status_code=404,
detail={
"message": "playlist not found",
"missing_playlist_ids": missing_playlist_ids,
},
)
return rows_by_id
def create_app(
db_path: str | Path,
env_path: str | Path,
*,
start_runner: bool = False,
runner_sleep_seconds: float = 1.0,
) -> FastAPI:
db_file = Path(db_path)
env_file = Path(env_path)
initialize_database(db_file).close()
initialize_resolver_stats_database(default_resolver_stats_db_path(db_file)).close()
repo = OpsRepository(db_file)
catalog_repo = CatalogRepository(db_file)
maintenance_service = LocalMaintenanceService(db_file)
env_manager = CatalogsyncEnvManager(
db_path=db_file,
env_file_path=env_file,
repository=repo,
)
templates_dir = Path(__file__).resolve().parents[1] / "templates"
static_dir = Path(__file__).resolve().parents[1] / "static"
templates = Jinja2Templates(directory=str(templates_dir))
runner_stop_event: threading.Event | None = None
runner_thread: threading.Thread | None = None
if start_runner:
runner = OpsRunner(repository=repo, sleep_seconds=runner_sleep_seconds)
runner_restart_delay = max(min(float(runner_sleep_seconds), 1.0), 0.1)
def _run_runner_with_supervision(stop_event: threading.Event) -> None:
while not stop_event.is_set():
try:
LOGGER.info("Starting embedded ops runner")
runner.run_forever(stop_event)
LOGGER.info("Embedded ops runner stopped")
return
except Exception:
LOGGER.exception(
"Embedded ops runner crashed; restarting in %.2f seconds",
runner_restart_delay,
)
if stop_event.wait(runner_restart_delay):
return
@asynccontextmanager
async def lifespan(_: FastAPI):
nonlocal runner_stop_event, runner_thread
if start_runner:
if runner_thread is None or not runner_thread.is_alive():
runner_stop_event = threading.Event()
runner_thread = threading.Thread(
target=_run_runner_with_supervision,
args=(runner_stop_event,),
daemon=True,
name="catalogsync-ops-runner",
)
runner_thread.start()
try:
yield
finally:
if runner_stop_event is not None:
runner_stop_event.set()
if runner_thread is not None:
runner_thread.join(timeout=5.0)
app = FastAPI(title="Catalogsync Operations Console", lifespan=lifespan)
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
@app.get("/", include_in_schema=False)
def index_redirect():
return RedirectResponse(url="/dashboard", status_code=307)
@app.get("/dashboard", response_class=HTMLResponse)
def dashboard(request: Request):
dashboard_payload = _dashboard_payload(repo, include_task_rows=True)
task_rows = dashboard_payload.get("task_rows", [])
doing_task_rows, done_task_rows = _split_dashboard_task_rows(task_rows)
env_values = env_manager.load_current()
return templates.TemplateResponse(
name="ops/dashboard.html",
request=request,
context={
"title": "Dashboard",
"sse_url": "/api/events/stream",
"dashboard_api_url": "/api/dashboard",
"summary": dashboard_payload["summary"],
"download_stats": dashboard_payload["download_stats"],
"transfer_stats": dashboard_payload["transfer_stats"],
"task_rows": task_rows,
"doing_task_rows": doing_task_rows,
"done_task_rows": done_task_rows,
"workers": dashboard_payload["workers"],
"running_items": dashboard_payload["running_items"],
"playlist_sources": dashboard_payload["playlist_sources"],
"active_job": dashboard_payload["active_job"],
"jobs": dashboard_payload["jobs"],
"maintenance_local_duplicates_scan_api": "/api/maintenance/local-duplicates",
"maintenance_local_duplicates_dedupe_api": "/api/maintenance/local-duplicates/dedupe",
"job_type_options": [
("catalog_sync", "Collect + Sync + Download"),
("collect_only", "Collect Only"),
("sync_only", "Sync Only"),
("sync_download", "Sync + Download"),
("download_only", "Download Only"),
("upload_only", "Upload Only"),
("download_upload", "Download + Upload"),
],
"default_sources": ",".join(
_normalize_allowed_list(
env_values.get("SOURCES") or "netease,qq,kuwo",
ALLOWED_COLLECT_SOURCES,
)
or ["netease", "qq", "kuwo"]
),
"default_download_sources": ",".join(
_normalize_allowed_list(
env_values.get("DOWNLOAD_SOURCES") or env_values.get("download_sources"),
ALLOWED_DOWNLOAD_SOURCES,
)
or ["qq", "kuwo", "migu", "qianqian", "kugou", "netease"]
),
},
)
@app.get("/jobs", response_class=HTMLResponse)
def jobs_page(request: Request):
return templates.TemplateResponse(
name="ops/jobs.html",
request=request,
context={"title": "Jobs", "jobs": _list_jobs(repo, limit=200)},
)
@app.get("/jobs/{job_id}", response_class=HTMLResponse)
def job_detail_page(request: Request, job_id: int):
payload = _job_detail_payload(repo, job_id)
return templates.TemplateResponse(
name="ops/job_detail.html",
request=request,
context={
"title": f"Job {job_id}",
"job": payload["job"],
"stages": payload["stages"],
"commands": payload["commands"],
"workers": payload["workers"],
"running_items": payload["running_items"],
"download_stats": payload["download_stats"],
"playlist_progress": payload["playlist_progress"],
"command_endpoint": f"/api/jobs/{job_id}/commands",
},
)
@app.get("/playlists", response_class=HTMLResponse)
def playlists_page(
request: Request,
page: int = Query(default=1, ge=1),
page_size: int = Query(default=50),
platform: str | None = Query(default=None),
pool_kind: str | None = Query(default=None),
status: str | None = Query(default=None),
keyword: str | None = Query(default=None),
wanted_only: str | None = Query(default=None),
sort_by: str | None = Query(default=None),
sort_dir: str | None = Query(default=None),
):
normalized_page_size = _validate_playlist_page_size(int(page_size))
normalized_platform = _normalize_optional_filter(platform)
normalized_pool_kind = _normalize_optional_filter(pool_kind)
normalized_status = _normalize_optional_filter(status)
normalized_keyword = _normalize_optional_filter(keyword)
normalized_wanted_only = _parse_optional_bool_query(wanted_only, field_name="wanted_only")
normalized_sort_by, normalized_sort_dir = _normalize_playlist_sort(sort_by, sort_dir)
playlist_page = _list_playlist_page_or_422(
catalog_repo,
page=page,
page_size=normalized_page_size,
platform=normalized_platform,
pool_kind=normalized_pool_kind,
status=normalized_status,
keyword=normalized_keyword,
wanted_only=normalized_wanted_only,
sort_by=normalized_sort_by,
sort_dir=normalized_sort_dir,
)
filters = {
"page": int(page),
"page_size": normalized_page_size,
"platform": normalized_platform or "",
"pool_kind": normalized_pool_kind or "",
"status": normalized_status or "",
"keyword": normalized_keyword or "",
"wanted_only": normalized_wanted_only,
"sort_by": normalized_sort_by or "",
"sort_dir": normalized_sort_dir or "",
}
return templates.TemplateResponse(
name="ops/playlists.html",
request=request,
context={
"title": "Playlists",
"playlist_page": playlist_page,
"playlists": playlist_page["items"],
"filters": filters,
"sort_links": _build_playlist_sort_links(filters),
"previous_page_url": (
"?" + _playlist_query_string(filters, overrides={"page": playlist_page["page"] - 1})
if playlist_page["page"] > 1
else None
),
"next_page_url": (
"?" + _playlist_query_string(filters, overrides={"page": playlist_page["page"] + 1})
if playlist_page["page"] < playlist_page["total_pages"]
else None
),
"filter_options": _playlist_filter_options(catalog_repo),
"playlist_sources": _playlist_source_stats(repo),
"default_sources": ",".join(
_normalize_allowed_list(
env_manager.load_current().get("SOURCES") or "netease,qq,kuwo",
ALLOWED_COLLECT_SOURCES,
)
or ["netease", "qq", "kuwo"]
),
},
)
@app.get("/songs", response_class=HTMLResponse)
def songs_page(request: Request):
rows = repo._fetchall(
"""
SELECT id, platform, remote_song_id, name, singers, updated_at
FROM songs
ORDER BY id DESC
LIMIT 200
"""
)
return templates.TemplateResponse(
name="ops/songs.html",
request=request,
context={
"title": "Songs",
"songs": [dict(row) for row in rows],
"download_stats": _download_stats(repo),
"workers": _dashboard_workers(repo),
"running_items": _dashboard_running_items(repo),
},
)
@app.get("/logs", response_class=HTMLResponse)
def logs_page(request: Request):
events = repo._fetchall(
"""
SELECT id, job_run_id, event_type, message, created_at
FROM job_events
ORDER BY id DESC
LIMIT 200
"""
)
return templates.TemplateResponse(
name="ops/logs.html",
request=request,
context={"title": "Logs", "events": [dict(row) for row in events]},
)
@app.get("/config", response_class=HTMLResponse)
def config_page(request: Request):
return templates.TemplateResponse(
name="ops/config.html",
request=request,
context={
"title": "Config",
"env_content": _read_env_text(env_file),
"env_values": env_manager.load_current(),
"revisions": env_manager.list_revisions(limit=50),
},
)
@app.get("/api/dashboard")
def api_dashboard(include_task_rows: bool = Query(default=True)):
return _dashboard_payload(repo, include_task_rows=include_task_rows)
@app.get("/api/maintenance/local-duplicates")
def api_local_duplicates(sample_limit: int = Query(default=20, ge=1, le=200)):
return maintenance_service.scan_local_duplicates(sample_limit=sample_limit)
@app.post("/api/maintenance/local-duplicates/dedupe")
def api_local_duplicates_dedupe(sample_limit: int = Query(default=20, ge=1, le=200)):
try:
return maintenance_service.dedupe_local_duplicates(sample_limit=sample_limit)
except LocalDedupeBlockedError as exc:
raise HTTPException(status_code=409, detail=str(exc)) from exc
@app.get("/api/jobs")
def api_jobs(limit: int = Query(default=100, ge=1, le=500)):
return {"items": _list_jobs(repo, limit=limit)}
@app.get("/api/jobs/{job_id}")
def api_job_detail(job_id: int):
return _job_detail_payload(repo, job_id)
@app.get("/api/jobs/{job_id}/playlists/{playlist_id}/songs")
def api_job_playlist_songs(
job_id: int,
playlist_id: int,
limit: int = Query(default=500, ge=1, le=2000),
):
job = repo.get_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
return {
"items": repo.list_job_playlist_song_progress(
job_id=job_id,
playlist_id=playlist_id,
limit=limit,
)
}
@app.get("/api/playlists/{playlist_id}/songs")
def api_playlist_songs(
playlist_id: int,
limit: int = Query(default=2000, ge=1, le=5000),
):
playlist_rows = catalog_repo.list_playlists_by_ids([playlist_id])
if not playlist_rows:
raise HTTPException(status_code=404, detail="playlist not found")
playlist_row = dict(playlist_rows[0])
return {
"playlist": {
"id": int(playlist_row["id"]),
"platform": str(playlist_row["platform"]),
"remote_playlist_id": str(playlist_row["remote_playlist_id"]),
"name": str(playlist_row["name"]),
"play_count": (
int(playlist_row["play_count"])
if playlist_row["play_count"] is not None
else None
),
},
"items": catalog_repo.list_playlist_song_details(playlist_id, limit=limit),
}
@app.get("/api/playlists/{playlist_id}/export-folder")
def api_playlist_export_folder(playlist_id: int):
playlist_rows = catalog_repo.list_playlists_by_ids([playlist_id])
if not playlist_rows:
raise HTTPException(status_code=404, detail="playlist not found")
playlist_row = dict(playlist_rows[0])
playlist_payload = {
"id": int(playlist_row["id"]),
"platform": str(playlist_row["platform"]),
"remote_playlist_id": str(playlist_row["remote_playlist_id"]),
"name": str(playlist_row["name"]),
"play_count": (
int(playlist_row["play_count"])
if playlist_row["play_count"] is not None
else None
),
}
env_values = env_manager.load_current()
playlists_root = _resolve_playlists_root(env_values, catalog_repo)
if playlists_root is None:
raise HTTPException(status_code=500, detail="playlists root is not configured")
folder_path = CatalogSyncService(
repository=catalog_repo,
playlists_root=playlists_root,
).ensure_playlist_artifacts_for_playlist(playlist_id)
if folder_path is None or not folder_path.exists():
folder_path = locate_playlist_dir(playlists_root, playlist_payload)
if folder_path is None or not folder_path.exists():
raise HTTPException(status_code=404, detail="playlist export folder not found")
return {
"playlist": playlist_payload,
"folder": {
"path": str(folder_path.resolve()),
"exists": True,
"files": _collect_folder_listing(folder_path),
},
}
@app.get("/api/playlists/{playlist_id}/export.zip")
def api_playlist_export_zip(playlist_id: int):
playlist_rows = catalog_repo.list_playlist_export_state_rows([playlist_id])
if not playlist_rows:
raise HTTPException(status_code=404, detail="playlist not found")
playlist_row = dict(playlist_rows[0])
state_code = str(playlist_row.get("state_code") or "")
if state_code != "downloaded":
return JSONResponse(
status_code=409,
content={
"playlist_id": int(playlist_row["id"]),
"state_code": state_code,
"message": "playlist is not ready for zip export",
},
)
env_values = env_manager.load_current()
playlists_root = _resolve_playlists_root(env_values, catalog_repo)
if playlists_root is None:
raise HTTPException(status_code=500, detail="playlists root is not configured")
export_service = CatalogSyncService(repository=catalog_repo, playlists_root=playlists_root)
playlist_payload = {
"id": int(playlist_row["id"]),
"platform": str(playlist_row.get("platform") or ""),
"remote_playlist_id": str(playlist_row.get("remote_playlist_id") or ""),
"name": str(playlist_row.get("name") or ""),
}
folder_path = export_service.ensure_playlist_artifacts_for_playlist(int(playlist_row["id"]))
if folder_path is None or not folder_path.exists():
folder_path = locate_playlist_dir(playlists_root, playlist_payload)
if folder_path is None or not folder_path.exists():
raise HTTPException(status_code=404, detail="playlist export folder not found")
bundle_path = create_single_playlist_bundle(
playlist_dir=folder_path,
bundle_root=default_bundle_root(),
platform=str(playlist_row.get("platform") or ""),
playlist_id=int(playlist_row["id"]),
playlist_name=str(playlist_row.get("name") or ""),
)
return FileResponse(
path=str(bundle_path),
media_type="application/zip",
filename=bundle_download_filename(bundle_path),
)
@app.get("/api/exports/bundles/{bundle_name}.zip")
def api_export_bundle_download(bundle_name: str):
bundle_path = resolve_bundle_download_path(default_bundle_root(), bundle_name)
if bundle_path is None or not bundle_path.exists() or not bundle_path.is_file():
raise HTTPException(status_code=404, detail="bundle not found")
return FileResponse(
path=str(bundle_path),
media_type="application/zip",
filename=bundle_download_filename(bundle_path),
)
@app.get("/api/playlists")
def api_playlists(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=50),
platform: str | None = Query(default=None),
pool_kind: str | None = Query(default=None),
status: str | None = Query(default=None),
keyword: str | None = Query(default=None),
wanted_only: str | None = Query(default=None),
sort_by: str | None = Query(default=None),
sort_dir: str | None = Query(default=None),
):
normalized_page_size = _validate_playlist_page_size(int(page_size))
normalized_platform = _normalize_optional_filter(platform)
normalized_pool_kind = _normalize_optional_filter(pool_kind)
normalized_status = _normalize_optional_filter(status)
normalized_keyword = _normalize_optional_filter(keyword)
normalized_wanted_only = _parse_optional_bool_query(wanted_only, field_name="wanted_only")
normalized_sort_by, normalized_sort_dir = _normalize_playlist_sort(sort_by, sort_dir)
return _list_playlist_page_or_422(
catalog_repo,
page=page,
page_size=normalized_page_size,
platform=normalized_platform,
pool_kind=normalized_pool_kind,
status=normalized_status,
keyword=normalized_keyword,
wanted_only=normalized_wanted_only,
sort_by=normalized_sort_by,
sort_dir=normalized_sort_dir,
)
@app.post("/api/playlists/mark-wanted")
def api_mark_playlists_wanted(payload: PlaylistBulkRequest):
playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
if not playlist_ids:
raise HTTPException(status_code=422, detail="playlist_ids is required")
catalog_repo.mark_playlists_wanted(playlist_ids, marked_by=payload.marked_by or payload.requested_by)
return {"playlist_ids": playlist_ids, "marked_count": len(playlist_ids)}
@app.post("/api/playlists/unmark-wanted")
def api_unmark_playlists_wanted(payload: PlaylistBulkRequest):
playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
if not playlist_ids:
raise HTTPException(status_code=422, detail="playlist_ids is required")
catalog_repo.unmark_playlists_wanted(playlist_ids)
return {"playlist_ids": playlist_ids, "unmarked_count": len(playlist_ids)}
@app.post("/api/playlists/download", status_code=201)
def api_download_selected_playlists(payload: PlaylistBulkRequest):
return _build_adaptive_download_jobs(
repo,
catalog_repo,
env_manager,
payload=payload,
)
@app.post("/api/playlists/sync", status_code=201)
def api_sync_selected_playlists(payload: PlaylistBulkRequest):
return _build_playlist_job(repo, env_manager, job_type="sync_only", payload=payload)
@app.post("/api/playlists/sync-download", status_code=201)
def api_sync_download_selected_playlists(payload: PlaylistBulkRequest):
return _build_playlist_job(repo, env_manager, job_type="sync_download", payload=payload)
@app.post("/api/playlists/export")
def api_export_selected_playlists(payload: PlaylistBulkRequest):
playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
if not playlist_ids:
raise HTTPException(status_code=422, detail="playlist_ids is required")
env_values = env_manager.load_current()
playlists_root = _resolve_playlists_root(env_values, catalog_repo)
if playlists_root is None:
raise HTTPException(status_code=500, detail="playlists root is not configured")
export_service = CatalogSyncService(
repository=catalog_repo,
playlists_root=playlists_root,
)
rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids)
exported_playlist_ids: list[int] = []
download_playlist_ids: list[int] = []
sync_download_playlist_ids: list[int] = []
for playlist_id in playlist_ids:
row = rows_by_id.get(int(playlist_id))
if row is None:
continue
state_code = str(row.get("state_code") or "")
if state_code == "downloaded":
folder_path = export_service.ensure_playlist_artifacts_for_playlist(int(playlist_id))
if folder_path is not None:
exported_playlist_ids.append(int(playlist_id))
continue
if state_code == "unsynced":
sync_download_playlist_ids.append(int(playlist_id))
continue
download_playlist_ids.append(int(playlist_id))
download_job = _create_scoped_playlist_job(
repo,
env_manager,
job_type="download_only",
playlist_ids=download_playlist_ids,
requested_by=payload.requested_by,
)
sync_download_job = _create_scoped_playlist_job(
repo,
env_manager,
job_type="sync_download",
playlist_ids=sync_download_playlist_ids,
requested_by=payload.requested_by,
)
return {
"playlist_ids": playlist_ids,
"exported_playlist_ids": exported_playlist_ids,
"exported_count": len(exported_playlist_ids),
"download_job": download_job,
"sync_download_job": sync_download_job,
}
@app.post("/api/playlists/export-zip")
def api_export_selected_playlists_zip(payload: PlaylistBulkRequest):
playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
if not playlist_ids:
raise HTTPException(status_code=422, detail="playlist_ids is required")
rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids)
ready_playlist_ids: list[int] = []
blocked_playlist_ids: list[int] = []
download_playlist_ids: list[int] = []
sync_download_playlist_ids: list[int] = []
for playlist_id in playlist_ids:
row = rows_by_id.get(int(playlist_id))
if row is None:
continue
state_code = str(row.get("state_code") or "")
if state_code == "downloaded":
ready_playlist_ids.append(int(playlist_id))
continue
blocked_playlist_ids.append(int(playlist_id))
if state_code == "unsynced":
sync_download_playlist_ids.append(int(playlist_id))
continue
download_playlist_ids.append(int(playlist_id))
if blocked_playlist_ids:
return {
"status": "queued",
"message": "selected playlists include non-ready items; export zip queued",
"playlist_ids": playlist_ids,
"ready_playlist_ids": ready_playlist_ids,
"blocked_playlist_ids": blocked_playlist_ids,
"download_job": _create_scoped_playlist_job(
repo,
env_manager,
job_type="download_only",
playlist_ids=download_playlist_ids,
requested_by=payload.requested_by,
),
"sync_download_job": _create_scoped_playlist_job(
repo,
env_manager,
job_type="sync_download",
playlist_ids=sync_download_playlist_ids,
requested_by=payload.requested_by,
),
}
env_values = env_manager.load_current()
playlists_root = _resolve_playlists_root(env_values, catalog_repo)
if playlists_root is None:
raise HTTPException(status_code=500, detail="playlists root is not configured")
export_service = CatalogSyncService(repository=catalog_repo, playlists_root=playlists_root)
playlist_dirs: list[Path] = []
for playlist_id in ready_playlist_ids:
row = rows_by_id.get(int(playlist_id))
if row is None:
continue
playlist_payload = {
"id": int(row["id"]),
"platform": str(row.get("platform") or ""),
"remote_playlist_id": str(row.get("remote_playlist_id") or ""),
"name": str(row.get("name") or ""),
}
folder_path = export_service.ensure_playlist_artifacts_for_playlist(playlist_id)
if folder_path is None or not folder_path.exists():
folder_path = locate_playlist_dir(playlists_root, playlist_payload)
if folder_path is None or not folder_path.exists():
raise HTTPException(status_code=404, detail="playlist export folder not found")
playlist_dirs.append(folder_path)
bundle_path = create_multi_playlist_bundle(
playlist_dirs=playlist_dirs,
bundle_root=default_bundle_root(),
)
return {
"status": "ready",
"playlist_ids": ready_playlist_ids,
"download_url": f"/api/exports/bundles/{bundle_path.stem}.zip",
}
@app.post("/api/jobs", status_code=201)
def api_create_job(payload: JobCreateRequest):
snapshot = payload.config_snapshot or env_manager.build_job_snapshot()
sources = _normalize_allowed_list(list(payload.sources), ALLOWED_COLLECT_SOURCES) or _normalize_allowed_list(
snapshot.get("sources") or snapshot.get("SOURCES"),
ALLOWED_COLLECT_SOURCES,
)
download_sources = _normalize_allowed_list(
list(payload.download_sources),
ALLOWED_DOWNLOAD_SOURCES,
) or _normalize_allowed_list(
snapshot.get("download_sources") or snapshot.get("DOWNLOAD_SOURCES"),
ALLOWED_DOWNLOAD_SOURCES,
)
job_id = repo.create_job(
job_type=payload.job_type,
requested_by=payload.requested_by,
config_snapshot=dict(snapshot),
sources=sources,
download_sources=download_sources,
playlist_scope=payload.playlist_scope or {},
)
_ensure_job_stages(repo, job_id, payload.job_type)
job = repo.get_job(job_id)
if job is None:
raise HTTPException(status_code=500, detail="job create failed")
return {"job": _serialize_job(job)}
@app.post("/api/jobs/{job_id}/commands", status_code=201)
def api_create_job_command(job_id: int, payload: JobCommandRequest):
job = repo.get_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
command_type = payload.command_type
if command_type in RETRY_COMMAND_TYPES and payload.target_item_id is None:
raise HTTPException(
status_code=422,
detail="target_item_id is required for retry commands",
)
command_id = repo.create_command(
job_run_id=job_id,
command_type=command_type,
target_item_id=payload.target_item_id,
payload=payload.payload,
)
return {"job_id": int(job_id), "command_id": int(command_id), "command_type": command_type}
@app.get("/api/config/env")
def api_get_env():
return {
"path": str(env_file.resolve()),
"content": _read_env_text(env_file),
"parsed": env_manager.load_current(),
}
@app.put("/api/config/env")
def api_put_env(payload: EnvUpdateRequest):
env_file.parent.mkdir(parents=True, exist_ok=True)
env_file.write_text(payload.content, encoding="utf-8")
revision_id = env_manager.save_revision(note=payload.note)
revision = repo.get_config_revision(revision_id)
return {
"saved": True,
"revision": revision,
"content": _read_env_text(env_file),
"parsed": env_manager.load_current(),
}
@app.get("/api/config/revisions")
def api_get_revisions(limit: int = Query(default=50, ge=1, le=500)):
return {"items": env_manager.list_revisions(limit=limit)}
@app.post("/api/config/revisions/{revision_id}/apply")
def api_apply_revision(revision_id: int):
try:
env_manager.apply_revision(revision_id)
except ValueError:
raise HTTPException(status_code=404, detail="revision not found") from None
revision = repo.get_config_revision(revision_id)
return {
"applied": True,
"revision": revision,
"content": _read_env_text(env_file),
"parsed": env_manager.load_current(),
}
@app.get("/api/events/stream")
def api_events_stream(once: bool = Query(default=False)):
def event_stream():
while True:
snapshot = _dashboard_payload(repo)
payload = json.dumps(snapshot, ensure_ascii=False)
yield f"event: snapshot\ndata: {payload}\n\n"
if once:
break
yield ": heartbeat\n\n"
time.sleep(LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS)
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
return app