1608 lines
61 KiB
Python
1608 lines
61 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
from typing import Any, Literal
|
|
from urllib.parse import urlencode
|
|
|
|
from fastapi import FastAPI, HTTPException, Query, Request
|
|
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel, Field
|
|
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.export_bundles import (
|
|
bundle_download_filename,
|
|
create_multi_playlist_bundle,
|
|
create_single_playlist_bundle,
|
|
default_bundle_root,
|
|
resolve_bundle_download_path,
|
|
)
|
|
from musicdl.catalogsync.playlist_artifacts import locate_playlist_dir
|
|
from musicdl.catalogsync.repository import CatalogRepository
|
|
from musicdl.catalogsync.resolver_stats import (
|
|
default_resolver_stats_db_path,
|
|
initialize_resolver_stats_database,
|
|
)
|
|
from musicdl.catalogsync.services import CatalogSyncService
|
|
|
|
from .config import CatalogsyncEnvManager
|
|
from .jobdefs import job_lane_type
|
|
from .maintenance import LocalDedupeBlockedError, LocalMaintenanceService
|
|
from .repository import OpsRepository
|
|
from .runner import JOB_STAGE_SEQUENCES, OpsRunner
|
|
|
|
# Job types accepted by ``JobCreateRequest.job_type``.
JobType = Literal[
    "catalog_sync",
    "collect_only",
    "sync_only",
    "sync_download",
    "download_only",
    "upload_only",
    "download_upload",
]
# Command names accepted by ``JobCommandRequest.command_type``.
CommandType = Literal["pause", "resume", "cancel", "retry_item", "force_retry_item"]

# Whitelists applied when normalising source lists read from config/env.
ALLOWED_COLLECT_SOURCES = {"netease", "qq", "kuwo"}
ALLOWED_DOWNLOAD_SOURCES = {"qq", "kuwo", "migu", "qianqian", "kugou", "netease"}
RETRY_COMMAND_TYPES = {"retry_item", "force_retry_item"}
LOGGER = logging.getLogger(__name__)
# Every job type whose stage sequence contains a "download" stage.
DOWNLOAD_JOB_TYPES = tuple(
    name for name, sequence in JOB_STAGE_SEQUENCES.items() if "download" in sequence
)
LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS = 0.5
# Playlist listing: sortable columns, legal directions, and the direction
# used for a column when the caller does not specify one.
PLAYLIST_SORT_FIELDS = {"id", "platform", "name", "play_count"}
PLAYLIST_SORT_DIRECTIONS = {"asc", "desc"}
PLAYLIST_SORT_DEFAULTS = {
    "id": "desc",
    "platform": "asc",
    "name": "asc",
    "play_count": "desc",
}
|
|
|
|
|
|
def _format_speed_text(speed_bytes_per_sec: int | None) -> str:
    """Render a transfer rate as a short human-readable string.

    ``None``, zero and negative rates all render as ``"0 B/s"``. Rates of at
    least 1024*1024 bytes/s are shown as ``MB/s`` and rates of at least 1024
    bytes/s as ``KB/s`` (one decimal place each); smaller rates are shown as
    whole ``B/s``.
    """
    kib = 1024
    mib = kib * kib
    rate = int(speed_bytes_per_sec or 0)
    if rate <= 0:
        return "0 B/s"
    if rate < kib:
        return f"{rate} B/s"
    if rate < mib:
        return f"{rate / kib:.1f} KB/s"
    return f"{rate / mib:.1f} MB/s"
|
|
|
|
|
|
def _normalize_allowed_list(value: Any, allowed: set[str]) -> list[str]:
    """Turn a comma-separated string or a list into a de-duplicated whitelist.

    Args:
        value: A comma-separated string or a ``list`` of items. Any other
            type, and any falsy value, yields an empty list.
        allowed: Names that may appear in the result.

    Returns:
        Stripped entries that are members of ``allowed``, in first-seen
        order and without duplicates.
    """
    if not value:
        return []
    if isinstance(value, str):
        candidates = [chunk.strip() for chunk in value.split(",") if chunk.strip()]
    elif isinstance(value, list):
        candidates = [str(entry).strip() for entry in value if str(entry).strip()]
    else:
        return []
    # dict keys keep insertion order, so fromkeys() drops repeats while
    # preserving the position of the first occurrence.
    return list(dict.fromkeys(name for name in candidates if name in allowed))
|
|
|
|
|
|
def _read_env_text(env_path: Path) -> str:
    """Return the UTF-8 text of ``env_path``, or ``""`` when it is absent.

    The file is read directly instead of being probed with ``exists()``
    first, so a file removed between the check and the read can no longer
    surface as an unexpected ``FileNotFoundError``.

    Args:
        env_path: Location of the env file.

    Returns:
        The file content, or an empty string if the path does not exist
        (including the case where a parent component is not a directory).
    """
    try:
        return env_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return ""
|
|
|
|
|
|
def _resolve_playlists_root(env_values: dict[str, str], catalog_repo: CatalogRepository) -> Path | None:
    """Locate (and create) the ``playlists`` directory.

    Resolution order:
      1. ``ROOT_DIR`` from ``env_values`` -> ``<ROOT_DIR>/playlists``
      2. ``LIBRARY_DIR`` from ``env_values`` -> sibling ``playlists`` next to it
      3. ``catalog_repo.get_default_local_library_root()`` -> sibling
         ``playlists`` next to it

    Returns:
        The directory path (created if missing), or ``None`` when neither
        env value is set and the repository reports no default library root.
    """
    configured_root = str(env_values.get("ROOT_DIR") or "").strip()
    configured_library = str(env_values.get("LIBRARY_DIR") or "").strip()

    if configured_root:
        playlists_dir = Path(configured_root).resolve() / "playlists"
    elif configured_library:
        playlists_dir = Path(configured_library).resolve().parent / "playlists"
    else:
        # The repository is only consulted when the env gives no hint.
        fallback_root = catalog_repo.get_default_local_library_root()
        if fallback_root is None:
            return None
        playlists_dir = fallback_root.parent / "playlists"

    playlists_dir.mkdir(parents=True, exist_ok=True)
    return playlists_dir
|
|
|
|
|
|
def _collect_folder_listing(folder_path: Path) -> list[dict[str, Any]]:
    """List every regular file below ``folder_path`` (recursively).

    Returns:
        One dict per file, in sorted path order, with ``relative_path``
        (POSIX-style, relative to ``folder_path``), ``absolute_path`` and
        ``size_bytes``. A file whose ``stat()`` fails with ``OSError`` is
        reported with size 0. A missing folder yields an empty list.
    """
    if not folder_path.exists():
        return []

    def _size_of(entry: Path) -> int:
        try:
            return int(entry.stat().st_size)
        except OSError:
            return 0

    listing: list[dict[str, Any]] = []
    for entry in sorted(folder_path.rglob("*")):
        if entry.is_file():
            size_bytes = _size_of(entry)
            listing.append(
                {
                    "relative_path": entry.relative_to(folder_path).as_posix(),
                    "absolute_path": str(entry.resolve()),
                    "size_bytes": size_bytes,
                }
            )
    return listing
|
|
|
|
|
|
def _serialize_job(job: Any) -> dict[str, Any]:
    """Convert a job record object into a plain dict.

    ``status`` is rendered through its ``.value`` attribute when it has one
    and via ``str()`` otherwise. Missing (falsy) collections are replaced by
    empty copies, so the result never aliases the job's own containers.
    """
    status = job.status
    serialized: dict[str, Any] = {
        "id": int(job.id),
        "job_type": str(job.job_type),
        "status": getattr(status, "value", str(status)),
        "priority": int(job.priority),
        "requested_by": job.requested_by,
        "config_snapshot": dict(job.config_snapshot or {}),
        "sources": list(job.sources or []),
        "download_sources": list(job.download_sources or []),
        "playlist_scope": dict(job.playlist_scope or {}),
    }
    # Remaining fields are passed through untouched.
    for field_name in ("created_at", "started_at", "ended_at", "last_error", "resume_token"):
        serialized[field_name] = getattr(job, field_name)
    return serialized
|
|
|
|
|
|
def _display_from_payload(payload: dict[str, Any]) -> str | None:
    """Extract a display label from a job-item payload.

    The nested dicts under ``row``, ``playlist_row`` and ``upload_row`` are
    checked in that order for a non-blank ``name`` / ``song_name``. If none
    matches, the payload's own ``display_name`` / ``name`` is used.

    Returns:
        The stripped label, or ``None`` if the payload is not a dict or
        carries no usable name.
    """
    if not isinstance(payload, dict):
        return None
    for nested_key in ("row", "playlist_row", "upload_row"):
        nested = payload.get(nested_key)
        if not isinstance(nested, dict):
            continue
        label = str(nested.get("name") or nested.get("song_name") or "").strip()
        if label:
            return label
    top_level = str(payload.get("display_name") or payload.get("name") or "").strip()
    return top_level if top_level else None
|
|
|
|
|
|
def _list_jobs(repo: OpsRepository, limit: int = 100) -> list[dict[str, Any]]:
    """Return the newest ``limit`` rows of ``job_runs`` as serialized jobs.

    Rows are fetched in descending ``id`` order, converted with
    ``repo._row_to_job`` and then passed through ``_serialize_job``.
    """
    query = "SELECT * FROM job_runs ORDER BY id DESC LIMIT ?"
    serialized: list[dict[str, Any]] = []
    for record in repo._fetchall(query, (int(limit),)):
        serialized.append(_serialize_job(repo._row_to_job(record)))
    return serialized
|
|
|
|
|
|
def _job_stages(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]:
    """Serialize every stage returned by ``repo.list_job_stages(job_id)``.

    Counters and ids are coerced to ``int``, ``status`` is emitted as its
    ``.value``, and timestamps / ``last_error`` are passed through as-is.
    """
    int_fields = (
        "total_items",
        "pending_items",
        "running_items",
        "success_items",
        "failed_items",
        "skipped_items",
    )

    def _stage_to_dict(stage: Any) -> dict[str, Any]:
        entry: dict[str, Any] = {
            "id": int(stage.id),
            "job_run_id": int(stage.job_run_id),
            "stage_type": str(stage.stage_type),
            "status": stage.status.value,
            "seq_no": int(stage.seq_no),
        }
        for counter in int_fields:
            entry[counter] = int(getattr(stage, counter))
        entry["started_at"] = stage.started_at
        entry["ended_at"] = stage.ended_at
        entry["last_error"] = stage.last_error
        return entry

    return [_stage_to_dict(stage) for stage in repo.list_job_stages(job_id)]
|
|
|
|
|
|
def _job_commands(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]:
    """Return the latest 100 ``job_commands`` rows for a job, newest first.

    Each row is converted to a plain ``dict``.
    """
    sql = """
        SELECT *
        FROM job_commands
        WHERE job_run_id = ?
        ORDER BY id DESC
        LIMIT 100
        """
    command_rows = repo._fetchall(sql, (int(job_id),))
    return list(map(dict, command_rows))
|
|
|
|
|
|
def _queued_download_job_count(repo: OpsRepository) -> int:
    """Count ``job_runs`` rows that are queued and of a download job type.

    The job types come from ``DOWNLOAD_JOB_TYPES``; when that tuple is empty
    no query is issued and 0 is returned.
    """
    if not DOWNLOAD_JOB_TYPES:
        return 0
    # Only "?" markers are interpolated into the SQL text; the actual job
    # type values are always passed as bound parameters.
    markers = ", ".join(["?"] * len(DOWNLOAD_JOB_TYPES))
    sql = f"""
        SELECT COUNT(*) AS count_value
        FROM job_runs
        WHERE status = ? AND job_type IN ({markers})
        """
    result = repo._fetchone(sql, ("queued", *DOWNLOAD_JOB_TYPES))
    if not result:
        return 0
    return int(result["count_value"])
|
|
|
|
|
|
def _dashboard_summary(
    repo: OpsRepository,
    *,
    task_rows: list[dict[str, Any]] | None = None,
) -> dict[str, int]:
    """Build the headline job counters shown on the dashboard.

    Args:
        repo: Repository used to count ``job_runs`` grouped by status.
        task_rows: Optional pre-fetched task rows. When given, queued
            download jobs are counted from these rows (``status == "queued"``
            and ``lane_type == "download"``); otherwise a dedicated query is
            made via ``_queued_download_job_count``.

    Returns:
        Counters for total, queued, queued-download, running, paused
        (``paused`` + ``pause_requested``) and failed (``failed`` +
        ``completed_with_errors``) jobs.
    """
    status_rows = repo._fetchall(
        """
        SELECT status, COUNT(*) AS count_value
        FROM job_runs
        GROUP BY status
        """
    )
    by_status = {str(entry["status"]): int(entry["count_value"]) for entry in status_rows}

    if task_rows is not None:
        queued_downloads = len(
            [
                entry
                for entry in task_rows
                if str(entry.get("status")) == "queued" and str(entry.get("lane_type")) == "download"
            ]
        )
    else:
        queued_downloads = _queued_download_job_count(repo)

    def _tally(*statuses: str) -> int:
        return int(sum(by_status.get(name, 0) for name in statuses))

    return {
        "total_jobs": int(sum(by_status.values())),
        "queued_jobs": _tally("queued"),
        "queued_download_jobs": int(queued_downloads),
        "running_jobs": _tally("running"),
        "paused_jobs": _tally("paused", "pause_requested"),
        "failed_jobs": _tally("failed", "completed_with_errors"),
    }
|
|
|
|
|
|
def _serialize_worker_row(row: dict[str, Any]) -> dict[str, Any]:
    """Shape a worker row (optionally joined with item context) for the UI.

    ``display_text`` prefers the worker's ``current_display_text`` and falls
    back to ``song_name``, ``playlist_name`` or ``item_key``. Byte counters
    default to 0 when absent, and ``speed_text`` is the formatted form of
    ``speed_bytes_per_sec``.

    Raises:
        KeyError: If one of the mandatory worker columns (``id``,
            ``job_run_id``, ``job_stage_id``, ``worker_name``, ``status``,
            ``current_job_item_id``, ``current_song_id``,
            ``current_playlist_id``, ``processed_count``, ``error_count``)
            is missing from ``row``.
    """
    own_label = str(row.get("current_display_text") or "").strip()
    context_label = str(
        row.get("song_name") or row.get("playlist_name") or row.get("item_key") or ""
    ).strip()
    rate = int(row.get("speed_bytes_per_sec") or 0)

    serialized: dict[str, Any] = {"id": int(row["id"])}
    # These columns are mandatory and copied verbatim.
    for column in (
        "job_run_id",
        "job_stage_id",
        "worker_name",
        "status",
        "current_job_item_id",
        "current_song_id",
        "current_playlist_id",
    ):
        serialized[column] = row[column]
    serialized["display_text"] = own_label or context_label
    # These columns only exist when the row was joined with item context.
    for column in ("stage_type", "item_type", "last_progress_text"):
        serialized[column] = row.get(column)
    serialized["downloaded_bytes"] = int(row.get("downloaded_bytes") or 0)
    serialized["total_bytes"] = int(row.get("total_bytes") or 0)
    serialized["speed_bytes_per_sec"] = rate
    serialized["speed_text"] = _format_speed_text(rate)
    serialized["processed_count"] = int(row["processed_count"] or 0)
    serialized["error_count"] = int(row["error_count"] or 0)
    serialized["heartbeat_at"] = row.get("heartbeat_at")
    return serialized
|
|
|
|
|
|
def _playlist_source_stats(repo: OpsRepository) -> list[dict[str, Any]]:
    """Return the number of distinct playlists attached to each playlist pool.

    Pools without playlists are included with a count of 0 (LEFT JOIN).
    Results are ordered by platform, pool kind and pool id.
    """
    sql = """
        SELECT
            pp.platform,
            pp.pool_kind,
            pp.name AS pool_name,
            COUNT(DISTINCT pl.id) AS playlist_count
        FROM playlist_pools AS pp
        LEFT JOIN pool_playlists AS rel ON rel.pool_id = pp.id
        LEFT JOIN playlists AS pl ON pl.id = rel.playlist_id
        GROUP BY pp.id, pp.platform, pp.pool_kind, pp.name
        ORDER BY pp.platform ASC, pp.pool_kind ASC, pp.id ASC
        """
    stats: list[dict[str, Any]] = []
    for record in repo._fetchall(sql):
        stats.append(
            {
                "platform": str(record["platform"]),
                "pool_kind": str(record["pool_kind"]),
                "pool_name": str(record["pool_name"]),
                "playlist_count": int(record["playlist_count"] or 0),
            }
        )
    return stats
|
|
|
|
|
|
def _playlist_filter_options(catalog_repo: CatalogRepository) -> dict[str, list[str]]:
    """Collect the distinct, non-blank values offered as playlist filters.

    Returns:
        ``{"platforms": [...], "pool_kinds": [...]}``, each sorted ascending
        by the database.
    """
    platform_sql = (
        "SELECT DISTINCT platform FROM playlists WHERE platform IS NOT NULL AND TRIM(platform) != '' ORDER BY platform ASC"
    )
    pool_kind_sql = (
        "SELECT DISTINCT pool_kind FROM playlist_pools WHERE pool_kind IS NOT NULL AND TRIM(pool_kind) != '' ORDER BY pool_kind ASC"
    )
    platforms = [str(record["platform"]) for record in catalog_repo._fetchall(platform_sql)]
    pool_kinds = [str(record["pool_kind"]) for record in catalog_repo._fetchall(pool_kind_sql)]
    return {"platforms": platforms, "pool_kinds": pool_kinds}
|
|
|
|
|
|
def _normalize_playlist_ids(values: list[Any]) -> list[int]:
    """Coerce raw playlist ids to unique positive integers.

    Args:
        values: Raw ids of any type. ``None`` is treated as an empty list.

    Returns:
        Ids that convert to an ``int`` greater than zero, in first-seen
        order and without duplicates. Entries that cannot be converted
        (wrong type, non-numeric text, NaN, infinity) are skipped.
    """
    normalized: list[int] = []
    seen: set[int] = set()
    for raw in values or []:
        try:
            playlist_id = int(raw)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")) -- skip it like any other
            # unparseable entry instead of aborting the whole request.
            continue
        if playlist_id <= 0 or playlist_id in seen:
            continue
        normalized.append(playlist_id)
        seen.add(playlist_id)
    return normalized
|
|
|
|
|
|
def _validate_playlist_page_size(page_size: int) -> int:
    """Return ``page_size`` unchanged if it is one of the supported sizes.

    Raises:
        HTTPException: 422 when ``page_size`` is not 20, 50 or 100.
    """
    if page_size in {20, 50, 100}:
        return page_size
    raise HTTPException(status_code=422, detail="page_size must be one of 20, 50, 100")
|
|
|
|
|
|
def _normalize_optional_filter(value: str | None) -> str | None:
    """Strip an optional filter value, mapping blank input to ``None``."""
    if value is not None:
        trimmed = value.strip()
        if trimmed:
            return trimmed
    return None
|
|
|
|
|
|
def _normalize_playlist_sort(
    sort_by: str | None,
    sort_dir: str | None,
) -> tuple[str | None, str | None]:
    """Validate the playlist sort parameters.

    Returns:
        ``(None, None)`` when no sort column is requested (any direction is
        then ignored). Otherwise the column and a lower-cased direction,
        where a missing direction falls back to the column's entry in
        ``PLAYLIST_SORT_DEFAULTS``.

    Raises:
        HTTPException: 422 for an unknown column or direction.
    """
    column = _normalize_optional_filter(sort_by)
    direction = _normalize_optional_filter(sort_dir)
    if column is None:
        return None, None
    if column not in PLAYLIST_SORT_FIELDS:
        raise HTTPException(status_code=422, detail="sort_by must be one of id, platform, name, play_count")
    direction = (direction if direction is not None else PLAYLIST_SORT_DEFAULTS[column]).lower()
    if direction not in PLAYLIST_SORT_DIRECTIONS:
        raise HTTPException(status_code=422, detail="sort_dir must be one of asc, desc")
    return column, direction
|
|
|
|
|
|
def _playlist_query_string(
    filters: dict[str, Any],
    *,
    overrides: dict[str, Any] | None = None,
) -> str:
    """Encode playlist list filters as a URL query string.

    ``overrides`` take precedence over ``filters``. ``page`` and
    ``page_size`` are always emitted (defaults 1 and 50). The optional text
    filters are emitted only when set, and ``wanted_only=1`` is appended
    only when truthy.
    """
    merged = {**(filters or {}), **(overrides or {})}
    pairs: list[tuple[str, str]] = [
        ("page", str(int(merged.get("page") or 1))),
        ("page_size", str(int(merged.get("page_size") or 50))),
    ]
    optional_keys = ("keyword", "platform", "pool_kind", "status", "sort_by", "sort_dir")
    pairs.extend(
        (name, str(merged.get(name)))
        for name in optional_keys
        if merged.get(name) not in (None, "")
    )
    if merged.get("wanted_only"):
        pairs.append(("wanted_only", "1"))
    return urlencode(pairs)
|
|
|
|
|
|
def _build_playlist_sort_links(filters: dict[str, Any]) -> dict[str, dict[str, str]]:
    """Build the header links that sort the playlist table by each column.

    For the currently sorted column the link flips the direction and carries
    an indicator (``"^"`` for ascending, ``"v"`` otherwise). Every other
    column links to its default direction with an empty indicator. All links
    reset the page to 1.
    """
    active_column = str(filters.get("sort_by") or "")
    ascending_now = str(filters.get("sort_dir") or "") == "asc"

    def _link_for(column: str) -> dict[str, str]:
        if column == active_column:
            target_dir, marker = ("desc", "^") if ascending_now else ("asc", "v")
        else:
            target_dir, marker = PLAYLIST_SORT_DEFAULTS[column], ""
        query = _playlist_query_string(
            filters,
            overrides={"page": 1, "sort_by": column, "sort_dir": target_dir},
        )
        return {"href": "?" + query, "indicator": marker}

    return {column: _link_for(column) for column in PLAYLIST_SORT_FIELDS}
|
|
|
|
|
|
def _parse_optional_bool_query(value: str | bool | None, *, field_name: str) -> bool:
    """Interpret an optional query parameter as a boolean.

    ``None`` and blank strings mean ``False``; real booleans are returned
    unchanged. Text is matched case-insensitively against the usual
    true/false spellings.

    Raises:
        HTTPException: 422 (mentioning ``field_name``) for unrecognised text.
    """
    if value is None:
        return False
    if isinstance(value, bool):
        return value

    truthy = {"1", "true", "t", "yes", "y", "on"}
    falsy = {"0", "false", "f", "no", "n", "off"}
    token = str(value).strip().lower()
    if not token or token in falsy:
        return False
    if token in truthy:
        return True
    raise HTTPException(status_code=422, detail=f"{field_name} must be a valid boolean value")
|
|
|
|
|
|
def _list_playlist_page_or_422(
    catalog_repo: CatalogRepository,
    *,
    page: int,
    page_size: int,
    platform: str | None,
    pool_kind: str | None,
    status: str | None,
    keyword: str | None,
    wanted_only: bool,
    sort_by: str | None,
    sort_dir: str | None,
) -> dict[str, Any]:
    """Call ``catalog_repo.list_playlist_page`` and map ``ValueError`` to HTTP 422.

    All keyword arguments are forwarded unchanged.

    Raises:
        HTTPException: 422 carrying the ``ValueError`` message raised by the
            repository.
    """
    query_args: dict[str, Any] = {
        "page": page,
        "page_size": page_size,
        "platform": platform,
        "pool_kind": pool_kind,
        "status": status,
        "keyword": keyword,
        "wanted_only": wanted_only,
        "sort_by": sort_by,
        "sort_dir": sort_dir,
    }
    try:
        result = catalog_repo.list_playlist_page(**query_args)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    return result
|
|
|
|
|
|
def _job_workers(repo: OpsRepository, job_id: int, *, active_only: bool = False) -> list[dict[str, Any]]:
    """Return up to 50 serialized workers of a job.

    ``active_only`` is forwarded to ``repo.list_job_workers``.
    """
    worker_rows = repo.list_job_workers(job_id, active_only=active_only, limit=50)
    serialized: list[dict[str, Any]] = []
    for worker_row in worker_rows:
        serialized.append(_serialize_worker_row(worker_row))
    return serialized
|
|
|
|
|
|
def _job_running_items(repo: OpsRepository, job_id: int) -> list[dict[str, Any]]:
    """Describe the running items of a job (at most 100).

    Each item is paired with the worker whose ``current_job_item_id`` points
    at it, if any. The display name prefers the worker's display text, then
    the item's song name, playlist name, a name from its payload, and
    finally its item key.
    """
    # Index workers by the item they are processing; a later worker for the
    # same item replaces an earlier one.
    worker_by_item: dict[int, dict[str, Any]] = {}
    for candidate in _job_workers(repo, job_id, active_only=False):
        if candidate.get("current_job_item_id") is None:
            continue
        worker_by_item[int(candidate["current_job_item_id"])] = candidate

    running = repo.list_job_items_with_context(job_id, statuses=["running"], limit=100)
    described: list[dict[str, Any]] = []
    for item in running:
        item_payload = item.get("payload") or {}
        assigned = worker_by_item.get(int(item["id"]))
        if assigned:
            worker_name = assigned.get("worker_name")
            worker_label = assigned.get("display_text")
        else:
            worker_name = None
            worker_label = None
        label = (
            worker_label
            or item.get("song_name")
            or item.get("playlist_name")
            or _display_from_payload(item_payload)
            or str(item.get("item_key") or "")
        )
        described.append(
            {
                "id": int(item["id"]),
                "stage_type": item.get("stage_type"),
                "item_type": item.get("item_type"),
                "status": item.get("status"),
                "song_id": item.get("song_id"),
                "playlist_id": item.get("playlist_id"),
                "worker_name": worker_name,
                "display_name": label,
                "started_at": item.get("started_at"),
            }
        )
    return described
|
|
|
|
|
|
def _dashboard_workers(repo: OpsRepository) -> list[dict[str, Any]]:
    """Return up to 50 workers that are currently processing an item.

    Only workers whose current item is ``running`` in a job that is
    ``running`` or ``pause_requested`` are included. Workers are ordered by
    status (running, then paused, then anything else), most recent heartbeat
    first, then highest id first.
    """
    sql = """
        SELECT
            jw.*,
            js.stage_type,
            ji.item_type,
            ji.item_key,
            s.name AS song_name,
            p.name AS playlist_name
        FROM job_workers AS jw
        JOIN job_items AS ji
            ON ji.id = jw.current_job_item_id
            AND ji.status = 'running'
        JOIN job_stages AS js ON js.id = ji.job_stage_id
        JOIN job_runs AS jr
            ON jr.id = js.job_run_id
            AND jr.status IN ('running', 'pause_requested')
        LEFT JOIN songs AS s ON s.id = COALESCE(jw.current_song_id, ji.song_id)
        LEFT JOIN playlists AS p ON p.id = COALESCE(jw.current_playlist_id, ji.playlist_id)
        ORDER BY
            CASE
                WHEN jw.status = 'running' THEN 0
                WHEN jw.status = 'paused' THEN 1
                ELSE 2
            END,
            COALESCE(jw.heartbeat_at, '') DESC,
            jw.id DESC
        LIMIT 50
        """
    serialized: list[dict[str, Any]] = []
    for worker_row in repo._fetchall(sql):
        # The serializer uses .get(), so each row is converted to a dict.
        serialized.append(_serialize_worker_row(dict(worker_row)))
    return serialized
|
|
|
|
|
|
def _dashboard_running_items(repo: OpsRepository) -> list[dict[str, Any]]:
    """Return up to 100 running job items across all jobs, newest first.

    The display name prefers the worker's display text, then the song name,
    the playlist name, a name from the item's JSON payload, and finally the
    item key.

    The payload is decoded only when none of the cheaper labels is
    available, and an undecodable payload is treated as "no label" so that a
    single corrupt row cannot break the whole dashboard.
    """
    rows = repo._fetchall(
        """
        SELECT
            i.*,
            s.stage_type,
            s.job_run_id,
            w.worker_name,
            w.current_display_text,
            p.name AS playlist_name,
            sg.name AS song_name
        FROM job_items AS i
        JOIN job_stages AS s ON s.id = i.job_stage_id
        LEFT JOIN job_workers AS w ON w.current_job_item_id = i.id
        LEFT JOIN playlists AS p ON p.id = i.playlist_id
        LEFT JOIN songs AS sg ON sg.id = i.song_id
        WHERE i.status = 'running'
        ORDER BY i.id DESC
        LIMIT 100
        """
    )

    def _label_from_payload(raw_payload: Any) -> str | None:
        """Best-effort label lookup in the item's JSON payload."""
        try:
            decoded = json.loads(raw_payload or "{}")
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return None
        return _display_from_payload(decoded)

    items: list[dict[str, Any]] = []
    for row in rows:
        display_name = (
            str(row["current_display_text"] or "").strip()
            or str(row["song_name"] or "").strip()
            or str(row["playlist_name"] or "").strip()
            or _label_from_payload(row["payload_json"])
            # A NULL item key becomes "" (not "None"), as in _job_running_items.
            or str(row["item_key"] or "")
        )
        items.append(
            {
                "id": int(row["id"]),
                "job_run_id": row["job_run_id"],
                "stage_type": row["stage_type"],
                "item_type": row["item_type"],
                "status": row["status"],
                "song_id": row["song_id"],
                "playlist_id": row["playlist_id"],
                "worker_name": row["worker_name"],
                "display_name": display_name,
                "started_at": row["started_at"],
            }
        )
    return items
|
|
|
|
|
|
def _download_stats(repo: OpsRepository) -> dict[str, int]:
    """Extend ``repo.get_download_stats()`` with three extra counters.

    Adds ``total_playlists``, ``total_playlist_pools`` and
    ``running_song_items`` (running job items that reference a song). The
    dict returned by the repository is updated in place and returned.
    """

    def _scalar(sql: str) -> int:
        record = repo._fetchone(sql)
        return int(record["count_value"]) if record else 0

    stats = repo.get_download_stats()
    total_playlists = _scalar("SELECT COUNT(*) AS count_value FROM playlists")
    total_pools = _scalar("SELECT COUNT(*) AS count_value FROM playlist_pools")
    running_songs = _scalar(
        """
        SELECT COUNT(*) AS count_value
        FROM job_items
        WHERE status = 'running' AND song_id IS NOT NULL
        """
    )
    stats["total_playlists"] = total_playlists
    stats["total_playlist_pools"] = total_pools
    stats["running_song_items"] = running_songs
    return stats
|
|
|
|
|
|
def _dashboard_transfer_stats(workers: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate transfer speeds for the dashboard.

    The download speed is the sum of ``speed_bytes_per_sec`` over workers
    whose ``stage_type`` is ``download`` (case-insensitive). The upload
    fields are fixed placeholders (0 and ``"-"``).
    """
    total_download_rate = 0
    for worker in workers:
        if str(worker.get("stage_type") or "").lower() != "download":
            continue
        total_download_rate += int(worker.get("speed_bytes_per_sec") or 0)
    return {
        "download_speed_bytes_per_sec": int(total_download_rate),
        "download_speed_text": _format_speed_text(total_download_rate),
        "upload_speed_bytes_per_sec": 0,
        "upload_speed_text": "-",
    }
|
|
|
|
|
|
def _dashboard_payload(repo: OpsRepository, *, include_task_rows: bool = False) -> dict[str, Any]:
    """Assemble everything the dashboard renders.

    Args:
        repo: Repository queried for all sections.
        include_task_rows: When true, the latest 50 task-center rows are
            fetched, used for the summary, and included under ``task_rows``.

    Returns:
        A dict with ``summary``, ``download_stats``, ``transfer_stats``,
        ``workers``, ``running_items``, ``playlist_sources``, ``active_job``
        (or ``None``), the 10 newest ``jobs``, optionally ``task_rows``, and
        the 5 newest ``recent_events``.
    """
    current_job = repo.get_active_job()
    task_rows = repo.list_task_center_rows(limit=50) if include_task_rows else None
    active_workers = _dashboard_workers(repo)

    payload: dict[str, Any] = {
        "summary": _dashboard_summary(repo, task_rows=task_rows),
        "download_stats": _download_stats(repo),
        "transfer_stats": _dashboard_transfer_stats(active_workers),
        "workers": active_workers,
        "running_items": _dashboard_running_items(repo),
        "playlist_sources": _playlist_source_stats(repo),
        "active_job": None if current_job is None else _serialize_job(current_job),
        "jobs": _list_jobs(repo, limit=10),
    }
    if task_rows is not None:
        payload["task_rows"] = task_rows

    event_rows = repo._fetchall(
        """
        SELECT id, job_run_id, event_type, message, created_at
        FROM job_events
        ORDER BY id DESC
        LIMIT 5
        """
    )
    payload["recent_events"] = [dict(event_row) for event_row in event_rows]
    return payload
|
|
|
|
|
|
def _split_dashboard_task_rows(task_rows: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Split task rows into in-progress rows and the 10 most recent finished rows.

    A row is finished when its lower-cased status is ``completed``,
    ``completed_with_errors``, ``failed`` or ``canceled``. Finished rows are
    ordered newest first by ``ended_at`` (falling back to ``started_at``,
    then ``created_at``) with ``id`` as tie-breaker. In-progress rows keep
    their input order.
    """
    terminal_statuses = {"completed", "completed_with_errors", "failed", "canceled"}

    def _is_finished(row: dict[str, Any]) -> bool:
        return str(row.get("status") or "").lower() in terminal_statuses

    def _recency(row: dict[str, Any]) -> tuple[str, int]:
        stamp = row.get("ended_at") or row.get("started_at") or row.get("created_at") or ""
        return str(stamp), int(row.get("id") or 0)

    all_rows = list(task_rows or [])
    in_progress = [row for row in all_rows if not _is_finished(row)]
    finished = sorted(filter(_is_finished, all_rows), key=_recency, reverse=True)
    return in_progress, finished[:10]
|
|
|
|
|
|
def _job_detail_payload(repo: OpsRepository, job_id: int) -> dict[str, Any]:
    """Assemble the detail view of a single job.

    Raises:
        HTTPException: 404 when ``repo.get_job`` finds no such job.
    """
    job_record = repo.get_job(job_id)
    if job_record is None:
        raise HTTPException(status_code=404, detail="job not found")

    detail: dict[str, Any] = {"job": _serialize_job(job_record)}
    detail["stages"] = _job_stages(repo, job_id)
    detail["commands"] = _job_commands(repo, job_id)
    detail["workers"] = _job_workers(repo, job_id, active_only=False)
    detail["running_items"] = _job_running_items(repo, job_id)
    detail["download_stats"] = _download_stats(repo)
    detail["playlist_progress"] = repo.list_job_playlist_progress(job_id)
    return detail
|
|
|
|
|
|
def _ensure_job_stages(repo: OpsRepository, job_id: int, job_type: str) -> None:
    """Create the stage rows for a job unless it already has some.

    Stages follow ``JOB_STAGE_SEQUENCES[job_type]`` and are numbered from 1.
    An unknown job type creates no stages.
    """
    already_present = repo.list_job_stages(job_id)
    if already_present:
        return
    stage_types = JOB_STAGE_SEQUENCES.get(job_type, [])
    for position, stage_type in enumerate(stage_types, start=1):
        repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=position)
|
|
|
|
|
|
class JobCreateRequest(BaseModel):
    # Request body for creating a job. Documented with comments rather than
    # a docstring so the generated schema description stays unchanged.

    # Which pipeline to run; defaults to the full catalog sync.
    job_type: JobType = Field(default="catalog_sync")
    requested_by: str | None = None
    # Collect sources; same names as ALLOWED_COLLECT_SOURCES.
    sources: list[Literal["netease", "qq", "kuwo"]] = Field(default_factory=list)
    # Download sources; same names as ALLOWED_DOWNLOAD_SOURCES.
    download_sources: list[Literal["qq", "kuwo", "migu", "qianqian", "kugou", "netease"]] = Field(
        default_factory=list
    )
    playlist_scope: dict[str, Any] = Field(default_factory=dict)
    config_snapshot: dict[str, Any] | None = None
|
|
|
|
|
|
class JobCommandRequest(BaseModel):
    # Request body for issuing a command against a job.

    command_type: CommandType
    # Optional item the command targets; must be >= 1 when given.
    target_item_id: int | None = Field(default=None, ge=1)
    payload: dict[str, Any] = Field(default_factory=dict)
|
|
|
|
|
|
class EnvUpdateRequest(BaseModel):
    # Request body carrying new env-file text plus an optional note.

    content: str = ""
    note: str | None = None
|
|
|
|
|
|
class PlaylistBulkRequest(BaseModel):
    # Request body for bulk operations on a set of playlists.

    playlist_ids: list[int] = Field(default_factory=list)
    requested_by: str | None = None
    marked_by: str | None = None
|
|
|
|
|
|
def _create_scoped_playlist_job(
    repo: OpsRepository,
    env_manager: CatalogsyncEnvManager,
    *,
    job_type: Literal["download_only", "sync_download", "sync_only"],
    playlist_ids: list[int],
    requested_by: str | None = None,
) -> dict[str, Any] | None:
    """Create a job restricted to the given playlists.

    The job's config snapshot comes from ``env_manager.build_job_snapshot()``.
    Its collect/download sources are read from that snapshot (lower-case
    key first, then upper-case) and filtered against the allowed sets.

    Returns:
        The serialized job, or ``None`` when ``playlist_ids`` contains no
        valid id (no job is created in that case).

    Raises:
        HTTPException: 500 when the job cannot be read back after creation.
    """
    scoped_ids = _normalize_playlist_ids(playlist_ids)
    if not scoped_ids:
        return None

    snapshot = env_manager.build_job_snapshot()
    collect_sources = _normalize_allowed_list(
        snapshot.get("sources") or snapshot.get("SOURCES"),
        ALLOWED_COLLECT_SOURCES,
    )
    fetch_sources = _normalize_allowed_list(
        snapshot.get("download_sources") or snapshot.get("DOWNLOAD_SOURCES"),
        ALLOWED_DOWNLOAD_SOURCES,
    )

    new_job_id = repo.create_job(
        job_type=job_type,
        requested_by=requested_by,
        config_snapshot=dict(snapshot),
        sources=collect_sources,
        download_sources=fetch_sources,
        playlist_scope={"playlist_ids": scoped_ids},
    )
    _ensure_job_stages(repo, new_job_id, job_type)

    created = repo.get_job(new_job_id)
    if created is None:
        raise HTTPException(status_code=500, detail="job create failed")
    return _serialize_job(created)
|
|
|
|
|
|
def _build_playlist_job(
    repo: OpsRepository,
    env_manager: CatalogsyncEnvManager,
    *,
    job_type: Literal["download_only", "sync_download", "sync_only"],
    payload: PlaylistBulkRequest,
) -> dict[str, Any]:
    """Create one playlist-scoped job from a bulk request.

    Returns:
        ``{"job": <serialized job>}``.

    Raises:
        HTTPException: 422 when the request holds no valid playlist id;
            500 when the job could not be created.
    """
    requested_ids = _normalize_playlist_ids(payload.playlist_ids)
    if not requested_ids:
        raise HTTPException(status_code=422, detail="playlist_ids is required")

    created_job = _create_scoped_playlist_job(
        repo,
        env_manager,
        job_type=job_type,
        playlist_ids=requested_ids,
        requested_by=payload.requested_by,
    )
    if created_job is None:
        raise HTTPException(status_code=500, detail="job create failed")
    return {"job": created_job}
|
|
|
|
|
|
def _build_adaptive_download_jobs(
    repo: OpsRepository,
    catalog_repo: CatalogRepository,
    env_manager: CatalogsyncEnvManager,
    *,
    payload: PlaylistBulkRequest,
) -> dict[str, Any]:
    """Create download jobs for playlists, choosing the job type per playlist.

    Playlists are grouped by their export ``state_code``:
      * ``unsynced``   -> one ``sync_download`` job
      * ``downloaded`` -> skipped
      * anything else  -> one ``download_only`` job

    Returns:
        The id groups, both (possibly ``None``) jobs, ``job`` set to the
        created job when exactly one was created, and a ``message`` when
        none was created.

    Raises:
        HTTPException: 422 when no valid playlist id is given; 404 (from
            the lookup helper) when a playlist does not exist.
    """
    requested_ids = _normalize_playlist_ids(payload.playlist_ids)
    if not requested_ids:
        raise HTTPException(status_code=422, detail="playlist_ids is required")

    state_rows = _playlist_export_rows_by_id_or_404(catalog_repo, requested_ids)

    download_playlist_ids: list[int] = []
    sync_download_playlist_ids: list[int] = []
    skipped_downloaded_playlist_ids: list[int] = []
    # State codes with special handling; every other state is a plain download.
    bucket_by_state = {
        "unsynced": sync_download_playlist_ids,
        "downloaded": skipped_downloaded_playlist_ids,
    }
    for requested_id in requested_ids:
        state_row = state_rows.get(int(requested_id))
        if state_row is None:
            continue
        state_code = str(state_row.get("state_code") or "")
        bucket_by_state.get(state_code, download_playlist_ids).append(int(requested_id))

    download_job = _create_scoped_playlist_job(
        repo,
        env_manager,
        job_type="download_only",
        playlist_ids=download_playlist_ids,
        requested_by=payload.requested_by,
    )
    sync_download_job = _create_scoped_playlist_job(
        repo,
        env_manager,
        job_type="sync_download",
        playlist_ids=sync_download_playlist_ids,
        requested_by=payload.requested_by,
    )

    created_jobs = [job for job in (download_job, sync_download_job) if job is not None]
    response: dict[str, Any] = {
        "playlist_ids": requested_ids,
        "download_playlist_ids": download_playlist_ids,
        "sync_download_playlist_ids": sync_download_playlist_ids,
        "skipped_downloaded_playlist_ids": skipped_downloaded_playlist_ids,
        "download_job": download_job,
        "sync_download_job": sync_download_job,
        # Only unambiguous when exactly one job was created.
        "job": created_jobs[0] if len(created_jobs) == 1 else None,
    }
    if not created_jobs:
        response["message"] = "no runnable playlist download jobs were created"
    return response
|
|
|
|
|
|
def _playlist_export_rows_by_id_or_404(
    catalog_repo: CatalogRepository,
    playlist_ids: list[int],
) -> dict[int, dict[str, Any]]:
    """Load export-state rows for the playlists and index them by id.

    Raises:
        HTTPException: 404 when no row is found at all, or when some ids
            are missing (the detail then lists ``missing_playlist_ids``).
    """
    found_rows = catalog_repo.list_playlist_export_state_rows(playlist_ids)
    if not found_rows:
        raise HTTPException(status_code=404, detail="playlist not found")

    indexed: dict[int, dict[str, Any]] = {}
    for record in found_rows:
        indexed[int(record["id"])] = dict(record)

    absent_ids = [int(wanted) for wanted in playlist_ids if int(wanted) not in indexed]
    if absent_ids:
        raise HTTPException(
            status_code=404,
            detail={
                "message": "playlist not found",
                "missing_playlist_ids": absent_ids,
            },
        )
    return indexed
|
|
|
|
|
|
def create_app(
|
|
db_path: str | Path,
|
|
env_path: str | Path,
|
|
*,
|
|
start_runner: bool = False,
|
|
runner_sleep_seconds: float = 1.0,
|
|
) -> FastAPI:
|
|
db_file = Path(db_path)
|
|
env_file = Path(env_path)
|
|
initialize_database(db_file).close()
|
|
initialize_resolver_stats_database(default_resolver_stats_db_path(db_file)).close()
|
|
|
|
repo = OpsRepository(db_file)
|
|
catalog_repo = CatalogRepository(db_file)
|
|
maintenance_service = LocalMaintenanceService(db_file)
|
|
env_manager = CatalogsyncEnvManager(
|
|
db_path=db_file,
|
|
env_file_path=env_file,
|
|
repository=repo,
|
|
)
|
|
templates_dir = Path(__file__).resolve().parents[1] / "templates"
|
|
static_dir = Path(__file__).resolve().parents[1] / "static"
|
|
templates = Jinja2Templates(directory=str(templates_dir))
|
|
|
|
runner_stop_event: threading.Event | None = None
|
|
runner_thread: threading.Thread | None = None
|
|
|
|
if start_runner:
|
|
runner = OpsRunner(repository=repo, sleep_seconds=runner_sleep_seconds)
|
|
runner_restart_delay = max(min(float(runner_sleep_seconds), 1.0), 0.1)
|
|
|
|
def _run_runner_with_supervision(stop_event: threading.Event) -> None:
|
|
while not stop_event.is_set():
|
|
try:
|
|
LOGGER.info("Starting embedded ops runner")
|
|
runner.run_forever(stop_event)
|
|
LOGGER.info("Embedded ops runner stopped")
|
|
return
|
|
except Exception:
|
|
LOGGER.exception(
|
|
"Embedded ops runner crashed; restarting in %.2f seconds",
|
|
runner_restart_delay,
|
|
)
|
|
if stop_event.wait(runner_restart_delay):
|
|
return
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(_: FastAPI):
|
|
nonlocal runner_stop_event, runner_thread
|
|
if start_runner:
|
|
if runner_thread is None or not runner_thread.is_alive():
|
|
runner_stop_event = threading.Event()
|
|
runner_thread = threading.Thread(
|
|
target=_run_runner_with_supervision,
|
|
args=(runner_stop_event,),
|
|
daemon=True,
|
|
name="catalogsync-ops-runner",
|
|
)
|
|
runner_thread.start()
|
|
try:
|
|
yield
|
|
finally:
|
|
if runner_stop_event is not None:
|
|
runner_stop_event.set()
|
|
if runner_thread is not None:
|
|
runner_thread.join(timeout=5.0)
|
|
|
|
app = FastAPI(title="Catalogsync Operations Console", lifespan=lifespan)
|
|
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
|
|
|
|
@app.get("/", include_in_schema=False)
|
|
def index_redirect():
|
|
return RedirectResponse(url="/dashboard", status_code=307)
|
|
|
|
@app.get("/dashboard", response_class=HTMLResponse)
def dashboard(request: Request):
    # Render the HTML dashboard from a full snapshot (task rows included),
    # plus form defaults derived from the current env values.
    snapshot = _dashboard_payload(repo, include_task_rows=True)
    all_task_rows = snapshot.get("task_rows", [])
    active_rows, finished_rows = _split_dashboard_task_rows(all_task_rows)
    current_env = env_manager.load_current()

    job_type_choices = [
        ("catalog_sync", "Collect + Sync + Download"),
        ("collect_only", "Collect Only"),
        ("sync_only", "Sync Only"),
        ("sync_download", "Sync + Download"),
        ("download_only", "Download Only"),
        ("upload_only", "Upload Only"),
        ("download_upload", "Download + Upload"),
    ]

    page_context = {
        "title": "Dashboard",
        "sse_url": "/api/events/stream",
        "dashboard_api_url": "/api/dashboard",
        "summary": snapshot["summary"],
        "download_stats": snapshot["download_stats"],
        "transfer_stats": snapshot["transfer_stats"],
        "task_rows": all_task_rows,
        "doing_task_rows": active_rows,
        "done_task_rows": finished_rows,
        "workers": snapshot["workers"],
        "running_items": snapshot["running_items"],
        "playlist_sources": snapshot["playlist_sources"],
        "active_job": snapshot["active_job"],
        "jobs": snapshot["jobs"],
        "maintenance_local_duplicates_scan_api": "/api/maintenance/local-duplicates",
        "maintenance_local_duplicates_dedupe_api": "/api/maintenance/local-duplicates/dedupe",
        "job_type_options": job_type_choices,
    }

    # Fall back to the built-in source lists when the env yields nothing usable.
    collect_defaults = _normalize_allowed_list(
        current_env.get("SOURCES") or "netease,qq,kuwo",
        ALLOWED_COLLECT_SOURCES,
    )
    if not collect_defaults:
        collect_defaults = ["netease", "qq", "kuwo"]
    page_context["default_sources"] = ",".join(collect_defaults)

    download_defaults = _normalize_allowed_list(
        current_env.get("DOWNLOAD_SOURCES") or current_env.get("download_sources"),
        ALLOWED_DOWNLOAD_SOURCES,
    )
    if not download_defaults:
        download_defaults = ["qq", "kuwo", "migu", "qianqian", "kugou", "netease"]
    page_context["default_download_sources"] = ",".join(download_defaults)

    return templates.TemplateResponse(
        request=request,
        name="ops/dashboard.html",
        context=page_context,
    )
|
|
|
|
@app.get("/jobs", response_class=HTMLResponse)
def jobs_page(request: Request):
    # Jobs listing page; asks _list_jobs for at most 200 entries.
    job_rows = _list_jobs(repo, limit=200)
    page_context = {"title": "Jobs", "jobs": job_rows}
    return templates.TemplateResponse(
        request=request,
        name="ops/jobs.html",
        context=page_context,
    )
|
|
|
|
@app.get("/jobs/{job_id}", response_class=HTMLResponse)
def job_detail_page(request: Request, job_id: int):
    # Detail page for one job; the template context mirrors the detail
    # payload and adds the endpoint the page posts commands to.
    detail = _job_detail_payload(repo, job_id)
    page_context = {"title": f"Job {job_id}"}
    forwarded_keys = (
        "job",
        "stages",
        "commands",
        "workers",
        "running_items",
        "download_stats",
        "playlist_progress",
    )
    for key in forwarded_keys:
        page_context[key] = detail[key]
    page_context["command_endpoint"] = f"/api/jobs/{job_id}/commands"
    return templates.TemplateResponse(
        request=request,
        name="ops/job_detail.html",
        context=page_context,
    )
|
|
|
|
@app.get("/playlists", response_class=HTMLResponse)
def playlists_page(
    request: Request,
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=50),
    platform: str | None = Query(default=None),
    pool_kind: str | None = Query(default=None),
    status: str | None = Query(default=None),
    keyword: str | None = Query(default=None),
    wanted_only: str | None = Query(default=None),
    sort_by: str | None = Query(default=None),
    sort_dir: str | None = Query(default=None),
):
    # Paginated, filterable playlist browser. Query parameters are normalized
    # first, then reused both for the repository query and for building the
    # pager / sort links.
    size = _validate_playlist_page_size(int(page_size))
    platform_filter = _normalize_optional_filter(platform)
    pool_kind_filter = _normalize_optional_filter(pool_kind)
    status_filter = _normalize_optional_filter(status)
    keyword_filter = _normalize_optional_filter(keyword)
    wanted_filter = _parse_optional_bool_query(wanted_only, field_name="wanted_only")
    sort_column, sort_direction = _normalize_playlist_sort(sort_by, sort_dir)

    playlist_page = _list_playlist_page_or_422(
        catalog_repo,
        page=page,
        page_size=size,
        platform=platform_filter,
        pool_kind=pool_kind_filter,
        status=status_filter,
        keyword=keyword_filter,
        wanted_only=wanted_filter,
        sort_by=sort_column,
        sort_dir=sort_direction,
    )

    # Filter state echoed back to the template; unset text filters become "".
    filters = {
        "page": int(page),
        "page_size": size,
        "platform": platform_filter or "",
        "pool_kind": pool_kind_filter or "",
        "status": status_filter or "",
        "keyword": keyword_filter or "",
        "wanted_only": wanted_filter,
        "sort_by": sort_column or "",
        "sort_dir": sort_direction or "",
    }

    page_context = {
        "title": "Playlists",
        "playlist_page": playlist_page,
        "playlists": playlist_page["items"],
        "filters": filters,
        "sort_links": _build_playlist_sort_links(filters),
    }

    # Pager links are only offered when the neighbouring page exists.
    previous_page_url = None
    if playlist_page["page"] > 1:
        previous_page_url = "?" + _playlist_query_string(
            filters, overrides={"page": playlist_page["page"] - 1}
        )
    page_context["previous_page_url"] = previous_page_url

    next_page_url = None
    if playlist_page["page"] < playlist_page["total_pages"]:
        next_page_url = "?" + _playlist_query_string(
            filters, overrides={"page": playlist_page["page"] + 1}
        )
    page_context["next_page_url"] = next_page_url

    page_context["filter_options"] = _playlist_filter_options(catalog_repo)
    page_context["playlist_sources"] = _playlist_source_stats(repo)

    collect_defaults = _normalize_allowed_list(
        env_manager.load_current().get("SOURCES") or "netease,qq,kuwo",
        ALLOWED_COLLECT_SOURCES,
    )
    if not collect_defaults:
        collect_defaults = ["netease", "qq", "kuwo"]
    page_context["default_sources"] = ",".join(collect_defaults)

    return templates.TemplateResponse(
        request=request,
        name="ops/playlists.html",
        context=page_context,
    )
|
|
|
|
@app.get("/songs", response_class=HTMLResponse)
def songs_page(request: Request):
    # Songs page: the 200 highest-id song rows plus live download/worker info.
    song_rows = repo._fetchall(
        """
        SELECT id, platform, remote_song_id, name, singers, updated_at
        FROM songs
        ORDER BY id DESC
        LIMIT 200
        """
    )
    page_context = {
        "title": "Songs",
        "songs": [dict(song_row) for song_row in song_rows],
        "download_stats": _download_stats(repo),
        "workers": _dashboard_workers(repo),
        "running_items": _dashboard_running_items(repo),
    }
    return templates.TemplateResponse(
        request=request,
        name="ops/songs.html",
        context=page_context,
    )
|
|
|
|
@app.get("/logs", response_class=HTMLResponse)
def logs_page(request: Request):
    # Logs page: the 200 highest-id job events, newest id first.
    event_rows = repo._fetchall(
        """
        SELECT id, job_run_id, event_type, message, created_at
        FROM job_events
        ORDER BY id DESC
        LIMIT 200
        """
    )
    page_context = {
        "title": "Logs",
        "events": [dict(event_row) for event_row in event_rows],
    }
    return templates.TemplateResponse(
        request=request,
        name="ops/logs.html",
        context=page_context,
    )
|
|
|
|
@app.get("/config", response_class=HTMLResponse)
def config_page(request: Request):
    # Config editor page: raw env text, parsed values and up to 50 revisions.
    page_context = {
        "title": "Config",
        "env_content": _read_env_text(env_file),
        "env_values": env_manager.load_current(),
        "revisions": env_manager.list_revisions(limit=50),
    }
    return templates.TemplateResponse(
        request=request,
        name="ops/config.html",
        context=page_context,
    )
|
|
|
|
@app.get("/api/dashboard")
def api_dashboard(include_task_rows: bool = Query(default=True)):
    # JSON variant of the dashboard snapshot; task rows are optional.
    snapshot = _dashboard_payload(repo, include_task_rows=include_task_rows)
    return snapshot
|
|
|
|
@app.get("/api/maintenance/local-duplicates")
def api_local_duplicates(sample_limit: int = Query(default=20, ge=1, le=200)):
    # Delegates to the maintenance service's local duplicate scan.
    scan_result = maintenance_service.scan_local_duplicates(sample_limit=sample_limit)
    return scan_result
|
|
|
|
@app.post("/api/maintenance/local-duplicates/dedupe")
def api_local_duplicates_dedupe(sample_limit: int = Query(default=20, ge=1, le=200)):
    # Run the local dedupe; a blocked dedupe is reported as HTTP 409.
    try:
        dedupe_result = maintenance_service.dedupe_local_duplicates(sample_limit=sample_limit)
    except LocalDedupeBlockedError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc
    return dedupe_result
|
|
|
|
@app.get("/api/jobs")
def api_jobs(limit: int = Query(default=100, ge=1, le=500)):
    # Job listing wrapped in an "items" envelope.
    job_rows = _list_jobs(repo, limit=limit)
    return {"items": job_rows}
|
|
|
|
@app.get("/api/jobs/{job_id}")
def api_job_detail(job_id: int):
    # JSON detail for one job, as assembled by _job_detail_payload.
    detail = _job_detail_payload(repo, job_id)
    return detail
|
|
|
|
@app.get("/api/jobs/{job_id}/playlists/{playlist_id}/songs")
def api_job_playlist_songs(
    job_id: int,
    playlist_id: int,
    limit: int = Query(default=500, ge=1, le=2000),
):
    # Per-song progress of one playlist within a job; 404 for unknown jobs.
    if repo.get_job(job_id) is None:
        raise HTTPException(status_code=404, detail="job not found")
    progress_rows = repo.list_job_playlist_song_progress(
        job_id=job_id,
        playlist_id=playlist_id,
        limit=limit,
    )
    return {"items": progress_rows}
|
|
|
|
@app.get("/api/playlists/{playlist_id}/songs")
def api_playlist_songs(
    playlist_id: int,
    limit: int = Query(default=2000, ge=1, le=5000),
):
    # Playlist header plus its song details; 404 when the playlist is unknown.
    matches = catalog_repo.list_playlists_by_ids([playlist_id])
    if not matches:
        raise HTTPException(status_code=404, detail="playlist not found")
    record = dict(matches[0])

    # play_count may be NULL in the row; keep it as None in that case.
    play_count = None
    if record["play_count"] is not None:
        play_count = int(record["play_count"])

    playlist_info = {
        "id": int(record["id"]),
        "platform": str(record["platform"]),
        "remote_playlist_id": str(record["remote_playlist_id"]),
        "name": str(record["name"]),
        "play_count": play_count,
    }
    song_items = catalog_repo.list_playlist_song_details(playlist_id, limit=limit)
    return {"playlist": playlist_info, "items": song_items}
|
|
|
|
@app.get("/api/playlists/{playlist_id}/export-folder")
def api_playlist_export_folder(playlist_id: int):
    # Ensure the playlist's export artifacts exist and describe the folder.
    # 404: unknown playlist or no folder on disk; 500: no playlists root.
    matches = catalog_repo.list_playlists_by_ids([playlist_id])
    if not matches:
        raise HTTPException(status_code=404, detail="playlist not found")
    record = dict(matches[0])

    play_count = None
    if record["play_count"] is not None:
        play_count = int(record["play_count"])
    playlist_payload = {
        "id": int(record["id"]),
        "platform": str(record["platform"]),
        "remote_playlist_id": str(record["remote_playlist_id"]),
        "name": str(record["name"]),
        "play_count": play_count,
    }

    current_env = env_manager.load_current()
    playlists_root = _resolve_playlists_root(current_env, catalog_repo)
    if playlists_root is None:
        raise HTTPException(status_code=500, detail="playlists root is not configured")

    export_service = CatalogSyncService(
        playlists_root=playlists_root,
        repository=catalog_repo,
    )
    folder_path = export_service.ensure_playlist_artifacts_for_playlist(playlist_id)
    # Fall back to locating an existing directory when ensure yields nothing usable.
    if not (folder_path is not None and folder_path.exists()):
        folder_path = locate_playlist_dir(playlists_root, playlist_payload)
    if not (folder_path is not None and folder_path.exists()):
        raise HTTPException(status_code=404, detail="playlist export folder not found")

    folder_info = {
        "path": str(folder_path.resolve()),
        "exists": True,
        "files": _collect_folder_listing(folder_path),
    }
    return {"playlist": playlist_payload, "folder": folder_info}
|
|
|
|
@app.get("/api/playlists/{playlist_id}/export.zip")
def api_playlist_export_zip(playlist_id: int):
    # Build and return a zip bundle for one playlist. Only playlists whose
    # export state is "downloaded" qualify; any other state answers 409 with
    # a JSON body naming the current state.
    state_rows = catalog_repo.list_playlist_export_state_rows([playlist_id])
    if not state_rows:
        raise HTTPException(status_code=404, detail="playlist not found")
    record = dict(state_rows[0])

    state_code = str(record.get("state_code") or "")
    if state_code != "downloaded":
        not_ready_body = {
            "playlist_id": int(record["id"]),
            "state_code": state_code,
            "message": "playlist is not ready for zip export",
        }
        return JSONResponse(status_code=409, content=not_ready_body)

    current_env = env_manager.load_current()
    playlists_root = _resolve_playlists_root(current_env, catalog_repo)
    if playlists_root is None:
        raise HTTPException(status_code=500, detail="playlists root is not configured")
    export_service = CatalogSyncService(playlists_root=playlists_root, repository=catalog_repo)

    playlist_payload = {
        "id": int(record["id"]),
        "platform": str(record.get("platform") or ""),
        "remote_playlist_id": str(record.get("remote_playlist_id") or ""),
        "name": str(record.get("name") or ""),
    }
    folder_path = export_service.ensure_playlist_artifacts_for_playlist(int(record["id"]))
    # Fall back to locating an existing directory when ensure yields nothing usable.
    if not (folder_path is not None and folder_path.exists()):
        folder_path = locate_playlist_dir(playlists_root, playlist_payload)
    if not (folder_path is not None and folder_path.exists()):
        raise HTTPException(status_code=404, detail="playlist export folder not found")

    bundle_path = create_single_playlist_bundle(
        playlist_dir=folder_path,
        bundle_root=default_bundle_root(),
        platform=str(record.get("platform") or ""),
        playlist_id=int(record["id"]),
        playlist_name=str(record.get("name") or ""),
    )
    download_name = bundle_download_filename(bundle_path)
    return FileResponse(
        path=str(bundle_path),
        media_type="application/zip",
        filename=download_name,
    )
|
|
|
|
@app.get("/api/exports/bundles/{bundle_name}.zip")
def api_export_bundle_download(bundle_name: str):
    # Serve a previously created bundle; anything that does not resolve to an
    # existing regular file answers 404.
    bundle_path = resolve_bundle_download_path(default_bundle_root(), bundle_name)
    is_servable = (
        bundle_path is not None
        and bundle_path.exists()
        and bundle_path.is_file()
    )
    if not is_servable:
        raise HTTPException(status_code=404, detail="bundle not found")
    download_name = bundle_download_filename(bundle_path)
    return FileResponse(
        path=str(bundle_path),
        media_type="application/zip",
        filename=download_name,
    )
|
|
|
|
@app.get("/api/playlists")
def api_playlists(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=50),
    platform: str | None = Query(default=None),
    pool_kind: str | None = Query(default=None),
    status: str | None = Query(default=None),
    keyword: str | None = Query(default=None),
    wanted_only: str | None = Query(default=None),
    sort_by: str | None = Query(default=None),
    sort_dir: str | None = Query(default=None),
):
    # JSON playlist listing; same normalization steps as the HTML page.
    size = _validate_playlist_page_size(int(page_size))
    platform_filter = _normalize_optional_filter(platform)
    pool_kind_filter = _normalize_optional_filter(pool_kind)
    status_filter = _normalize_optional_filter(status)
    keyword_filter = _normalize_optional_filter(keyword)
    wanted_filter = _parse_optional_bool_query(wanted_only, field_name="wanted_only")
    sort_column, sort_direction = _normalize_playlist_sort(sort_by, sort_dir)

    query_kwargs = {
        "page": page,
        "page_size": size,
        "platform": platform_filter,
        "pool_kind": pool_kind_filter,
        "status": status_filter,
        "keyword": keyword_filter,
        "wanted_only": wanted_filter,
        "sort_by": sort_column,
        "sort_dir": sort_direction,
    }
    return _list_playlist_page_or_422(catalog_repo, **query_kwargs)
|
|
|
|
@app.post("/api/playlists/mark-wanted")
def api_mark_playlists_wanted(payload: PlaylistBulkRequest):
    # Flag the given playlists as wanted; 422 when no ids survive normalization.
    playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
    if not playlist_ids:
        raise HTTPException(status_code=422, detail="playlist_ids is required")
    # marked_by falls back to the requester when it is not given explicitly.
    marker = payload.marked_by or payload.requested_by
    catalog_repo.mark_playlists_wanted(playlist_ids, marked_by=marker)
    return {"playlist_ids": playlist_ids, "marked_count": len(playlist_ids)}
|
|
|
|
@app.post("/api/playlists/unmark-wanted")
def api_unmark_playlists_wanted(payload: PlaylistBulkRequest):
    # Clear the wanted flag; 422 when no ids survive normalization.
    playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
    if not playlist_ids:
        raise HTTPException(status_code=422, detail="playlist_ids is required")
    catalog_repo.unmark_playlists_wanted(playlist_ids)
    response_body = {
        "playlist_ids": playlist_ids,
        "unmarked_count": len(playlist_ids),
    }
    return response_body
|
|
|
|
@app.post("/api/playlists/download", status_code=201)
def api_download_selected_playlists(payload: PlaylistBulkRequest):
    # Hand the selection to the adaptive download job builder.
    created = _build_adaptive_download_jobs(repo, catalog_repo, env_manager, payload=payload)
    return created
|
|
|
|
@app.post("/api/playlists/sync", status_code=201)
def api_sync_selected_playlists(payload: PlaylistBulkRequest):
    # Create a "sync_only" job for the selected playlists.
    created = _build_playlist_job(repo, env_manager, payload=payload, job_type="sync_only")
    return created
|
|
|
|
@app.post("/api/playlists/sync-download", status_code=201)
def api_sync_download_selected_playlists(payload: PlaylistBulkRequest):
    # Create a "sync_download" job for the selected playlists.
    created = _build_playlist_job(repo, env_manager, payload=payload, job_type="sync_download")
    return created
|
|
|
|
@app.post("/api/playlists/export")
|
|
def api_export_selected_playlists(payload: PlaylistBulkRequest):
|
|
# Export artifacts for the selected playlists that are in the "downloaded"
# state and hand the remaining ones to follow-up jobs. Raises 422 when no
# playlist ids survive normalization and 500 when no playlists root resolves.
playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
|
|
if not playlist_ids:
|
|
raise HTTPException(status_code=422, detail="playlist_ids is required")
|
|
|
|
env_values = env_manager.load_current()
|
|
playlists_root = _resolve_playlists_root(env_values, catalog_repo)
|
|
if playlists_root is None:
|
|
raise HTTPException(status_code=500, detail="playlists root is not configured")
|
|
|
|
export_service = CatalogSyncService(
|
|
repository=catalog_repo,
|
|
playlists_root=playlists_root,
|
|
)
|
|
# Export-state rows keyed by playlist id; judging by the helper's name it
# presumably raises 404 itself for unknown ids (confirm against the helper).
rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids)
|
|
# Buckets: exported right now / needs a download job / needs sync + download.
exported_playlist_ids: list[int] = []
|
|
download_playlist_ids: list[int] = []
|
|
sync_download_playlist_ids: list[int] = []
|
|
|
|
for playlist_id in playlist_ids:
|
|
row = rows_by_id.get(int(playlist_id))
|
|
if row is None:
|
|
continue
|
|
state_code = str(row.get("state_code") or "")
|
|
if state_code == "downloaded":
|
|
folder_path = export_service.ensure_playlist_artifacts_for_playlist(int(playlist_id))
|
|
if folder_path is not None:
|
|
exported_playlist_ids.append(int(playlist_id))
|
|
# NOTE(review): indentation is lost in this view, so it is unclear whether
# this `continue` belongs to the inner `if folder_path is not None` or to the
# outer `if state_code == "downloaded"`. The two readings differ for a
# downloaded playlist whose artifacts could not be ensured (skipped vs.
# queued for download below) - confirm against the original file.
continue
|
|
# Unsynced playlists need a sync before they can be downloaded.
if state_code == "unsynced":
|
|
sync_download_playlist_ids.append(int(playlist_id))
|
|
continue
|
|
# Every remaining state is routed to a plain download job.
download_playlist_ids.append(int(playlist_id))
|
|
|
|
# Both job helpers are called unconditionally, even when their id list is empty.
download_job = _create_scoped_playlist_job(
|
|
repo,
|
|
env_manager,
|
|
job_type="download_only",
|
|
playlist_ids=download_playlist_ids,
|
|
requested_by=payload.requested_by,
|
|
)
|
|
sync_download_job = _create_scoped_playlist_job(
|
|
repo,
|
|
env_manager,
|
|
job_type="sync_download",
|
|
playlist_ids=sync_download_playlist_ids,
|
|
requested_by=payload.requested_by,
|
|
)
|
|
|
|
return {
|
|
"playlist_ids": playlist_ids,
|
|
"exported_playlist_ids": exported_playlist_ids,
|
|
"exported_count": len(exported_playlist_ids),
|
|
"download_job": download_job,
|
|
"sync_download_job": sync_download_job,
|
|
}
|
|
|
|
@app.post("/api/playlists/export-zip")
def api_export_selected_playlists_zip(payload: PlaylistBulkRequest):
    # Bundle the selected playlists into one zip when all of them are in the
    # "downloaded" state. Otherwise nothing is bundled: the non-ready ones are
    # handed to download / sync+download jobs and a "queued" body is returned.
    playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
    if not playlist_ids:
        raise HTTPException(status_code=422, detail="playlist_ids is required")

    rows_by_id = _playlist_export_rows_by_id_or_404(catalog_repo, playlist_ids)

    ready_playlist_ids: list[int] = []
    blocked_playlist_ids: list[int] = []
    download_playlist_ids: list[int] = []
    sync_download_playlist_ids: list[int] = []

    for raw_id in playlist_ids:
        current_id = int(raw_id)
        state_row = rows_by_id.get(current_id)
        if state_row is None:
            continue
        state_code = str(state_row.get("state_code") or "")
        if state_code == "downloaded":
            ready_playlist_ids.append(current_id)
        else:
            blocked_playlist_ids.append(current_id)
            # Unsynced playlists need a sync first; the rest only a download.
            if state_code == "unsynced":
                sync_download_playlist_ids.append(current_id)
            else:
                download_playlist_ids.append(current_id)

    if blocked_playlist_ids:
        download_job = _create_scoped_playlist_job(
            repo,
            env_manager,
            job_type="download_only",
            playlist_ids=download_playlist_ids,
            requested_by=payload.requested_by,
        )
        sync_download_job = _create_scoped_playlist_job(
            repo,
            env_manager,
            job_type="sync_download",
            playlist_ids=sync_download_playlist_ids,
            requested_by=payload.requested_by,
        )
        return {
            "status": "queued",
            "message": "selected playlists include non-ready items; export zip queued",
            "playlist_ids": playlist_ids,
            "ready_playlist_ids": ready_playlist_ids,
            "blocked_playlist_ids": blocked_playlist_ids,
            "download_job": download_job,
            "sync_download_job": sync_download_job,
        }

    current_env = env_manager.load_current()
    playlists_root = _resolve_playlists_root(current_env, catalog_repo)
    if playlists_root is None:
        raise HTTPException(status_code=500, detail="playlists root is not configured")
    export_service = CatalogSyncService(playlists_root=playlists_root, repository=catalog_repo)

    playlist_dirs: list[Path] = []
    for ready_id in ready_playlist_ids:
        state_row = rows_by_id.get(int(ready_id))
        if state_row is None:
            continue
        playlist_payload = {
            "id": int(state_row["id"]),
            "platform": str(state_row.get("platform") or ""),
            "remote_playlist_id": str(state_row.get("remote_playlist_id") or ""),
            "name": str(state_row.get("name") or ""),
        }
        folder_path = export_service.ensure_playlist_artifacts_for_playlist(ready_id)
        # Fall back to locating an existing directory when ensure yields nothing usable.
        if not (folder_path is not None and folder_path.exists()):
            folder_path = locate_playlist_dir(playlists_root, playlist_payload)
        if not (folder_path is not None and folder_path.exists()):
            raise HTTPException(status_code=404, detail="playlist export folder not found")
        playlist_dirs.append(folder_path)

    bundle_path = create_multi_playlist_bundle(
        playlist_dirs=playlist_dirs,
        bundle_root=default_bundle_root(),
    )
    # The download URL addresses the bundle by file stem.
    bundle_url = f"/api/exports/bundles/{bundle_path.stem}.zip"
    return {
        "status": "ready",
        "playlist_ids": ready_playlist_ids,
        "download_url": bundle_url,
    }
|
|
|
|
@app.post("/api/jobs", status_code=201)
def api_create_job(payload: JobCreateRequest):
    # Create a job run. Source lists come from the request when they contain
    # allowed entries, otherwise from the config snapshot (either key casing).
    snapshot = payload.config_snapshot or env_manager.build_job_snapshot()

    sources = _normalize_allowed_list(list(payload.sources), ALLOWED_COLLECT_SOURCES)
    if not sources:
        sources = _normalize_allowed_list(
            snapshot.get("sources") or snapshot.get("SOURCES"),
            ALLOWED_COLLECT_SOURCES,
        )

    download_sources = _normalize_allowed_list(
        list(payload.download_sources),
        ALLOWED_DOWNLOAD_SOURCES,
    )
    if not download_sources:
        download_sources = _normalize_allowed_list(
            snapshot.get("download_sources") or snapshot.get("DOWNLOAD_SOURCES"),
            ALLOWED_DOWNLOAD_SOURCES,
        )

    job_id = repo.create_job(
        job_type=payload.job_type,
        requested_by=payload.requested_by,
        config_snapshot=dict(snapshot),
        sources=sources,
        download_sources=download_sources,
        playlist_scope=payload.playlist_scope or {},
    )
    _ensure_job_stages(repo, job_id, payload.job_type)

    # Read the job back so the response reflects what was stored.
    created_job = repo.get_job(job_id)
    if created_job is None:
        raise HTTPException(status_code=500, detail="job create failed")
    return {"job": _serialize_job(created_job)}
|
|
|
|
@app.post("/api/jobs/{job_id}/commands", status_code=201)
def api_create_job_command(job_id: int, payload: JobCommandRequest):
    # Queue a control command for a job. 404 for unknown jobs; retry-style
    # commands additionally require a target item id (422 otherwise).
    if repo.get_job(job_id) is None:
        raise HTTPException(status_code=404, detail="job not found")

    command_type = payload.command_type
    is_retry = command_type in RETRY_COMMAND_TYPES
    if is_retry and payload.target_item_id is None:
        raise HTTPException(
            status_code=422,
            detail="target_item_id is required for retry commands",
        )

    command_id = repo.create_command(
        job_run_id=job_id,
        command_type=command_type,
        target_item_id=payload.target_item_id,
        payload=payload.payload,
    )
    response_body = {
        "job_id": int(job_id),
        "command_id": int(command_id),
        "command_type": command_type,
    }
    return response_body
|
|
|
|
@app.get("/api/config/env")
def api_get_env():
    # Current env file: resolved path, raw text and parsed values.
    resolved_path = str(env_file.resolve())
    raw_text = _read_env_text(env_file)
    parsed_values = env_manager.load_current()
    return {"path": resolved_path, "content": raw_text, "parsed": parsed_values}
|
|
|
|
@app.put("/api/config/env")
def api_put_env(payload: EnvUpdateRequest):
    # Overwrite the env file with the submitted text, then record a revision.
    target_dir = env_file.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    env_file.write_text(payload.content, encoding="utf-8")

    revision_id = env_manager.save_revision(note=payload.note)
    stored_revision = repo.get_config_revision(revision_id)

    # Content and parsed values are re-read from disk rather than echoed back.
    response_body = {
        "saved": True,
        "revision": stored_revision,
        "content": _read_env_text(env_file),
        "parsed": env_manager.load_current(),
    }
    return response_body
|
|
|
|
@app.get("/api/config/revisions")
def api_get_revisions(limit: int = Query(default=50, ge=1, le=500)):
    # Config revisions wrapped in an "items" envelope.
    revisions = env_manager.list_revisions(limit=limit)
    return {"items": revisions}
|
|
|
|
@app.post("/api/config/revisions/{revision_id}/apply")
def api_apply_revision(revision_id: int):
    # Apply a stored revision; a ValueError from the manager is reported as
    # 404 with the original exception context suppressed.
    try:
        env_manager.apply_revision(revision_id)
    except ValueError:
        raise HTTPException(status_code=404, detail="revision not found") from None

    applied_revision = repo.get_config_revision(revision_id)
    response_body = {
        "applied": True,
        "revision": applied_revision,
        "content": _read_env_text(env_file),
        "parsed": env_manager.load_current(),
    }
    return response_body
|
|
|
|
@app.get("/api/events/stream")
def api_events_stream(once: bool = Query(default=False)):
    # Server-sent events feed of dashboard snapshots. With once=true a single
    # snapshot is emitted and the stream ends; otherwise each snapshot is
    # followed by a heartbeat comment and a pause before the next one.
    def event_stream():
        while True:
            body = json.dumps(_dashboard_payload(repo), ensure_ascii=False)
            yield f"event: snapshot\ndata: {body}\n\n"
            if once:
                return
            yield ": heartbeat\n\n"
            time.sleep(LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS)

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
|
|
|
|
return app
|