# Catalogsync Operations Console Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Build a NAS-local operations console for `musicdl.catalogsync` with queue-based `collect/sync/download/upload` jobs, soft pause and resume, crash-safe recovery, song-level retry, worker visibility, and `catalogsync.env` configuration management. **Architecture:** Add a new `musicdl.catalogsync.ops` package for execution-state persistence, env revision management, job orchestration, and FastAPI-backed UI/API endpoints. Reuse the existing catalog tables and execution services by wrapping them in stage and item executors rather than rebuilding the domain logic from scratch. **Tech Stack:** Python 3, unittest, SQLite, FastAPI, Jinja2, Server-Sent Events, existing `musicdl.catalogsync` services/downloader/uploader, NAS shell templates. --- ## File Structure ### New files - `musicdl/catalogsync/ops/__init__.py` - exports the operations-console public entry points - `musicdl/catalogsync/ops/models.py` - enums, dataclasses, and small helpers for job, stage, item, and worker states - `musicdl/catalogsync/ops/repository.py` - CRUD for `job_runs`, `job_stages`, `job_items`, `job_workers`, `job_commands`, `job_events`, `job_logs`, and `config_revisions` - `musicdl/catalogsync/ops/config.py` - load, validate, snapshot, version, and apply `catalogsync.env` - `musicdl/catalogsync/ops/executors.py` - adapters that run `collect`, `sync`, `download`, and `upload` one work item at a time - `musicdl/catalogsync/ops/runner.py` - queue scheduler, command polling, pause/resume, and crash recovery - `musicdl/catalogsync/ops/web.py` - FastAPI app factory, API routes, page routes, and SSE stream - `musicdl/catalogsync/templates/ops/base.html` - shared layout - `musicdl/catalogsync/templates/ops/dashboard.html` - dashboard page - `musicdl/catalogsync/templates/ops/jobs.html` - queue and active-job page - `musicdl/catalogsync/templates/ops/job_detail.html` - per-job detail page - `musicdl/catalogsync/templates/ops/playlists.html` - playlist-pool and playlist status page - `musicdl/catalogsync/templates/ops/songs.html` - worker and song-processing page - `musicdl/catalogsync/templates/ops/logs.html` - log and exception page - `musicdl/catalogsync/templates/ops/config.html` - env editor and revision page - `musicdl/catalogsync/static/ops/app.js` - lightweight browser logic for SSE updates and page actions - `scripts/catalogsync/templates/serve_console.sh` - NAS runtime script for the web console - `tests/catalogsync/test_ops_db.py` - schema and repository coverage for operations tables - `tests/catalogsync/test_ops_config.py` - env loading, snapshot, revision, and apply tests - `tests/catalogsync/test_ops_runner.py` - state-machine, pause/resume, and recovery tests - `tests/catalogsync/test_ops_executors.py` - per-stage executor tests - `tests/catalogsync/test_ops_api.py` - FastAPI route and SSE tests ### Modified files - `musicdl/catalogsync/db.py` - create the new operations tables - `musicdl/catalogsync/repository.py` - add query helpers that the operations layer can reuse from catalog data - `musicdl/catalogsync/services.py` - expose a playlist-row based sync unit for the runner - `musicdl/catalogsync/downloader.py` - expose song-level download execution for one queued item - `musicdl/catalogsync/uploader.py` - expose upload-task-level execution for one queued item - `musicdl/catalogsync/cli.py` - add the `serve` command - `musicdl/catalogsync/runtime.py` - add web host, port, and config-path runtime fields - `scripts/catalogsync/templates/catalogsync.env.example` - add web-console settings - `scripts/catalogsync/templates/install_runtime.sh` - install any new web-console dependencies - `docs/catalogsync.md` - document the console workflow and runtime script - `README.md` - add a concise operations-console entry - `requirements.txt` - add the web-console runtime dependencies - `setup.py` - include templates/static assets in packaging - `MANIFEST.in` - include template/static files for source distributions - `tests/catalogsync/test_cli.py` - cover the new `serve` command - `tests/catalogsync/test_runtime.py` - cover the new runtime config fields ### Dependency notes - Add `fastapi` - Add `uvicorn` - Add `jinja2` - Add `python-multipart` These should be added only once and then reused by the `serve` command, tests, and NAS runtime script. ### Task 1: Add Operations Schema And Repository Primitives **Files:** - Create: `musicdl/catalogsync/ops/__init__.py` - Create: `musicdl/catalogsync/ops/models.py` - Create: `musicdl/catalogsync/ops/repository.py` - Modify: `musicdl/catalogsync/db.py` - Test: `tests/catalogsync/test_ops_db.py` - [ ] **Step 1: Write the failing schema and repository tests** ```python import sqlite3 import tempfile import unittest from contextlib import closing from pathlib import Path class OperationsSchemaTests(unittest.TestCase): def test_initialize_database_creates_operations_tables(self): from musicdl.catalogsync.db import initialize_database expected_tables = { "job_runs", "job_stages", "job_items", "job_workers", "job_commands", "job_events", "job_logs", "config_revisions", } with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: db_path = Path(tmpdir) / "catalogsync.db" initialize_database(db_path).close() with closing(sqlite3.connect(db_path)) as conn: tables = { row[0] for row in conn.execute( "SELECT name FROM sqlite_master WHERE type = 'table'" ).fetchall() } self.assertTrue(expected_tables.issubset(tables)) ``` - [ ] **Step 2: Run the targeted test and verify it fails** Run: `python -m unittest tests.catalogsync.test_ops_db -v` Expected: - `FAIL` because the operations tables and repository module do not exist yet - [ ] **Step 3: Implement the schema, enums, and repository** ```python # musicdl/catalogsync/db.py REQUIRED_TABLES |= { "job_runs", "job_stages", "job_items", "job_workers", "job_commands", "job_events", "job_logs", "config_revisions", } ``` ```python # musicdl/catalogsync/ops/models.py from dataclasses import dataclass from enum import StrEnum class JobStatus(StrEnum): QUEUED = "queued" RUNNING = "running" PAUSE_REQUESTED = "pause_requested" PAUSED = "paused" COMPLETED = "completed" COMPLETED_WITH_ERRORS = "completed_with_errors" FAILED = "failed" CANCELED = "canceled" @dataclass(frozen=True) class JobCreateRequest: job_type: str config_snapshot: dict sources: list[str] | None = None ``` ```python # musicdl/catalogsync/ops/repository.py class OperationsRepository: def create_job_run(self, job_type: str, config_snapshot: dict, sources=None, download_sources=None, playlist_scope=None) -> int: with connect_database(self.db_path) as conn: cursor = conn.execute( """ INSERT INTO job_runs (job_type, config_snapshot_json, sources, download_sources, playlist_scope_json) VALUES (?, ?, ?, ?, ?) """, ( job_type, json.dumps(config_snapshot, ensure_ascii=False), ",".join(sources or []), ",".join(download_sources or []), json.dumps(playlist_scope or {}, ensure_ascii=False), ), ) return int(cursor.lastrowid) def create_job_stage(self, job_id: int, stage_type: str, seq_no: int) -> int: with connect_database(self.db_path) as conn: cursor = conn.execute( "INSERT INTO job_stages (job_run_id, stage_type, seq_no) VALUES (?, ?, ?)", (job_id, stage_type, seq_no), ) return int(cursor.lastrowid) def create_job_item(self, job_stage_id: int, item_type: str, item_key: str, **extra) -> int: with connect_database(self.db_path) as conn: cursor = conn.execute( """ INSERT INTO job_items (job_stage_id, item_type, item_key, playlist_pool_id, playlist_id, song_id, file_location_id, payload_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( job_stage_id, item_type, item_key, extra.get("playlist_pool_id"), extra.get("playlist_id"), extra.get("song_id"), extra.get("file_location_id"), json.dumps(extra.get("payload_json") or {}, ensure_ascii=False), ), ) return int(cursor.lastrowid) ``` - [ ] **Step 4: Run the targeted test and verify it passes** Run: `python -m unittest tests.catalogsync.test_ops_db -v` Expected: - `OK` - the test output shows the new operations tables are created - [ ] **Step 5: Commit** ```bash git add musicdl/catalogsync/db.py musicdl/catalogsync/ops/__init__.py musicdl/catalogsync/ops/models.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_db.py git commit -m "feat: add operations schema and repository primitives" ``` ### Task 2: Add Env Revision Management And Job Config Snapshots **Files:** - Create: `musicdl/catalogsync/ops/config.py` - Modify: `musicdl/catalogsync/ops/repository.py` - Test: `tests/catalogsync/test_ops_config.py` - [ ] **Step 1: Write the failing config and revision tests** ```python import tempfile import unittest from pathlib import Path class EnvManagerTests(unittest.TestCase): def test_load_snapshot_and_save_revision(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.config import CatalogsyncEnvManager from musicdl.catalogsync.ops.repository import OperationsRepository with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: db_path = Path(tmpdir) / "catalogsync.db" env_path = Path(tmpdir) / "catalogsync.env" env_path.write_text( "LIBRARY_DIR=/volume4/Music_Cloud/library\nDOWNLOAD_SOURCES=qq,kuwo,migu\n", encoding="utf-8", ) initialize_database(db_path).close() repo = OperationsRepository(db_path) manager = CatalogsyncEnvManager(env_path=env_path, repository=repo) snapshot = manager.build_job_snapshot() revision_id = manager.save_revision(note="initial import") revisions = manager.list_revisions() self.assertEqual(["qq", "kuwo", "migu"], snapshot["download_sources"]) self.assertEqual(revision_id, revisions[0]["id"]) ``` - [ ] **Step 2: Run the targeted test and verify it fails** Run: `python -m unittest tests.catalogsync.test_ops_config -v` Expected: - `FAIL` because the env manager and config revision methods do not exist yet - [ ] **Step 3: Implement the env manager and revision methods** ```python # musicdl/catalogsync/ops/config.py class CatalogsyncEnvManager: SNAPSHOT_KEYS = { "DB_PATH", "LIBRARY_DIR", "DOWNLOAD_SOURCES", "OBJECT_BACKEND_NAME", "OBJECT_BUCKET", "OBJECT_ENDPOINT", } def load_current(self) -> dict[str, str]: values: dict[str, str] = {} for line in self.env_path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) values[key.strip()] = value.strip() return values def build_job_snapshot(self) -> dict: current = self.load_current() return { "library_root": current.get("LIBRARY_DIR", ""), "download_sources": [item for item in current.get("DOWNLOAD_SOURCES", "").split(",") if item], "env_values": {key: current[key] for key in self.SNAPSHOT_KEYS if key in current}, } def save_revision(self, note: str | None = None) -> int: content = self.env_path.read_text(encoding="utf-8") digest = hashlib.sha256(content.encode("utf-8")).hexdigest() return self.repository.create_config_revision( file_path=str(self.env_path), content_text=content, content_hash=digest, note=note, ) def apply_revision(self, revision_id: int) -> None: row = self.repository.get_config_revision(revision_id) self.env_path.write_text(row["content_text"], encoding="utf-8") self.repository.mark_config_revision_applied(revision_id) ``` ```python # musicdl/catalogsync/ops/repository.py def create_config_revision(self, file_path: str, content_text: str, content_hash: str, note: str | None = None) -> int: with connect_database(self.db_path) as conn: cursor = conn.execute( """ INSERT INTO config_revisions (file_path, content_text, content_hash, note) VALUES (?, ?, ?, ?) """, (file_path, content_text, content_hash, note), ) return int(cursor.lastrowid) def get_config_revision(self, revision_id: int): with connect_database(self.db_path) as conn: return conn.execute("SELECT * FROM config_revisions WHERE id = ?", (revision_id,)).fetchone() ``` - [ ] **Step 4: Run the targeted test and verify it passes** Run: `python -m unittest tests.catalogsync.test_ops_config -v` Expected: - `OK` - revision apply rewrites the env file and marks the revision as applied - [ ] **Step 5: Commit** ```bash git add musicdl/catalogsync/ops/config.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_config.py git commit -m "feat: add env revision and job snapshot management" ``` ### Task 3: Implement The Runner State Machine And Recovery Logic **Files:** - Create: `musicdl/catalogsync/ops/runner.py` - Modify: `musicdl/catalogsync/ops/models.py` - Modify: `musicdl/catalogsync/ops/repository.py` - Test: `tests/catalogsync/test_ops_runner.py` - [ ] **Step 1: Write the failing runner-state tests** ```python import tempfile import unittest from pathlib import Path class RunnerStateTests(unittest.TestCase): def test_recover_orphaned_jobs_converts_running_items_to_interrupted(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.repository import OperationsRepository from musicdl.catalogsync.ops.runner import CatalogsyncRunner with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: db_path = Path(tmpdir) / "catalogsync.db" initialize_database(db_path).close() repo = OperationsRepository(db_path) runner = CatalogsyncRunner(db_path=db_path, env_path=Path(tmpdir) / "catalogsync.env") job_id = repo.create_job_run("download_only", {"library_root": "/tmp/library"}) stage_id = repo.create_job_stage(job_id, "download", 1) item_id = repo.create_job_item(stage_id, "song", "song:1", song_id=1) repo.mark_job_running(job_id) repo.mark_stage_running(stage_id) repo.mark_item_running(item_id, worker_id=1) runner.recover_incomplete_jobs() job = repo.get_job_run(job_id) item = repo.get_job_item(item_id) self.assertEqual("paused", job["status"]) self.assertEqual("interrupted", item["status"]) ``` - [ ] **Step 2: Run the targeted test and verify it fails** Run: `python -m unittest tests.catalogsync.test_ops_runner -v` Expected: - `FAIL` because the runner, command flow, and recovery helpers do not exist yet - [ ] **Step 3: Implement the runner, command polling, and recovery** ```python # musicdl/catalogsync/ops/runner.py class CatalogsyncRunner: def recover_incomplete_jobs(self) -> None: for job in self.repository.list_recoverable_jobs(): self.repository.pause_job_for_recovery(int(job["id"])) for item in self.repository.list_running_items(int(job["id"])): self.repository.mark_item_interrupted(int(item["id"]), last_error="Runner restarted during execution") self.repository.add_job_event(int(job["id"]), "recovery_requeued", "Recovered job after runner restart") def apply_pending_commands(self) -> None: for command in self.repository.list_pending_commands(): if command["command_type"] == "pause": self.repository.request_job_pause(int(command["job_run_id"])) elif command["command_type"] == "resume": self.repository.resume_job(int(command["job_run_id"])) elif command["command_type"] == "retry_item": self.repository.requeue_item(int(command["target_item_id"]), force=False) elif command["command_type"] == "force_retry_item": self.repository.requeue_item(int(command["target_item_id"]), force=True) self.repository.mark_command_applied(int(command["id"])) def reconcile_pause_state(self, job_id: int) -> None: if self.repository.job_has_running_items(job_id): return self.repository.finalize_pause(job_id) def loop_once(self) -> None: self.apply_pending_commands() active_job = self.repository.claim_next_runnable_job() if active_job is None: return self.repository.mark_job_running(int(active_job["id"])) ``` ```python # musicdl/catalogsync/ops/repository.py def request_job_pause(self, job_id: int) -> None: with connect_database(self.db_path) as conn: conn.execute("UPDATE job_runs SET status = 'pause_requested' WHERE id = ?", (job_id,)) conn.execute( "UPDATE job_stages SET status = 'pause_requested' WHERE job_run_id = ? AND status = 'running'", (job_id,), ) def pause_job_for_recovery(self, job_id: int) -> None: with connect_database(self.db_path) as conn: conn.execute("UPDATE job_runs SET status = 'paused' WHERE id = ?", (job_id,)) conn.execute( "UPDATE job_stages SET status = 'paused' WHERE job_run_id = ? AND status IN ('running', 'pause_requested')", (job_id,), ) def mark_item_interrupted(self, item_id: int, last_error: str | None = None) -> None: with connect_database(self.db_path) as conn: conn.execute( "UPDATE job_items SET status = 'interrupted', worker_id = NULL, ended_at = CURRENT_TIMESTAMP, last_error = ? WHERE id = ?", (last_error, item_id), ) ``` - [ ] **Step 4: Run the targeted test and verify it passes** Run: `python -m unittest tests.catalogsync.test_ops_runner -v` Expected: - `OK` - orphaned running items become `interrupted` - soft pause waits until no item remains running before closing the stage - [ ] **Step 5: Commit** ```bash git add musicdl/catalogsync/ops/runner.py musicdl/catalogsync/ops/models.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_runner.py git commit -m "feat: add operations runner state machine and recovery" ``` ### Task 4: Add Stage Executors And Single-Item Execution Hooks **Files:** - Create: `musicdl/catalogsync/ops/executors.py` - Modify: `musicdl/catalogsync/services.py` - Modify: `musicdl/catalogsync/downloader.py` - Modify: `musicdl/catalogsync/uploader.py` - Modify: `musicdl/catalogsync/ops/repository.py` - Test: `tests/catalogsync/test_ops_executors.py` - [ ] **Step 1: Write the failing executor integration tests** ```python import tempfile import unittest from pathlib import Path from unittest.mock import patch class StageExecutorTests(unittest.TestCase): def test_download_executor_marks_item_succeeded(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.executors import DownloadStageExecutor from musicdl.catalogsync.ops.repository import OperationsRepository with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: db_path = Path(tmpdir) / "catalogsync.db" library_root = Path(tmpdir) / "library" initialize_database(db_path, default_library_root=library_root).close() repo = OperationsRepository(db_path) stage_id = repo.create_job_stage(repo.create_job_run("download_only", {"library_root": str(library_root)}), "download", 1) item_id = repo.create_job_item(stage_id, "song", "song:1", song_id=1, payload_json={"row": {"id": 1, "platform": "qq"}}) executor = DownloadStageExecutor(db_path=db_path, library_root=library_root, download_sources=["qq"]) with patch("musicdl.catalogsync.downloader.CatalogDownloader.download_song_row", return_value=True): executor.process_item(item_id=item_id, worker_name="download-1") item = repo.get_job_item(item_id) self.assertEqual("succeeded", item["status"]) ``` - [ ] **Step 2: Run the targeted test and verify it fails** Run: `python -m unittest tests.catalogsync.test_ops_executors -v` Expected: - `FAIL` because the executor layer and single-item helpers do not exist yet - [ ] **Step 3: Implement executor adapters and expose item-level hooks** ```python # musicdl/catalogsync/downloader.py class CatalogDownloader: def download_song_row( self, row: dict, library_root: str | Path, download_sources: list[str] | None = None, worker_callback=None, ) -> bool: default_root = Path(library_root).resolve() if worker_callback: worker_callback( current_song_id=int(row["id"]), current_playlist_id=row.get("playlist_id"), current_display_text=f'{row.get("name", row["id"])} / {row.get("singers", "")}'.strip(" /"), ) return self._download_one(row=row, default_root=default_root, download_sources=download_sources) ``` ```python # musicdl/catalogsync/uploader.py class CatalogUploader: def process_upload_task_row(self, task_row, backend_name: str) -> str: backend = self.get_backend(backend_name) return self._process_task(task_row, backend, uploader=None) ``` ```python # musicdl/catalogsync/services.py class CatalogSyncService: def sync_playlist_row(self, playlist_row) -> int: song_infos = self.resolve_playlist_song_infos(playlist_row) source_pool_ids = self.repository.get_pool_ids_for_playlist(int(playlist_row["id"])) linked_count = 0 for source_pool_id in source_pool_ids: linked_count += self.store_playlist_songs( playlist_id=int(playlist_row["id"]), source_pool_id=source_pool_id, song_infos=song_infos, ) return linked_count ``` ```python # musicdl/catalogsync/ops/executors.py class DownloadStageExecutor: def process_item(self, item_id: int, worker_name: str) -> None: item = self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) row = self.ops_repo.build_download_row(item_id) ok = self.downloader.download_song_row( row=row, library_root=self.library_root, download_sources=self.download_sources, worker_callback=lambda **state: self.ops_repo.update_worker_state(worker_name=worker_name, **state), ) if ok: self.ops_repo.mark_item_succeeded(item_id) else: self.ops_repo.mark_item_failed(item_id, "download returned no file") class SyncStageExecutor: def process_item(self, item_id: int, worker_name: str) -> None: item = self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) playlist_row = self.ops_repo.get_playlist_row_for_item(item_id) linked_count = self.service.sync_playlist_row(playlist_row) self.ops_repo.mark_item_succeeded(item_id, result_payload={"linked_count": linked_count}) class UploadStageExecutor: def process_item(self, item_id: int, worker_name: str) -> None: item = self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) upload_row = self.ops_repo.get_upload_row_for_item(item_id) result = self.uploader.process_upload_task_row(upload_row, backend_name=self.backend_name) if result == "succeeded": self.ops_repo.mark_item_succeeded(item_id) else: self.ops_repo.mark_item_failed(item_id, f"upload result: {result}") ``` - [ ] **Step 4: Run the targeted test and verify it passes** Run: `python -m unittest tests.catalogsync.test_ops_executors -v` Expected: - `OK` - download, sync, and upload work items can be processed one at a time - the item records and worker state update correctly - [ ] **Step 5: Commit** ```bash git add musicdl/catalogsync/downloader.py musicdl/catalogsync/uploader.py musicdl/catalogsync/services.py musicdl/catalogsync/ops/executors.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_executors.py git commit -m "feat: add stage executors for operations console" ``` ### Task 5: Build The FastAPI UI And Management API **Files:** - Create: `musicdl/catalogsync/ops/web.py` - Create: `musicdl/catalogsync/templates/ops/base.html` - Create: `musicdl/catalogsync/templates/ops/dashboard.html` - Create: `musicdl/catalogsync/templates/ops/jobs.html` - Create: `musicdl/catalogsync/templates/ops/job_detail.html` - Create: `musicdl/catalogsync/templates/ops/playlists.html` - Create: `musicdl/catalogsync/templates/ops/songs.html` - Create: `musicdl/catalogsync/templates/ops/logs.html` - Create: `musicdl/catalogsync/templates/ops/config.html` - Create: `musicdl/catalogsync/static/ops/app.js` - Modify: `setup.py` - Modify: `MANIFEST.in` - Test: `tests/catalogsync/test_ops_api.py` - [ ] **Step 1: Write the failing API and page tests** ```python import tempfile import unittest from pathlib import Path from fastapi.testclient import TestClient class OperationsApiTests(unittest.TestCase): def test_dashboard_and_jobs_endpoints_render(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: db_path = Path(tmpdir) / "catalogsync.db" env_path = Path(tmpdir) / "catalogsync.env" env_path.write_text("LIBRARY_DIR=/volume4/Music_Cloud/library\n", encoding="utf-8") initialize_database(db_path).close() client = TestClient(create_app(db_path=db_path, env_path=env_path)) dashboard = client.get("/dashboard") jobs = client.get("/api/jobs") self.assertEqual(200, dashboard.status_code) self.assertEqual(200, jobs.status_code) ``` - [ ] **Step 2: Run the targeted test and verify it fails** Run: `python -m unittest tests.catalogsync.test_ops_api -v` Expected: - `FAIL` because the FastAPI app, templates, and API endpoints do not exist yet - [ ] **Step 3: Implement the FastAPI app, pages, APIs, and SSE** ```python # musicdl/catalogsync/ops/web.py from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates def create_app(db_path: str | Path, env_path: str | Path) -> FastAPI: app = FastAPI(title="Catalogsync Operations Console") repo = OperationsRepository(db_path) env_manager = CatalogsyncEnvManager(env_path=env_path, repository=repo) templates = Jinja2Templates(directory=str(Path(__file__).resolve().parents[1] / "templates")) app.mount("/static", StaticFiles(directory=str(Path(__file__).resolve().parents[1] / "static")), name="static") @app.get("/dashboard", response_class=HTMLResponse) def dashboard(request: Request): return templates.TemplateResponse( request, "ops/dashboard.html", {"title": "总览", "summary": repo.get_dashboard_summary()}, ) @app.get("/api/jobs") def api_jobs(): return {"items": repo.list_jobs()} @app.get("/api/events/stream") def api_events(): def event_stream(): while True: yield f"data: {json.dumps(repo.get_live_snapshot(), ensure_ascii=False)}\n\n" return StreamingResponse(event_stream(), media_type="text/event-stream") ``` ```html {{ title or "Catalogsync Console" }} {% block content %}{% endblock %} ``` ```text # MANIFEST.in recursive-include musicdl/catalogsync/templates/ops *.html recursive-include musicdl/catalogsync/static/ops *.js ``` - [ ] **Step 4: Run the targeted test and verify it passes** Run: `python -m unittest tests.catalogsync.test_ops_api -v` Expected: - `OK` - page routes render - queue-control and config endpoints return the expected status codes - [ ] **Step 5: Commit** ```bash git add musicdl/catalogsync/ops/web.py musicdl/catalogsync/templates/ops/base.html musicdl/catalogsync/templates/ops/dashboard.html musicdl/catalogsync/templates/ops/jobs.html musicdl/catalogsync/templates/ops/job_detail.html musicdl/catalogsync/templates/ops/playlists.html musicdl/catalogsync/templates/ops/songs.html musicdl/catalogsync/templates/ops/logs.html musicdl/catalogsync/templates/ops/config.html musicdl/catalogsync/static/ops/app.js setup.py MANIFEST.in tests/catalogsync/test_ops_api.py git commit -m "feat: add operations console web app" ``` ### Task 6: Wire The CLI, Runtime Scripts, Docs, And Final Verification **Files:** - Modify: `musicdl/catalogsync/cli.py` - Modify: `musicdl/catalogsync/runtime.py` - Modify: `requirements.txt` - Modify: `setup.py` - Modify: `scripts/catalogsync/templates/catalogsync.env.example` - Modify: `scripts/catalogsync/templates/install_runtime.sh` - Create: `scripts/catalogsync/templates/serve_console.sh` - Modify: `docs/catalogsync.md` - Modify: `README.md` - Modify: `tests/catalogsync/test_cli.py` - Modify: `tests/catalogsync/test_runtime.py` - [ ] **Step 1: Write the failing CLI and runtime tests** ```python import tempfile import unittest from pathlib import Path from unittest.mock import patch from click.testing import CliRunner class CatalogConsoleCliTests(unittest.TestCase): def test_serve_command_builds_web_app(self): from musicdl.catalogsync.cli import cli runner = CliRunner() with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: db_path = Path(tmpdir) / "catalogsync.db" env_path = Path(tmpdir) / "catalogsync.env" env_path.write_text("LIBRARY_DIR=/volume4/Music_Cloud/library\n", encoding="utf-8") with patch("musicdl.catalogsync.cli.uvicorn.run") as uvicorn_run: result = runner.invoke( cli, [ "serve", "--db", str(db_path), "--env-file", str(env_path), "--host", "0.0.0.0", "--port", "8421", ], ) self.assertEqual(0, result.exit_code, msg=result.output) uvicorn_run.assert_called_once() ``` - [ ] **Step 2: Run the targeted tests and verify they fail** Run: `python -m unittest tests.catalogsync.test_cli tests.catalogsync.test_runtime -v` Expected: - `FAIL` because the `serve` command and web runtime fields do not exist yet - [ ] **Step 3: Implement the serve command, runtime fields, dependencies, and NAS script** ```python # musicdl/catalogsync/runtime.py @dataclass class CatalogSyncRuntimeConfig: root_dir: Path app_home: Path library_dir: Path db_path: Path input_dir: Path log_dir: Path python_bin: str venv_dir: Path download_layout: str env_file: Path web_host: str web_port: int ``` ```python # musicdl/catalogsync/cli.py @cli.command("serve") @click.option("--db", "db_path", required=True, type=click.Path(dir_okay=False)) @click.option("--env-file", required=True, type=click.Path(dir_okay=False, exists=True)) @click.option("--host", default="0.0.0.0", show_default=True) @click.option("--port", default=8421, type=int, show_default=True) def serve_command(db_path: str, env_file: str, host: str, port: int): import uvicorn from .ops.web import create_app app = create_app(db_path=db_path, env_path=env_file) uvicorn.run(app, host=host, port=port) ``` ```bash # scripts/catalogsync/templates/serve_console.sh "${VENV_DIR}/bin/python" -m musicdl.catalogsync.cli serve \ --db "${DB_PATH}" \ --env-file "${ENV_FILE:-${CONFIG_FILE}}" \ --host "${WEB_HOST:-0.0.0.0}" \ --port "${WEB_PORT:-8421}" ``` - [ ] **Step 4: Update docs and runtime templates** ```text # scripts/catalogsync/templates/catalogsync.env.example ENV_FILE=/volume4/Music_Cloud/catalogsync/config/catalogsync.env WEB_HOST=0.0.0.0 WEB_PORT=8421 ``` Document in: - `docs/catalogsync.md` - `README.md` - [ ] **Step 5: Run the full verification suite** Run: `python -m unittest discover -s tests/catalogsync -v` Expected: - `OK` - all previous catalogsync tests still pass - the new operations-console tests pass Run: `python -m musicdl.catalogsync.cli serve --help` Expected: - help output lists `--db`, `--env-file`, `--host`, and `--port` - [ ] **Step 6: Commit** ```bash git add musicdl/catalogsync/cli.py musicdl/catalogsync/runtime.py requirements.txt setup.py scripts/catalogsync/templates/catalogsync.env.example scripts/catalogsync/templates/install_runtime.sh scripts/catalogsync/templates/serve_console.sh docs/catalogsync.md README.md tests/catalogsync/test_cli.py tests/catalogsync/test_runtime.py git commit -m "feat: ship operations console runtime and docs" ```