Catalogsync Operations Console Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build a NAS-local operations console for musicdl.catalogsync with queue-based collect/sync/download/upload jobs, soft pause and resume, crash-safe recovery, song-level retry, worker visibility, and catalogsync.env configuration management.

Architecture: Add a new musicdl.catalogsync.ops package for execution-state persistence, env revision management, job orchestration, and FastAPI-backed UI/API endpoints. Reuse the existing catalog tables and execution services by wrapping them in stage and item executors rather than rebuilding the domain logic from scratch.

Tech Stack: Python 3, unittest, SQLite, FastAPI, Jinja2, Server-Sent Events, existing musicdl.catalogsync services/downloader/uploader, NAS shell templates.


File Structure

New files

  • musicdl/catalogsync/ops/__init__.py
    • exports the operations-console public entry points
  • musicdl/catalogsync/ops/models.py
    • enums, dataclasses, and small helpers for job, stage, item, and worker states
  • musicdl/catalogsync/ops/repository.py
    • CRUD for job_runs, job_stages, job_items, job_workers, job_commands, job_events, job_logs, and config_revisions
  • musicdl/catalogsync/ops/config.py
    • load, validate, snapshot, version, and apply catalogsync.env
  • musicdl/catalogsync/ops/executors.py
    • adapters that run collect, sync, download, and upload one work item at a time
  • musicdl/catalogsync/ops/runner.py
    • queue scheduler, command polling, pause/resume, and crash recovery
  • musicdl/catalogsync/ops/web.py
    • FastAPI app factory, API routes, page routes, and SSE stream
  • musicdl/catalogsync/templates/ops/base.html
    • shared layout
  • musicdl/catalogsync/templates/ops/dashboard.html
    • dashboard page
  • musicdl/catalogsync/templates/ops/jobs.html
    • queue and active-job page
  • musicdl/catalogsync/templates/ops/job_detail.html
    • per-job detail page
  • musicdl/catalogsync/templates/ops/playlists.html
    • playlist-pool and playlist status page
  • musicdl/catalogsync/templates/ops/songs.html
    • worker and song-processing page
  • musicdl/catalogsync/templates/ops/logs.html
    • log and exception page
  • musicdl/catalogsync/templates/ops/config.html
    • env editor and revision page
  • musicdl/catalogsync/static/ops/app.js
    • lightweight browser logic for SSE updates and page actions
  • scripts/catalogsync/templates/serve_console.sh
    • NAS runtime script for the web console
  • tests/catalogsync/test_ops_db.py
    • schema and repository coverage for operations tables
  • tests/catalogsync/test_ops_config.py
    • env loading, snapshot, revision, and apply tests
  • tests/catalogsync/test_ops_runner.py
    • state-machine, pause/resume, and recovery tests
  • tests/catalogsync/test_ops_executors.py
    • per-stage executor tests
  • tests/catalogsync/test_ops_api.py
    • FastAPI route and SSE tests

Modified files

  • musicdl/catalogsync/db.py
    • create the new operations tables
  • musicdl/catalogsync/repository.py
    • add query helpers that the operations layer can reuse from catalog data
  • musicdl/catalogsync/services.py
    • expose a playlist-row based sync unit for the runner
  • musicdl/catalogsync/downloader.py
    • expose song-level download execution for one queued item
  • musicdl/catalogsync/uploader.py
    • expose upload-task-level execution for one queued item
  • musicdl/catalogsync/cli.py
    • add the serve command
  • musicdl/catalogsync/runtime.py
    • add web host, port, and config-path runtime fields
  • scripts/catalogsync/templates/catalogsync.env.example
    • add web-console settings
  • scripts/catalogsync/templates/install_runtime.sh
    • install any new web-console dependencies
  • docs/catalogsync.md
    • document the console workflow and runtime script
  • README.md
    • add a concise operations-console entry
  • requirements.txt
    • add the web-console runtime dependencies
  • setup.py
    • include templates/static assets in packaging
  • MANIFEST.in
    • include template/static files for source distributions
  • tests/catalogsync/test_cli.py
    • cover the new serve command
  • tests/catalogsync/test_runtime.py
    • cover the new runtime config fields

Dependency notes

  • Add fastapi
  • Add uvicorn
  • Add jinja2
  • Add python-multipart
  • Add httpx (tests only; fastapi.testclient.TestClient depends on it)

Add each dependency only once; the serve command, the tests, and the NAS runtime script all reuse the same entries.

Task 1: Add Operations Schema And Repository Primitives

Files:

  • Create: musicdl/catalogsync/ops/__init__.py

  • Create: musicdl/catalogsync/ops/models.py

  • Create: musicdl/catalogsync/ops/repository.py

  • Modify: musicdl/catalogsync/db.py

  • Test: tests/catalogsync/test_ops_db.py

  • Step 1: Write the failing schema and repository tests

import sqlite3
import tempfile
import unittest
from contextlib import closing
from pathlib import Path


class OperationsSchemaTests(unittest.TestCase):
    def test_initialize_database_creates_operations_tables(self):
        from musicdl.catalogsync.db import initialize_database

        expected_tables = {
            "job_runs",
            "job_stages",
            "job_items",
            "job_workers",
            "job_commands",
            "job_events",
            "job_logs",
            "config_revisions",
        }

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            with closing(sqlite3.connect(db_path)) as conn:
                tables = {
                    row[0]
                    for row in conn.execute(
                        "SELECT name FROM sqlite_master WHERE type = 'table'"
                    ).fetchall()
                }

        self.assertTrue(expected_tables.issubset(tables))
  • Step 2: Run the targeted test and verify it fails

Run: python -m unittest tests.catalogsync.test_ops_db -v

Expected:

  • FAIL because the operations tables and repository module do not exist yet

  • Step 3: Implement the schema, enums, and repository

# musicdl/catalogsync/db.py
REQUIRED_TABLES |= {
    "job_runs",
    "job_stages",
    "job_items",
    "job_workers",
    "job_commands",
    "job_events",
    "job_logs",
    "config_revisions",
}
# musicdl/catalogsync/ops/models.py
from dataclasses import dataclass
from enum import StrEnum  # StrEnum requires Python 3.11+


class JobStatus(StrEnum):
    QUEUED = "queued"
    RUNNING = "running"
    PAUSE_REQUESTED = "pause_requested"
    PAUSED = "paused"
    COMPLETED = "completed"
    COMPLETED_WITH_ERRORS = "completed_with_errors"
    FAILED = "failed"
    CANCELED = "canceled"


@dataclass(frozen=True)
class JobCreateRequest:
    job_type: str
    config_snapshot: dict
    sources: list[str] | None = None
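# musicdl/catalogsync/ops/repository.py
import json
from pathlib import Path

from ..db import connect_database  # assumed to live next to initialize_database


class OperationsRepository:
    def __init__(self, db_path: str | Path) -> None:
        self.db_path = Path(db_path)

    def create_job_run(self, job_type: str, config_snapshot: dict, sources=None, download_sources=None, playlist_scope=None) -> int: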
        with connect_database(self.db_path) as conn:
            cursor = conn.execute(
                """
                INSERT INTO job_runs (job_type, config_snapshot_json, sources, download_sources, playlist_scope_json)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    job_type,
                    json.dumps(config_snapshot, ensure_ascii=False),
                    ",".join(sources or []),
                    ",".join(download_sources or []),
                    json.dumps(playlist_scope or {}, ensure_ascii=False),
                ),
            )
            return int(cursor.lastrowid)

    def create_job_stage(self, job_id: int, stage_type: str, seq_no: int) -> int:
        with connect_database(self.db_path) as conn:
            cursor = conn.execute(
                "INSERT INTO job_stages (job_run_id, stage_type, seq_no) VALUES (?, ?, ?)",
                (job_id, stage_type, seq_no),
            )
            return int(cursor.lastrowid)

    def create_job_item(self, job_stage_id: int, item_type: str, item_key: str, **extra) -> int:
        with connect_database(self.db_path) as conn:
            cursor = conn.execute(
                """
                INSERT INTO job_items (job_stage_id, item_type, item_key, playlist_pool_id, playlist_id, song_id, file_location_id, payload_json)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    job_stage_id,
                    item_type,
                    item_key,
                    extra.get("playlist_pool_id"),
                    extra.get("playlist_id"),
                    extra.get("song_id"),
                    extra.get("file_location_id"),
                    json.dumps(extra.get("payload_json") or {}, ensure_ascii=False),
                ),
            )
            return int(cursor.lastrowid)
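Step 3 names the eight operations tables, but the plan does not spell out their columns. A minimal DDL sketch follows, assuming only the columns that the INSERT and UPDATE statements in this plan touch. The `id`, `status`, and timestamp columns, all types and defaults, the reduced shapes of job_workers, job_commands, job_events, and job_logs, and the function name `create_operations_tables` are assumptions, not the plan's schema.

```python
import sqlite3

# Hypothetical DDL: column names come from the SQL used elsewhere in this plan;
# types, defaults, and the extra id/status/timestamp columns are assumptions.
OPERATIONS_DDL = """
CREATE TABLE IF NOT EXISTS job_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_type TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'queued',
    config_snapshot_json TEXT NOT NULL DEFAULT '{}',
    sources TEXT NOT NULL DEFAULT '',
    download_sources TEXT NOT NULL DEFAULT '',
    playlist_scope_json TEXT NOT NULL DEFAULT '{}',
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS job_stages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_run_id INTEGER NOT NULL REFERENCES job_runs(id),
    stage_type TEXT NOT NULL,
    seq_no INTEGER NOT NULL,
    status TEXT NOT NULL DEFAULT 'queued'
);
CREATE TABLE IF NOT EXISTS job_items (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_stage_id INTEGER NOT NULL REFERENCES job_stages(id),
    item_type TEXT NOT NULL,
    item_key TEXT NOT NULL,
    playlist_pool_id INTEGER,
    playlist_id INTEGER,
    song_id INTEGER,
    file_location_id INTEGER,
    payload_json TEXT NOT NULL DEFAULT '{}',
    status TEXT NOT NULL DEFAULT 'queued',
    worker_id INTEGER,
    ended_at TEXT,
    last_error TEXT
);
CREATE TABLE IF NOT EXISTS job_workers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    worker_name TEXT NOT NULL UNIQUE,
    current_song_id INTEGER,
    current_playlist_id INTEGER,
    current_display_text TEXT
);
CREATE TABLE IF NOT EXISTS job_commands (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_run_id INTEGER REFERENCES job_runs(id),
    command_type TEXT NOT NULL,
    target_item_id INTEGER,
    status TEXT NOT NULL DEFAULT 'pending'
);
CREATE TABLE IF NOT EXISTS job_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_run_id INTEGER NOT NULL REFERENCES job_runs(id),
    event_type TEXT NOT NULL,
    message TEXT,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS job_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_run_id INTEGER REFERENCES job_runs(id),
    level TEXT NOT NULL DEFAULT 'info',
    message TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS config_revisions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    file_path TEXT NOT NULL,
    content_text TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    note TEXT,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""


def create_operations_tables(conn: sqlite3.Connection) -> None:
    """Create the operations tables if they are missing (safe to call repeatedly)."""
    conn.executescript(OPERATIONS_DDL)
```

Because every statement uses CREATE TABLE IF NOT EXISTS, a function like this could be called from initialize_database on each start without touching existing rows.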
  • Step 4: Run the targeted test and verify it passes

Run: python -m unittest tests.catalogsync.test_ops_db -v

Expected:

  • OK

  • the test output shows the new operations tables are created

  • Step 5: Commit

git add musicdl/catalogsync/db.py musicdl/catalogsync/ops/__init__.py musicdl/catalogsync/ops/models.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_db.py
git commit -m "feat: add operations schema and repository primitives"

Task 2: Add Env Revision Management And Job Config Snapshots

Files:

  • Create: musicdl/catalogsync/ops/config.py

  • Modify: musicdl/catalogsync/ops/repository.py

  • Test: tests/catalogsync/test_ops_config.py

  • Step 1: Write the failing config and revision tests

import tempfile
import unittest
from pathlib import Path


class EnvManagerTests(unittest.TestCase):
    def test_load_snapshot_and_save_revision(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.config import CatalogsyncEnvManager
        from musicdl.catalogsync.ops.repository import OperationsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            env_path = Path(tmpdir) / "catalogsync.env"
            env_path.write_text(
                "LIBRARY_DIR=/volume4/Music_Cloud/library\nDOWNLOAD_SOURCES=qq,kuwo,migu\n",
                encoding="utf-8",
            )
            initialize_database(db_path).close()
            repo = OperationsRepository(db_path)
            manager = CatalogsyncEnvManager(env_path=env_path, repository=repo)

            snapshot = manager.build_job_snapshot()
            revision_id = manager.save_revision(note="initial import")
            revisions = manager.list_revisions()

        self.assertEqual(["qq", "kuwo", "migu"], snapshot["download_sources"])
        self.assertEqual(revision_id, revisions[0]["id"])
  • Step 2: Run the targeted test and verify it fails

Run: python -m unittest tests.catalogsync.test_ops_config -v

Expected:

  • FAIL because the env manager and config revision methods do not exist yet

  • Step 3: Implement the env manager and revision methods

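# musicdl/catalogsync/ops/config.py
import hashlib
from pathlib import Path


class CatalogsyncEnvManager:
    SNAPSHOT_KEYS = {
        "DB_PATH",
        "LIBRARY_DIR",
        "DOWNLOAD_SOURCES",
        "OBJECT_BACKEND_NAME",
        "OBJECT_BUCKET",
        "OBJECT_ENDPOINT",
    }

    def __init__(self, env_path: str | Path, repository) -> None:
        self.env_path = Path(env_path)
        self.repository = repository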

    def load_current(self) -> dict[str, str]:
        values: dict[str, str] = {}
        for line in self.env_path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            values[key.strip()] = value.strip()
        return values

    def build_job_snapshot(self) -> dict:
        current = self.load_current()
        return {
            "library_root": current.get("LIBRARY_DIR", ""),
            "download_sources": [item for item in current.get("DOWNLOAD_SOURCES", "").split(",") if item],
            "env_values": {key: current[key] for key in self.SNAPSHOT_KEYS if key in current},
        }

    def save_revision(self, note: str | None = None) -> int:
        content = self.env_path.read_text(encoding="utf-8")
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
        return self.repository.create_config_revision(
            file_path=str(self.env_path),
            content_text=content,
            content_hash=digest,
            note=note,
        )

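    def list_revisions(self) -> list:
        # Newest first, so revisions[0] is the revision that was just saved.
        return self.repository.list_config_revisions()

    def apply_revision(self, revision_id: int) -> None:
        row = self.repository.get_config_revision(revision_id)
        if row is None:
            raise LookupError(f"config revision {revision_id} does not exist")
        self.env_path.write_text(row["content_text"], encoding="utf-8")
        self.repository.mark_config_revision_applied(revision_id)
# musicdl/catalogsync/ops/repository.py
def create_config_revision(self, file_path: str, content_text: str, content_hash: str, note: str | None = None) -> int:
    with connect_database(self.db_path) as conn:
        cursor = conn.execute(
            """
            INSERT INTO config_revisions (file_path, content_text, content_hash, note)
            VALUES (?, ?, ?, ?)
            """,
            (file_path, content_text, content_hash, note),
        )
        return int(cursor.lastrowid)

def get_config_revision(self, revision_id: int):
    with connect_database(self.db_path) as conn:
        return conn.execute("SELECT * FROM config_revisions WHERE id = ?", (revision_id,)).fetchone()

def list_config_revisions(self):
    # Assumes connect_database returns name-addressable rows (for example sqlite3.Row).
    with connect_database(self.db_path) as conn:
        return conn.execute("SELECT * FROM config_revisions ORDER BY id DESC").fetchall()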
  • Step 4: Run the targeted test and verify it passes

Run: python -m unittest tests.catalogsync.test_ops_config -v

Expected:

  • OK

  • revision apply rewrites the env file and marks the revision as applied
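apply_revision overwrites catalogsync.env in place, so a crash in the middle of the write could leave a truncated file behind. One possible hardening is to write a temporary sibling file and swap it in with os.replace. The sketch below assumes the temporary file sits in the same directory (and therefore on the same filesystem) as the env file. The helper name `write_env_atomically` is hypothetical and not part of the plan.

```python
import os
import tempfile
from pathlib import Path


def write_env_atomically(env_path: Path, content: str) -> None:
    """Replace env_path with content without exposing a half-written file."""
    env_path = Path(env_path)
    # Create the temp file next to the target so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(
        dir=env_path.parent, prefix=env_path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(content)
            handle.flush()
            os.fsync(handle.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_name, env_path)  # rename over the target; atomic on POSIX
    except BaseException:
        # Remove the temp file if anything failed before the swap completed.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Note that mkstemp creates the file with owner-only permissions, so a real implementation might need to copy the original file mode onto the replacement.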

  • Step 5: Commit

git add musicdl/catalogsync/ops/config.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_config.py
git commit -m "feat: add env revision and job snapshot management"

Task 3: Implement The Runner State Machine And Recovery Logic

Files:

  • Create: musicdl/catalogsync/ops/runner.py

  • Modify: musicdl/catalogsync/ops/models.py

  • Modify: musicdl/catalogsync/ops/repository.py

  • Test: tests/catalogsync/test_ops_runner.py

  • Step 1: Write the failing runner-state tests

import tempfile
import unittest
from pathlib import Path


class RunnerStateTests(unittest.TestCase):
    def test_recover_orphaned_jobs_converts_running_items_to_interrupted(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.repository import OperationsRepository
        from musicdl.catalogsync.ops.runner import CatalogsyncRunner

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OperationsRepository(db_path)
            runner = CatalogsyncRunner(db_path=db_path, env_path=Path(tmpdir) / "catalogsync.env")

            job_id = repo.create_job_run("download_only", {"library_root": "/tmp/library"})
            stage_id = repo.create_job_stage(job_id, "download", 1)
            item_id = repo.create_job_item(stage_id, "song", "song:1", song_id=1)
            repo.mark_job_running(job_id)
            repo.mark_stage_running(stage_id)
            repo.mark_item_running(item_id, worker_id=1)

            runner.recover_incomplete_jobs()

            job = repo.get_job_run(job_id)
            item = repo.get_job_item(item_id)

        self.assertEqual("paused", job["status"])
        self.assertEqual("interrupted", item["status"])
  • Step 2: Run the targeted test and verify it fails

Run: python -m unittest tests.catalogsync.test_ops_runner -v

Expected:

  • FAIL because the runner, command flow, and recovery helpers do not exist yet

  • Step 3: Implement the runner, command polling, and recovery

# musicdl/catalogsync/ops/runner.py
class CatalogsyncRunner:
    def recover_incomplete_jobs(self) -> None:
        for job in self.repository.list_recoverable_jobs():
            self.repository.pause_job_for_recovery(int(job["id"]))
            for item in self.repository.list_running_items(int(job["id"])):
                self.repository.mark_item_interrupted(int(item["id"]), last_error="Runner restarted during execution")
            self.repository.add_job_event(int(job["id"]), "recovery_requeued", "Recovered job after runner restart")

    def apply_pending_commands(self) -> None:
        for command in self.repository.list_pending_commands():
            if command["command_type"] == "pause":
                self.repository.request_job_pause(int(command["job_run_id"]))
            elif command["command_type"] == "resume":
                self.repository.resume_job(int(command["job_run_id"]))
            elif command["command_type"] == "retry_item":
                self.repository.requeue_item(int(command["target_item_id"]), force=False)
            elif command["command_type"] == "force_retry_item":
                self.repository.requeue_item(int(command["target_item_id"]), force=True)
            self.repository.mark_command_applied(int(command["id"]))

    def reconcile_pause_state(self, job_id: int) -> None:
        if self.repository.job_has_running_items(job_id):
            return
        self.repository.finalize_pause(job_id)

    def loop_once(self) -> None:
        self.apply_pending_commands()
        active_job = self.repository.claim_next_runnable_job()
        if active_job is None:
            return
        self.repository.mark_job_running(int(active_job["id"]))
# musicdl/catalogsync/ops/repository.py
def request_job_pause(self, job_id: int) -> None:
    with connect_database(self.db_path) as conn:
        conn.execute("UPDATE job_runs SET status = 'pause_requested' WHERE id = ?", (job_id,))
        conn.execute(
            "UPDATE job_stages SET status = 'pause_requested' WHERE job_run_id = ? AND status = 'running'",
            (job_id,),
        )

def pause_job_for_recovery(self, job_id: int) -> None:
    with connect_database(self.db_path) as conn:
        conn.execute("UPDATE job_runs SET status = 'paused' WHERE id = ?", (job_id,))
        conn.execute(
            "UPDATE job_stages SET status = 'paused' WHERE job_run_id = ? AND status IN ('running', 'pause_requested')",
            (job_id,),
        )

def mark_item_interrupted(self, item_id: int, last_error: str | None = None) -> None:
    with connect_database(self.db_path) as conn:
        conn.execute(
            "UPDATE job_items SET status = 'interrupted', worker_id = NULL, ended_at = CURRENT_TIMESTAMP, last_error = ? WHERE id = ?",
            (last_error, item_id),
        )
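loop_once relies on claim_next_runnable_job, which the plan does not show. A minimal sketch follows, assuming job_runs has `id` and `status` columns, that the lowest-id queued job runs first, and that the connection was opened with `isolation_level=None` so the function controls its own transaction. Unlike the plan's version, which returns a row and calls mark_job_running afterwards, this sketch flips the status inside the claim and returns only the id.

```python
import sqlite3
from typing import Optional


def claim_next_runnable_job(conn: sqlite3.Connection) -> Optional[int]:
    """Flip the oldest queued job to running and return its id, or None."""
    # BEGIN IMMEDIATE takes the write lock up front, so the SELECT and UPDATE
    # below cannot interleave with another writer claiming the same row.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id FROM job_runs WHERE status = 'queued' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        conn.execute("UPDATE job_runs SET status = 'running' WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
        return int(row[0])
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

Claiming and marking in one transaction means a second runner process that starts at the same moment sees the job as already running and moves on to the next queued row.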
  • Step 4: Run the targeted test and verify it passes

Run: python -m unittest tests.catalogsync.test_ops_runner -v

Expected:

  • OK

  • orphaned running items become interrupted

  • soft pause waits until no item remains running before closing the stage
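The pause and recovery rules can be modelled without a database, which may help when reasoning about edge cases before writing the SQL. The sketch below is an illustration only: the `Job` class and the item statuses it uses ('queued', 'running', 'succeeded', 'interrupted') are hypothetical stand-ins for rows in job_runs and job_items.

```python
from dataclasses import dataclass, field


@dataclass
class Job:
    status: str = "running"
    items: dict[int, str] = field(default_factory=dict)  # item id -> status

    def request_pause(self) -> None:
        # Soft pause: items that are already running are allowed to finish.
        if self.status == "running":
            self.status = "pause_requested"

    def finish_item(self, item_id: int) -> None:
        self.items[item_id] = "succeeded"
        self.reconcile_pause_state()

    def reconcile_pause_state(self) -> None:
        # Mirrors reconcile_pause_state: finalize only once nothing is running.
        if self.status != "pause_requested":
            return
        if any(state == "running" for state in self.items.values()):
            return
        self.status = "paused"

    def recover_after_crash(self) -> None:
        # Mirrors recover_incomplete_jobs: no worker survived the restart, so
        # running items become interrupted and the job waits for a resume.
        for item_id, state in self.items.items():
            if state == "running":
                self.items[item_id] = "interrupted"
        if self.status in ("running", "pause_requested"):
            self.status = "paused"
```

In this model a queued item is never touched by a pause or by recovery, which matches the intent that only in-flight work needs special handling.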

  • Step 5: Commit

git add musicdl/catalogsync/ops/runner.py musicdl/catalogsync/ops/models.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_runner.py
git commit -m "feat: add operations runner state machine and recovery"

Task 4: Add Stage Executors And Single-Item Execution Hooks

Files:

  • Create: musicdl/catalogsync/ops/executors.py

  • Modify: musicdl/catalogsync/services.py

  • Modify: musicdl/catalogsync/downloader.py

  • Modify: musicdl/catalogsync/uploader.py

  • Modify: musicdl/catalogsync/ops/repository.py

  • Test: tests/catalogsync/test_ops_executors.py

  • Step 1: Write the failing executor integration tests

import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch


class StageExecutorTests(unittest.TestCase):
    def test_download_executor_marks_item_succeeded(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.repository import OperationsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            repo = OperationsRepository(db_path)
            stage_id = repo.create_job_stage(repo.create_job_run("download_only", {"library_root": str(library_root)}), "download", 1)
            item_id = repo.create_job_item(stage_id, "song", "song:1", song_id=1, payload_json={"row": {"id": 1, "platform": "qq"}})

            executor = DownloadStageExecutor(db_path=db_path, library_root=library_root, download_sources=["qq"])
            with patch("musicdl.catalogsync.downloader.CatalogDownloader.download_song_row", return_value=True):
                executor.process_item(item_id=item_id, worker_name="download-1")

            item = repo.get_job_item(item_id)

        self.assertEqual("succeeded", item["status"])
  • Step 2: Run the targeted test and verify it fails

Run: python -m unittest tests.catalogsync.test_ops_executors -v

Expected:

  • FAIL because the executor layer and single-item helpers do not exist yet

  • Step 3: Implement executor adapters and expose item-level hooks

# musicdl/catalogsync/downloader.py
class CatalogDownloader:
    def download_song_row(
        self,
        row: dict,
        library_root: str | Path,
        download_sources: list[str] | None = None,
        worker_callback=None,
    ) -> bool:
        default_root = Path(library_root).resolve()
        if worker_callback:
            worker_callback(
                current_song_id=int(row["id"]),
                current_playlist_id=row.get("playlist_id"),
                current_display_text=f'{row.get("name", row["id"])} / {row.get("singers", "")}'.strip(" /"),
            )
        return self._download_one(row=row, default_root=default_root, download_sources=download_sources)
# musicdl/catalogsync/uploader.py
class CatalogUploader:
    def process_upload_task_row(self, task_row, backend_name: str) -> str:
        backend = self.get_backend(backend_name)
        return self._process_task(task_row, backend, uploader=None)
# musicdl/catalogsync/services.py
class CatalogSyncService:
    def sync_playlist_row(self, playlist_row) -> int:
        song_infos = self.resolve_playlist_song_infos(playlist_row)
        source_pool_ids = self.repository.get_pool_ids_for_playlist(int(playlist_row["id"]))
        linked_count = 0
        for source_pool_id in source_pool_ids:
            linked_count += self.store_playlist_songs(
                playlist_id=int(playlist_row["id"]),
                source_pool_id=source_pool_id,
                song_infos=song_infos,
            )
        return linked_count
# musicdl/catalogsync/ops/executors.py
class DownloadStageExecutor:
    def process_item(self, item_id: int, worker_name: str) -> None:
        self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            row = self.ops_repo.build_download_row(item_id)
            ok = self.downloader.download_song_row(
                row=row,
                library_root=self.library_root,
                download_sources=self.download_sources,
                worker_callback=lambda **state: self.ops_repo.update_worker_state(worker_name=worker_name, **state),
            )
        except Exception as exc:
            # Record the failure so the item is not left in the running state.
            self.ops_repo.mark_item_failed(item_id, f"{type(exc).__name__}: {exc}")
            return
        if ok:
            self.ops_repo.mark_item_succeeded(item_id)
        else:
            self.ops_repo.mark_item_failed(item_id, "download returned no file")


class SyncStageExecutor:
    def process_item(self, item_id: int, worker_name: str) -> None:
        self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            playlist_row = self.ops_repo.get_playlist_row_for_item(item_id)
            linked_count = self.service.sync_playlist_row(playlist_row)
        except Exception as exc:
            self.ops_repo.mark_item_failed(item_id, f"{type(exc).__name__}: {exc}")
            return
        self.ops_repo.mark_item_succeeded(item_id, result_payload={"linked_count": linked_count})


class UploadStageExecutor:
    def process_item(self, item_id: int, worker_name: str) -> None:
        self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            upload_row = self.ops_repo.get_upload_row_for_item(item_id)
            result = self.uploader.process_upload_task_row(upload_row, backend_name=self.backend_name)
        except Exception as exc:
            self.ops_repo.mark_item_failed(item_id, f"{type(exc).__name__}: {exc}")
            return
        if result == "succeeded":
            self.ops_repo.mark_item_succeeded(item_id)
        else:
            self.ops_repo.mark_item_failed(item_id, f"upload result: {result}")
  • Step 4: Run the targeted test and verify it passes

Run: python -m unittest tests.catalogsync.test_ops_executors -v

Expected:

  • OK

  • download, sync, and upload work items can be processed one at a time

  • the item records and worker state update correctly
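All three executors follow the same shape: claim the item, run one unit of work, then record the outcome. The sketch below isolates that shape against an in-memory stand-in, including one way to keep an item from being stranded in the running state when the wrapped service raises. `InMemoryOpsRepo` and `run_item` are hypothetical names used only for illustration.

```python
from typing import Callable


class InMemoryOpsRepo:
    """Hypothetical stand-in for the item bookkeeping in OperationsRepository."""

    def __init__(self) -> None:
        self.status: dict[int, str] = {}
        self.errors: dict[int, str] = {}

    def claim_item(self, item_id: int, worker_name: str) -> None:
        self.status[item_id] = "running"

    def mark_item_succeeded(self, item_id: int) -> None:
        self.status[item_id] = "succeeded"

    def mark_item_failed(self, item_id: int, error: str) -> None:
        self.status[item_id] = "failed"
        self.errors[item_id] = error


def run_item(repo: InMemoryOpsRepo, item_id: int, worker_name: str, work: Callable[[], bool]) -> None:
    """Claim one item, run its work, and always leave it in a terminal state."""
    repo.claim_item(item_id, worker_name)
    try:
        ok = work()
    except Exception as exc:
        # An exception is recorded as a failure instead of escaping the worker.
        repo.mark_item_failed(item_id, f"{type(exc).__name__}: {exc}")
        return
    if ok:
        repo.mark_item_succeeded(item_id)
    else:
        repo.mark_item_failed(item_id, "work returned no result")
```

Recording the exception text on the item keeps the failure visible for a later retry_item command rather than leaving it only in a traceback.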

  • Step 5: Commit

git add musicdl/catalogsync/downloader.py musicdl/catalogsync/uploader.py musicdl/catalogsync/services.py musicdl/catalogsync/ops/executors.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_executors.py
git commit -m "feat: add stage executors for operations console"

Task 5: Build The FastAPI UI And Management API

Files:

  • Create: musicdl/catalogsync/ops/web.py

  • Create: musicdl/catalogsync/templates/ops/base.html

  • Create: musicdl/catalogsync/templates/ops/dashboard.html

  • Create: musicdl/catalogsync/templates/ops/jobs.html

  • Create: musicdl/catalogsync/templates/ops/job_detail.html

  • Create: musicdl/catalogsync/templates/ops/playlists.html

  • Create: musicdl/catalogsync/templates/ops/songs.html

  • Create: musicdl/catalogsync/templates/ops/logs.html

  • Create: musicdl/catalogsync/templates/ops/config.html

  • Create: musicdl/catalogsync/static/ops/app.js

  • Modify: setup.py

  • Modify: MANIFEST.in

  • Test: tests/catalogsync/test_ops_api.py

  • Step 1: Write the failing API and page tests

import tempfile
import unittest
from pathlib import Path

from fastapi.testclient import TestClient


class OperationsApiTests(unittest.TestCase):
    def test_dashboard_and_jobs_endpoints_render(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.web import create_app

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            env_path = Path(tmpdir) / "catalogsync.env"
            env_path.write_text("LIBRARY_DIR=/volume4/Music_Cloud/library\n", encoding="utf-8")
            initialize_database(db_path).close()
            client = TestClient(create_app(db_path=db_path, env_path=env_path))

            dashboard = client.get("/dashboard")
            jobs = client.get("/api/jobs")

        self.assertEqual(200, dashboard.status_code)
        self.assertEqual(200, jobs.status_code)
  • Step 2: Run the targeted test and verify it fails

Run: python -m unittest tests.catalogsync.test_ops_api -v

Expected:

  • FAIL because the FastAPI app, templates, and API endpoints do not exist yet

  • Step 3: Implement the FastAPI app, pages, APIs, and SSE

# musicdl/catalogsync/ops/web.py
import json
import time
from pathlib import Path

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from .config import CatalogsyncEnvManager
from .repository import OperationsRepository


def create_app(db_path: str | Path, env_path: str | Path) -> FastAPI:
    app = FastAPI(title="Catalogsync Operations Console")
    repo = OperationsRepository(db_path)
    env_manager = CatalogsyncEnvManager(env_path=env_path, repository=repo)  # used by the config routes
    package_dir = Path(__file__).resolve().parents[1]
    templates = Jinja2Templates(directory=str(package_dir / "templates"))
    app.mount("/static", StaticFiles(directory=str(package_dir / "static")), name="static")

    @app.get("/dashboard", response_class=HTMLResponse)
    def dashboard(request: Request):
        return templates.TemplateResponse(
            request,
            "ops/dashboard.html",
            {"title": "总览", "summary": repo.get_dashboard_summary()},  # title means "Overview"
        )

    @app.get("/api/jobs")
    def api_jobs():
        return {"items": repo.list_jobs()}

    @app.get("/api/events/stream")
    def api_events():
        def event_stream():
            while True:
                yield f"data: {json.dumps(repo.get_live_snapshot(), ensure_ascii=False)}\n\n"
                # Pace the stream; without a delay this loop spins a CPU core.
                # The one-second interval is a placeholder.
                time.sleep(1.0)
        return StreamingResponse(event_stream(), media_type="text/event-stream")

    return app
<!-- musicdl/catalogsync/templates/ops/base.html -->
<!DOCTYPE html>
<html lang="zh-CN">
  <head>
    <meta charset="utf-8">
    <title>{{ title or "Catalogsync Console" }}</title>
    <script src="/static/ops/app.js" defer></script>
  </head>
  <body data-sse-url="/api/events/stream">
    <nav>
      <a href="/dashboard">总览</a>
      <a href="/jobs">任务中心</a>
      <a href="/playlists">歌单池</a>
      <a href="/songs">歌曲处理</a>
      <a href="/logs">日志异常</a>
      <a href="/config">配置管理</a>
    </nav>
    {% block content %}{% endblock %}
  </body>
</html>
# MANIFEST.in
recursive-include musicdl/catalogsync/templates/ops *.html
recursive-include musicdl/catalogsync/static/ops *.js
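Task 5 also modifies setup.py, which the plan does not show. MANIFEST.in only affects source distributions, so wheels need the assets declared as package data as well. A sketch of the relevant argument follows, assuming the existing setup.py calls setuptools.setup and that musicdl.catalogsync is a regular package. The constant name `CATALOGSYNC_PACKAGE_DATA` is hypothetical.

```python
# setup.py (fragment). Glob patterns are relative to the package directory.
CATALOGSYNC_PACKAGE_DATA = {
    "musicdl.catalogsync": [
        "templates/ops/*.html",
        "static/ops/*.js",
    ],
}

# Passed to setuptools.setup(..., package_data=CATALOGSYNC_PACKAGE_DATA).
# Setting include_package_data=True instead would pick up the files that
# MANIFEST.in lists inside the package.
```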
  • Step 4: Run the targeted test and verify it passes

Run: python -m unittest tests.catalogsync.test_ops_api -v

Expected:

  • OK

  • page routes render

  • queue-control and config endpoints return the expected status codes
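The SSE endpoint is hard to test while its generator never ends. One option is to separate frame formatting from pacing so both can be exercised without a running server. The sketch below assumes that split. `format_sse` and `snapshot_stream` are hypothetical helpers, and `max_events` exists only so a test can stop the stream.

```python
import json
import time
from typing import Callable, Iterator, Optional


def format_sse(payload: dict, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Events frame; a blank line ends the frame."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    # json.dumps escapes newlines, so the payload always fits on one data line.
    lines.append(f"data: {json.dumps(payload, ensure_ascii=False)}")
    return "\n".join(lines) + "\n\n"


def snapshot_stream(
    get_snapshot: Callable[[], dict],
    interval_seconds: float = 1.0,
    max_events: Optional[int] = None,
) -> Iterator[str]:
    """Yield one frame per interval; runs forever when max_events is None."""
    sent = 0
    while max_events is None or sent < max_events:
        yield format_sse(get_snapshot())
        sent += 1
        time.sleep(interval_seconds)
```

A route could then return `StreamingResponse(snapshot_stream(repo.get_live_snapshot), media_type="text/event-stream")`, while a unit test passes `interval_seconds=0` and a small `max_events`.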

  • Step 5: Commit

git add musicdl/catalogsync/ops/web.py musicdl/catalogsync/templates/ops/base.html musicdl/catalogsync/templates/ops/dashboard.html musicdl/catalogsync/templates/ops/jobs.html musicdl/catalogsync/templates/ops/job_detail.html musicdl/catalogsync/templates/ops/playlists.html musicdl/catalogsync/templates/ops/songs.html musicdl/catalogsync/templates/ops/logs.html musicdl/catalogsync/templates/ops/config.html musicdl/catalogsync/static/ops/app.js setup.py MANIFEST.in tests/catalogsync/test_ops_api.py
git commit -m "feat: add operations console web app"

Task 6: Wire The CLI, Runtime Scripts, Docs, And Final Verification

Files:

  • Modify: musicdl/catalogsync/cli.py

  • Modify: musicdl/catalogsync/runtime.py

  • Modify: requirements.txt

  • Modify: setup.py

  • Modify: scripts/catalogsync/templates/catalogsync.env.example

  • Modify: scripts/catalogsync/templates/install_runtime.sh

  • Create: scripts/catalogsync/templates/serve_console.sh

  • Modify: docs/catalogsync.md

  • Modify: README.md

  • Modify: tests/catalogsync/test_cli.py

  • Modify: tests/catalogsync/test_runtime.py

  • Step 1: Write the failing CLI and runtime tests

import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from click.testing import CliRunner


class CatalogConsoleCliTests(unittest.TestCase):
    def test_serve_command_builds_web_app(self):
        from musicdl.catalogsync.cli import cli

        runner = CliRunner()
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            env_path = Path(tmpdir) / "catalogsync.env"
            env_path.write_text("LIBRARY_DIR=/volume4/Music_Cloud/library\n", encoding="utf-8")

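            # serve_command imports uvicorn inside the function, so musicdl.catalogsync.cli
            # has no module-level uvicorn attribute to patch; patch the module itself.
            with patch("uvicorn.run") as uvicorn_run: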
                result = runner.invoke(
                    cli,
                    [
                        "serve",
                        "--db",
                        str(db_path),
                        "--env-file",
                        str(env_path),
                        "--host",
                        "0.0.0.0",
                        "--port",
                        "8421",
                    ],
                )

        self.assertEqual(0, result.exit_code, msg=result.output)
        uvicorn_run.assert_called_once()
  • Step 2: Run the targeted tests and verify they fail

Run: python -m unittest tests.catalogsync.test_cli tests.catalogsync.test_runtime -v

Expected:

  • FAIL because the serve command and web runtime fields do not exist yet

  • Step 3: Implement the serve command, runtime fields, dependencies, and NAS script

# musicdl/catalogsync/runtime.py
@dataclass
class CatalogSyncRuntimeConfig:
    root_dir: Path
    app_home: Path
    library_dir: Path
    db_path: Path
    input_dir: Path
    log_dir: Path
    python_bin: str
    venv_dir: Path
    download_layout: str
    env_file: Path  # new: path to catalogsync.env
    web_host: str  # new: bind address for the console
    web_port: int  # new: TCP port for the console
# musicdl/catalogsync/cli.py
@cli.command("serve")
@click.option("--db", "db_path", required=True, type=click.Path(dir_okay=False))
@click.option("--env-file", required=True, type=click.Path(dir_okay=False, exists=True))
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option("--port", default=8421, type=int, show_default=True)
def serve_command(db_path: str, env_file: str, host: str, port: int):
    import uvicorn
    from .ops.web import create_app

    app = create_app(db_path=db_path, env_path=env_file)
    uvicorn.run(app, host=host, port=port)
# scripts/catalogsync/templates/serve_console.sh
"${VENV_DIR}/bin/python" -m musicdl.catalogsync.cli serve \
  --db "${DB_PATH}" \
  --env-file "${ENV_FILE:-${CONFIG_FILE}}" \
  --host "${WEB_HOST:-0.0.0.0}" \
  --port "${WEB_PORT:-8421}"
  • Step 4: Update docs and runtime templates

# scripts/catalogsync/templates/catalogsync.env.example
ENV_FILE=/volume4/Music_Cloud/catalogsync/config/catalogsync.env
WEB_HOST=0.0.0.0
WEB_PORT=8421
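The new runtime fields have to be derived from these env values somewhere in runtime.py. A minimal sketch of that step follows, assuming the env file has already been parsed into a dict of strings and that the fallbacks match the defaults used by the serve command (0.0.0.0 and 8421). The function name `resolve_web_settings` is hypothetical.

```python
def resolve_web_settings(env: dict[str, str]) -> tuple[str, int]:
    """Return (web_host, web_port) from parsed env values, applying defaults."""
    host = env.get("WEB_HOST", "").strip() or "0.0.0.0"
    raw_port = env.get("WEB_PORT", "").strip() or "8421"
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"WEB_PORT must be an integer, got {raw_port!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"WEB_PORT out of range: {port}")
    return host, port
```

Validating the port when the config is loaded surfaces a typo in catalogsync.env at startup instead of as a bind error from the web server.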

Document in:

  • docs/catalogsync.md

  • README.md

  • Step 5: Run the full verification suite

Run: python -m unittest discover -s tests/catalogsync -v

Expected:

  • OK
  • all previous catalogsync tests still pass
  • the new operations-console tests pass

Run: python -m musicdl.catalogsync.cli serve --help

Expected:

  • help output lists --db, --env-file, --host, and --port

  • Step 6: Commit

git add musicdl/catalogsync/cli.py musicdl/catalogsync/runtime.py requirements.txt setup.py scripts/catalogsync/templates/catalogsync.env.example scripts/catalogsync/templates/install_runtime.sh scripts/catalogsync/templates/serve_console.sh docs/catalogsync.md README.md tests/catalogsync/test_cli.py tests/catalogsync/test_runtime.py
git commit -m "feat: ship operations console runtime and docs"