# Catalogsync Operations Console Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build a NAS-local operations console for `musicdl.catalogsync` with queue-based `collect/sync/download/upload` jobs, soft pause and resume, crash-safe recovery, song-level retry, worker visibility, and `catalogsync.env` configuration management.

**Architecture:** Add a new `musicdl.catalogsync.ops` package for execution-state persistence, env revision management, job orchestration, and FastAPI-backed UI/API endpoints. Reuse the existing catalog tables and execution services by wrapping them in stage and item executors rather than rebuilding the domain logic from scratch.

**Tech Stack:** Python 3, unittest, SQLite, FastAPI, Jinja2, Server-Sent Events, existing `musicdl.catalogsync` services/downloader/uploader, NAS shell templates.

---

## File Structure

### New files

- `musicdl/catalogsync/ops/__init__.py`
  - exports the operations-console public entry points
- `musicdl/catalogsync/ops/models.py`
  - enums, dataclasses, and small helpers for job, stage, item, and worker states
- `musicdl/catalogsync/ops/repository.py`
  - CRUD for `job_runs`, `job_stages`, `job_items`, `job_workers`, `job_commands`, `job_events`, `job_logs`, and `config_revisions`
- `musicdl/catalogsync/ops/config.py`
  - load, validate, snapshot, version, and apply `catalogsync.env`
- `musicdl/catalogsync/ops/executors.py`
  - adapters that run `collect`, `sync`, `download`, and `upload` one work item at a time
- `musicdl/catalogsync/ops/runner.py`
  - queue scheduler, command polling, pause/resume, and crash recovery
- `musicdl/catalogsync/ops/web.py`
  - FastAPI app factory, API routes, page routes, and SSE stream
- `musicdl/catalogsync/templates/ops/base.html`
  - shared layout
- `musicdl/catalogsync/templates/ops/dashboard.html`
  - dashboard page
- `musicdl/catalogsync/templates/ops/jobs.html`
  - queue and active-job page
- `musicdl/catalogsync/templates/ops/job_detail.html`
  - per-job detail page
- `musicdl/catalogsync/templates/ops/playlists.html`
  - playlist-pool and playlist status page
- `musicdl/catalogsync/templates/ops/songs.html`
  - worker and song-processing page
- `musicdl/catalogsync/templates/ops/logs.html`
  - log and exception page
- `musicdl/catalogsync/templates/ops/config.html`
  - env editor and revision page
- `musicdl/catalogsync/static/ops/app.js`
  - lightweight browser logic for SSE updates and page actions
- `scripts/catalogsync/templates/serve_console.sh`
  - NAS runtime script for the web console
- `tests/catalogsync/test_ops_db.py`
  - schema and repository coverage for operations tables
- `tests/catalogsync/test_ops_config.py`
  - env loading, snapshot, revision, and apply tests
- `tests/catalogsync/test_ops_runner.py`
  - state-machine, pause/resume, and recovery tests
- `tests/catalogsync/test_ops_executors.py`
  - per-stage executor tests
- `tests/catalogsync/test_ops_api.py`
  - FastAPI route and SSE tests

### Modified files

- `musicdl/catalogsync/db.py`
  - create the new operations tables
- `musicdl/catalogsync/repository.py`
  - add query helpers that the operations layer can reuse from catalog data
- `musicdl/catalogsync/services.py`
  - expose a playlist-row based sync unit for the runner
- `musicdl/catalogsync/downloader.py`
  - expose song-level download execution for one queued item
- `musicdl/catalogsync/uploader.py`
  - expose upload-task-level execution for one queued item
- `musicdl/catalogsync/cli.py`
  - add the `serve` command
- `musicdl/catalogsync/runtime.py`
  - add web host, port, and config-path runtime fields
- `scripts/catalogsync/templates/catalogsync.env.example`
  - add web-console settings
- `scripts/catalogsync/templates/install_runtime.sh`
  - install any new web-console dependencies
- `docs/catalogsync.md`
  - document the console workflow and runtime script
- `README.md`
  - add a concise operations-console entry
- `requirements.txt`
  - add the web-console runtime dependencies
- `setup.py`
  - include templates/static assets in packaging
- `MANIFEST.in`
  - include template/static files for source distributions
- `tests/catalogsync/test_cli.py`
  - cover the new `serve` command
- `tests/catalogsync/test_runtime.py`
  - cover the new runtime config fields

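### Dependency notes

- Add `fastapi`
- Add `uvicorn`
- Add `jinja2`
- Add `python-multipart`

Add each dependency once, in `requirements.txt` (and in `setup.py` if it declares `install_requires`), then reuse it from the `serve` command, the tests, and the NAS runtime script. `fastapi.testclient.TestClient` additionally needs `httpx`, so add it as a test dependency.
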
### Task 1: Add Operations Schema And Repository Primitives

**Files:**
- Create: `musicdl/catalogsync/ops/__init__.py`
- Create: `musicdl/catalogsync/ops/models.py`
- Create: `musicdl/catalogsync/ops/repository.py`
- Modify: `musicdl/catalogsync/db.py`
- Test: `tests/catalogsync/test_ops_db.py`

- [ ] **Step 1: Write the failing schema and repository tests**

```python
import sqlite3
import tempfile
import unittest
from contextlib import closing
from pathlib import Path


class OperationsSchemaTests(unittest.TestCase):
    def test_initialize_database_creates_operations_tables(self):
        from musicdl.catalogsync.db import initialize_database

        expected_tables = {
            "job_runs",
            "job_stages",
            "job_items",
            "job_workers",
            "job_commands",
            "job_events",
            "job_logs",
            "config_revisions",
        }

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            with closing(sqlite3.connect(db_path)) as conn:
                tables = {
                    row[0]
                    for row in conn.execute(
                        "SELECT name FROM sqlite_master WHERE type = 'table'"
                    ).fetchall()
                }

            self.assertTrue(expected_tables.issubset(tables))
```

- [ ] **Step 2: Run the targeted test and verify it fails**

Run: `python -m unittest tests.catalogsync.test_ops_db -v`

Expected:

- `FAIL` because `initialize_database` does not create the operations tables yet

- [ ] **Step 3: Implement the schema, enums, and repository**

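```python
# musicdl/catalogsync/db.py
# Register the operations tables as required. This only extends the set of
# expected names; matching CREATE TABLE statements must also be added to the
# schema that initialize_database() executes, or the Step 1 test keeps failing.
REQUIRED_TABLES |= {
    "job_runs",
    "job_stages",
    "job_items",
    "job_workers",
    "job_commands",
    "job_events",
    "job_logs",
    "config_revisions",
}
```
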
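The `REQUIRED_TABLES` update only registers table names; the tables themselves still need `CREATE TABLE` statements in the schema that `initialize_database` runs. One possible shape is sketched below. Columns that this plan's repository methods touch (`config_snapshot_json`, `payload_json`, `worker_id`, `ended_at`, `last_error`, `content_hash`, and so on) are included; the status defaults, timestamps, `attempt_count`, and the `job_workers`/`job_commands`/`job_events`/`job_logs` columns are assumptions, and `create_operations_tables` is a hypothetical helper name.

```python
import sqlite3

# Hypothetical DDL for the operations tables. Columns referenced by this plan's
# repository snippets are included; the rest are assumptions to adapt.
OPERATIONS_SCHEMA = """
CREATE TABLE IF NOT EXISTS job_runs (
    id INTEGER PRIMARY KEY,
    job_type TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'queued',
    config_snapshot_json TEXT NOT NULL DEFAULT '{}',
    sources TEXT NOT NULL DEFAULT '',
    download_sources TEXT NOT NULL DEFAULT '',
    playlist_scope_json TEXT NOT NULL DEFAULT '{}',
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    started_at TEXT,
    ended_at TEXT
);
CREATE TABLE IF NOT EXISTS job_stages (
    id INTEGER PRIMARY KEY,
    job_run_id INTEGER NOT NULL REFERENCES job_runs(id),
    stage_type TEXT NOT NULL,
    seq_no INTEGER NOT NULL,
    status TEXT NOT NULL DEFAULT 'queued'
);
CREATE TABLE IF NOT EXISTS job_items (
    id INTEGER PRIMARY KEY,
    job_stage_id INTEGER NOT NULL REFERENCES job_stages(id),
    item_type TEXT NOT NULL,
    item_key TEXT NOT NULL,
    playlist_pool_id INTEGER,
    playlist_id INTEGER,
    song_id INTEGER,
    file_location_id INTEGER,
    payload_json TEXT NOT NULL DEFAULT '{}',
    status TEXT NOT NULL DEFAULT 'queued',
    worker_id INTEGER,  -- no FK: tests may set it without a job_workers row
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_error TEXT,
    started_at TEXT,
    ended_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_job_items_stage_status ON job_items (job_stage_id, status);
CREATE TABLE IF NOT EXISTS job_workers (
    id INTEGER PRIMARY KEY,
    worker_name TEXT NOT NULL UNIQUE,
    current_song_id INTEGER,
    current_playlist_id INTEGER,
    current_display_text TEXT,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS job_commands (
    id INTEGER PRIMARY KEY,
    job_run_id INTEGER NOT NULL REFERENCES job_runs(id),
    command_type TEXT NOT NULL,
    target_item_id INTEGER,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    applied_at TEXT
);
CREATE TABLE IF NOT EXISTS job_events (
    id INTEGER PRIMARY KEY,
    job_run_id INTEGER NOT NULL REFERENCES job_runs(id),
    event_type TEXT NOT NULL,
    message TEXT,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS job_logs (
    id INTEGER PRIMARY KEY,
    job_run_id INTEGER REFERENCES job_runs(id),
    job_item_id INTEGER REFERENCES job_items(id),
    level TEXT NOT NULL,
    message TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS config_revisions (
    id INTEGER PRIMARY KEY,
    file_path TEXT NOT NULL,
    content_text TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    note TEXT,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    applied_at TEXT
);
"""


def create_operations_tables(conn: sqlite3.Connection) -> None:
    """Create the operations tables and index; safe to call on every startup."""
    conn.executescript(OPERATIONS_SCHEMA)
```

Because every statement uses `IF NOT EXISTS`, the script can run on each `initialize_database` call without touching existing data.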
```python
# musicdl/catalogsync/ops/models.py
from dataclasses import dataclass
from enum import StrEnum  # Python 3.11+; on older versions use `class JobStatus(str, Enum)`


class JobStatus(StrEnum):
    QUEUED = "queued"
    RUNNING = "running"
    PAUSE_REQUESTED = "pause_requested"  # soft pause: in-flight items finish, no new items start
    PAUSED = "paused"
    COMPLETED = "completed"
    COMPLETED_WITH_ERRORS = "completed_with_errors"
    FAILED = "failed"
    CANCELED = "canceled"


@dataclass(frozen=True)
class JobCreateRequest:
    job_type: str
    config_snapshot: dict
    sources: list[str] | None = None
    # Mirror the remaining OperationsRepository.create_job_run() arguments.
    download_sources: list[str] | None = None
    playlist_scope: dict | None = None
```

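The file list says `models.py` also holds item states. An explicit transition table is one way to keep the runner, the executors, and the `retry_item`/`force_retry_item` commands of Task 3 consistent. The sketch below is hypothetical: `running`, `succeeded`, `failed`, and `interrupted` are the item statuses this plan's tests and SQL use, while `queued` as the initial item state, `ALLOWED_ITEM_TRANSITIONS`, `can_transition`, and the rule that only a forced retry may requeue a succeeded item are assumptions. It uses a `str` mixin instead of `StrEnum` so it also runs before Python 3.11.

```python
from enum import Enum


class ItemStatus(str, Enum):
    """Item states; the str mixin lets members compare equal to the strings stored in SQLite."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    INTERRUPTED = "interrupted"


# Hypothetical transition table: which states an item may move to next.
ALLOWED_ITEM_TRANSITIONS: dict[ItemStatus, frozenset[ItemStatus]] = {
    ItemStatus.QUEUED: frozenset({ItemStatus.RUNNING}),
    ItemStatus.RUNNING: frozenset({ItemStatus.SUCCEEDED, ItemStatus.FAILED, ItemStatus.INTERRUPTED}),
    # A normal retry requeues failed or interrupted items.
    ItemStatus.FAILED: frozenset({ItemStatus.QUEUED}),
    ItemStatus.INTERRUPTED: frozenset({ItemStatus.QUEUED}),
    # Succeeded items stay done unless a forced retry is requested (assumption).
    ItemStatus.SUCCEEDED: frozenset(),
}


def can_transition(current: str, target: str, *, force: bool = False) -> bool:
    """Return True if an item may move from `current` to `target`."""
    current_status = ItemStatus(current)
    target_status = ItemStatus(target)
    # A forced retry may requeue anything except an item that is still executing.
    if force and target_status is ItemStatus.QUEUED and current_status is not ItemStatus.RUNNING:
        return True
    return target_status in ALLOWED_ITEM_TRANSITIONS[current_status]
```

Repository methods such as `requeue_item(item_id, force=...)` could call `can_transition` before issuing their `UPDATE`, so illegal moves fail loudly instead of silently corrupting state.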
```python
# musicdl/catalogsync/ops/repository.py
import json
from pathlib import Path

# Assumed existing helper in musicdl/catalogsync/db.py. If it returns a plain
# sqlite3.Connection, `with conn:` commits or rolls back but does not close the
# connection; close it explicitly (e.g. contextlib.closing) to avoid leaks.
from ..db import connect_database


class OperationsRepository:
    def __init__(self, db_path: str | Path):
        self.db_path = Path(db_path)

    def create_job_run(
        self,
        job_type: str,
        config_snapshot: dict,
        sources: list[str] | None = None,
        download_sources: list[str] | None = None,
        playlist_scope: dict | None = None,
    ) -> int:
        with connect_database(self.db_path) as conn:
            cursor = conn.execute(
                """
                INSERT INTO job_runs (job_type, config_snapshot_json, sources, download_sources, playlist_scope_json)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    job_type,
                    json.dumps(config_snapshot, ensure_ascii=False),
                    ",".join(sources or []),
                    ",".join(download_sources or []),
                    json.dumps(playlist_scope or {}, ensure_ascii=False),
                ),
            )
            return int(cursor.lastrowid)

    def create_job_stage(self, job_id: int, stage_type: str, seq_no: int) -> int:
        with connect_database(self.db_path) as conn:
            cursor = conn.execute(
                "INSERT INTO job_stages (job_run_id, stage_type, seq_no) VALUES (?, ?, ?)",
                (job_id, stage_type, seq_no),
            )
            return int(cursor.lastrowid)

    def create_job_item(self, job_stage_id: int, item_type: str, item_key: str, **extra) -> int:
        # Optional keys: playlist_pool_id, playlist_id, song_id, file_location_id, payload_json (a dict).
        with connect_database(self.db_path) as conn:
            cursor = conn.execute(
                """
                INSERT INTO job_items (job_stage_id, item_type, item_key, playlist_pool_id, playlist_id, song_id, file_location_id, payload_json)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    job_stage_id,
                    item_type,
                    item_key,
                    extra.get("playlist_pool_id"),
                    extra.get("playlist_id"),
                    extra.get("song_id"),
                    extra.get("file_location_id"),
                    json.dumps(extra.get("payload_json") or {}, ensure_ascii=False),
                ),
            )
            return int(cursor.lastrowid)
```

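The SSE endpoint in Task 5 passes `repo.get_live_snapshot()` to `json.dumps`, which raises `TypeError` on `sqlite3.Row` objects, and JSON columns such as `config_snapshot_json` would otherwise reach the browser as raw strings. A small conversion step keeps the repository's list and snapshot methods API-ready. The sketch assumes the connection uses `row_factory = sqlite3.Row` and that JSON columns end in `_json`, as they do in this plan; `row_to_dict` and `rows_to_dicts` are hypothetical names.

```python
import json
import sqlite3
from typing import Any


def row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
    """Copy a sqlite3.Row into a plain dict, decoding columns whose name ends in `_json`."""
    result: dict[str, Any] = {}
    for key in row.keys():
        value = row[key]
        if key.endswith("_json") and isinstance(value, str):
            # Keep the column name but expose the decoded object; empty text becomes None.
            value = json.loads(value) if value else None
        result[key] = value
    return result


def rows_to_dicts(rows: list[sqlite3.Row]) -> list[dict[str, Any]]:
    """Convert a fetchall() result into JSON-serializable dicts."""
    return [row_to_dict(row) for row in rows]
```

Converting at the repository boundary keeps `sqlite3` types out of the web layer, so `/api/jobs` and the event stream can serialize results directly.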
- [ ] **Step 4: Run the targeted test and verify it passes**

Run: `python -m unittest tests.catalogsync.test_ops_db -v`

Expected:

- `OK`
- the verbose output reports `test_initialize_database_creates_operations_tables ... ok`

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/db.py musicdl/catalogsync/ops/__init__.py musicdl/catalogsync/ops/models.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_db.py
git commit -m "feat: add operations schema and repository primitives"
```

### Task 2: Add Env Revision Management And Job Config Snapshots

**Files:**
- Create: `musicdl/catalogsync/ops/config.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Test: `tests/catalogsync/test_ops_config.py`

- [ ] **Step 1: Write the failing config and revision tests**

```python
import tempfile
import unittest
from pathlib import Path


class EnvManagerTests(unittest.TestCase):
    def test_load_snapshot_and_save_revision(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.config import CatalogsyncEnvManager
        from musicdl.catalogsync.ops.repository import OperationsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            env_path = Path(tmpdir) / "catalogsync.env"
            env_path.write_text(
                "LIBRARY_DIR=/volume4/Music_Cloud/library\nDOWNLOAD_SOURCES=qq,kuwo,migu\n",
                encoding="utf-8",
            )
            initialize_database(db_path).close()
            repo = OperationsRepository(db_path)
            manager = CatalogsyncEnvManager(env_path=env_path, repository=repo)

            snapshot = manager.build_job_snapshot()
            revision_id = manager.save_revision(note="initial import")
            revisions = manager.list_revisions()

            self.assertEqual(["qq", "kuwo", "migu"], snapshot["download_sources"])
            self.assertEqual(revision_id, revisions[0]["id"])
```

- [ ] **Step 2: Run the targeted test and verify it fails**

Run: `python -m unittest tests.catalogsync.test_ops_config -v`

Expected:

- `FAILED (errors=1)` because `musicdl.catalogsync.ops.config` does not exist yet, so the import inside the test raises `ModuleNotFoundError`

- [ ] **Step 3: Implement the env manager and revision methods**

```python
# musicdl/catalogsync/ops/config.py
import hashlib
from pathlib import Path

from .repository import OperationsRepository


class CatalogsyncEnvManager:
    SNAPSHOT_KEYS = {
        "DB_PATH",
        "LIBRARY_DIR",
        "DOWNLOAD_SOURCES",
        "OBJECT_BACKEND_NAME",
        "OBJECT_BUCKET",
        "OBJECT_ENDPOINT",
    }

    def __init__(self, env_path: str | Path, repository: OperationsRepository):
        self.env_path = Path(env_path)
        self.repository = repository

    def load_current(self) -> dict[str, str]:
        values: dict[str, str] = {}
        for line in self.env_path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            if key.startswith("export "):  # tolerate shell-style `export KEY=value`
                key = key[len("export "):].strip()
            value = value.strip()
            # The file is also sourced by shell scripts, so drop one pair of matching quotes.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            values[key] = value
        return values

    def build_job_snapshot(self) -> dict:
        """Freeze the settings a job runs with, so later env edits do not affect it."""
        current = self.load_current()
        return {
            "library_root": current.get("LIBRARY_DIR", ""),
            "download_sources": [item.strip() for item in current.get("DOWNLOAD_SOURCES", "").split(",") if item.strip()],
            "env_values": {key: current[key] for key in self.SNAPSHOT_KEYS if key in current},
        }

    def save_revision(self, note: str | None = None) -> int:
        content = self.env_path.read_text(encoding="utf-8")
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
        return self.repository.create_config_revision(
            file_path=str(self.env_path),
            content_text=content,
            content_hash=digest,
            note=note,
        )

    def list_revisions(self) -> list:
        # Newest first, so revisions[0] is the revision saved most recently.
        return self.repository.list_config_revisions(file_path=str(self.env_path))

    def apply_revision(self, revision_id: int) -> None:
        row = self.repository.get_config_revision(revision_id)
        if row is None:
            raise KeyError(f"config revision {revision_id} not found")
        self.env_path.write_text(row["content_text"], encoding="utf-8")
        self.repository.mark_config_revision_applied(revision_id)
```

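`apply_revision` above overwrites `catalogsync.env` in place, so a crash or full disk mid-write could leave a truncated file that both the console and the NAS shell scripts read. A common remedy is to write to a temporary file in the same directory, `fsync` it, and swap it in with `os.replace`, which replaces the target atomically on POSIX filesystems. The `write_text_atomically` and `content_hash` helpers below are hypothetical; `content_hash` mirrors the SHA-256 digest that `save_revision` stores.

```python
from __future__ import annotations

import hashlib
import os
import tempfile
from pathlib import Path


def content_hash(text: str) -> str:
    """SHA-256 hex digest of the UTF-8 text, the same digest save_revision() stores."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def write_text_atomically(path: str | Path, text: str, encoding: str = "utf-8") -> None:
    """Replace `path` with `text` so readers see the old file or the new one, never a partial write."""
    target = Path(path)
    # The temp file must sit in the same directory (same filesystem) for os.replace to be atomic.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=f".{target.name}.", suffix=".tmp")
    try:
        # newline="" writes the text as-is, without newline translation.
        with os.fdopen(fd, "w", encoding=encoding, newline="") as handle:
            handle.write(text)
            handle.flush()
            os.fsync(handle.fileno())
        if target.exists():
            # mkstemp creates the file with mode 0600; keep the original permissions instead.
            os.chmod(tmp_name, target.stat().st_mode & 0o7777)
        os.replace(tmp_name, target)
    except BaseException:
        # Remove the temp file if anything failed before the swap.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

In `apply_revision`, `write_text_atomically(self.env_path, row["content_text"])` would take the place of the plain `write_text` call.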
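```python
# musicdl/catalogsync/ops/repository.py (methods of OperationsRepository)
def create_config_revision(self, file_path: str, content_text: str, content_hash: str, note: str | None = None) -> int:
    with connect_database(self.db_path) as conn:
        cursor = conn.execute(
            """
            INSERT INTO config_revisions (file_path, content_text, content_hash, note)
            VALUES (?, ?, ?, ?)
            """,
            (file_path, content_text, content_hash, note),
        )
        return int(cursor.lastrowid)

def get_config_revision(self, revision_id: int):
    # Returns None when the id does not exist; rows are assumed to be sqlite3.Row.
    with connect_database(self.db_path) as conn:
        return conn.execute("SELECT * FROM config_revisions WHERE id = ?", (revision_id,)).fetchone()

def list_config_revisions(self, file_path: str) -> list:
    # Ids grow monotonically, so ordering by id lists the newest revision first.
    with connect_database(self.db_path) as conn:
        return conn.execute(
            "SELECT * FROM config_revisions WHERE file_path = ? ORDER BY id DESC",
            (file_path,),
        ).fetchall()

def mark_config_revision_applied(self, revision_id: int) -> None:
    with connect_database(self.db_path) as conn:
        conn.execute(
            "UPDATE config_revisions SET applied_at = CURRENT_TIMESTAMP WHERE id = ?",
            (revision_id,),
        )
```
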
- [ ] **Step 4: Run the targeted test and verify it passes**

Run: `python -m unittest tests.catalogsync.test_ops_config -v`

Expected:

- `OK`
- revision apply rewrites the env file and marks the revision as applied (add a test for `apply_revision` next to the snapshot test to verify this)

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/config.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_config.py
git commit -m "feat: add env revision and job snapshot management"
```

### Task 3: Implement The Runner State Machine And Recovery Logic

**Files:**
- Create: `musicdl/catalogsync/ops/runner.py`
- Modify: `musicdl/catalogsync/ops/models.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Test: `tests/catalogsync/test_ops_runner.py`

- [ ] **Step 1: Write the failing runner-state tests**

```python
import tempfile
import unittest
from pathlib import Path


class RunnerStateTests(unittest.TestCase):
    def test_recover_orphaned_jobs_converts_running_items_to_interrupted(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.repository import OperationsRepository
        from musicdl.catalogsync.ops.runner import CatalogsyncRunner

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OperationsRepository(db_path)
            runner = CatalogsyncRunner(db_path=db_path, env_path=Path(tmpdir) / "catalogsync.env")

            job_id = repo.create_job_run("download_only", {"library_root": "/tmp/library"})
            stage_id = repo.create_job_stage(job_id, "download", 1)
            item_id = repo.create_job_item(stage_id, "song", "song:1", song_id=1)
            repo.mark_job_running(job_id)
            repo.mark_stage_running(stage_id)
            # Simulate a crash: the item is left `running` with no live worker.
            # If job_items.worker_id is an enforced foreign key, register worker 1 first.
            repo.mark_item_running(item_id, worker_id=1)

            runner.recover_incomplete_jobs()

            job = repo.get_job_run(job_id)
            item = repo.get_job_item(item_id)

            self.assertEqual("paused", job["status"])
            self.assertEqual("interrupted", item["status"])
```

- [ ] **Step 2: Run the targeted test and verify it fails**

Run: `python -m unittest tests.catalogsync.test_ops_runner -v`

Expected:

- `FAILED (errors=1)` because `musicdl.catalogsync.ops.runner` and the command and recovery helpers do not exist yet (the import raises `ModuleNotFoundError`)

- [ ] **Step 3: Implement the runner, command polling, and recovery**

```python
# musicdl/catalogsync/ops/runner.py
from pathlib import Path

from .repository import OperationsRepository


class CatalogsyncRunner:
    def __init__(self, db_path: str | Path, env_path: str | Path):
        self.db_path = Path(db_path)
        self.env_path = Path(env_path)
        self.repository = OperationsRepository(self.db_path)

    def recover_incomplete_jobs(self) -> None:
        """Run at startup: pause jobs left active by a crash and mark their in-flight items interrupted."""
        for job in self.repository.list_recoverable_jobs():
            job_id = int(job["id"])
            self.repository.pause_job_for_recovery(job_id)
            for item in self.repository.list_running_items(job_id):
                self.repository.mark_item_interrupted(int(item["id"]), last_error="Runner restarted during execution")
            self.repository.add_job_event(job_id, "recovery_requeued", "Recovered job after runner restart")

    def apply_pending_commands(self) -> None:
        """Apply commands queued by the web UI (list_pending_commands should return oldest first)."""
        for command in self.repository.list_pending_commands():
            command_type = command["command_type"]
            if command_type == "pause":
                self.repository.request_job_pause(int(command["job_run_id"]))
            elif command_type == "resume":
                self.repository.resume_job(int(command["job_run_id"]))
            elif command_type == "retry_item":
                self.repository.requeue_item(int(command["target_item_id"]), force=False)
            elif command_type == "force_retry_item":
                self.repository.requeue_item(int(command["target_item_id"]), force=True)
            self.repository.mark_command_applied(int(command["id"]))

    def reconcile_pause_state(self, job_id: int) -> None:
        """For a job in `pause_requested`: finalize the pause once no item is still running."""
        if self.repository.job_has_running_items(job_id):
            return
        self.repository.finalize_pause(job_id)

    def loop_once(self) -> None:
        self.apply_pending_commands()
        active_job = self.repository.claim_next_runnable_job()
        if active_job is None:
            return
        # Ideally claim_next_runnable_job() marks the job running in the same transaction.
        self.repository.mark_job_running(int(active_job["id"]))
        # Stage execution (the executors) hooks in here, followed by reconcile_pause_state()
        # for jobs whose status is `pause_requested`.
```

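The pause, resume, and recovery rules the runner enforces can be exercised without SQLite. The in-memory model below is a hypothetical sketch (the `SoftPauseJob` class and its method names are illustrative, not part of this plan's API) that follows the semantics described here: `pause_requested` stops new claims, in-flight items finish, the job becomes `paused` only once nothing is running, and recovery turns orphaned `running` items into `interrupted` ones. Requeuing interrupted items on resume is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SoftPauseJob:
    """In-memory model of one job's soft-pause and crash-recovery rules (illustrative only)."""

    items: dict[int, str] = field(default_factory=dict)  # item_id -> item status
    status: str = "running"

    def claim_next(self) -> int | None:
        # Once a pause is requested (or the job is paused), no new item is handed out.
        if self.status != "running":
            return None
        for item_id in sorted(self.items):
            if self.items[item_id] == "queued":
                self.items[item_id] = "running"
                return item_id
        return None

    def finish(self, item_id: int, ok: bool) -> None:
        self.items[item_id] = "succeeded" if ok else "failed"
        self.reconcile_pause_state()

    def request_pause(self) -> None:
        if self.status == "running":
            self.status = "pause_requested"
        self.reconcile_pause_state()

    def reconcile_pause_state(self) -> None:
        # The pause becomes final only when no in-flight item remains.
        if self.status == "pause_requested" and "running" not in self.items.values():
            self.status = "paused"

    def resume(self) -> None:
        if self.status != "paused":
            return
        # Interrupted items go back to the queue so they run again.
        for item_id, item_status in self.items.items():
            if item_status == "interrupted":
                self.items[item_id] = "queued"
        self.status = "running"

    def recover_after_crash(self) -> None:
        # After a restart nothing can still be executing: flag orphans and park the job.
        for item_id, item_status in self.items.items():
            if item_status == "running":
                self.items[item_id] = "interrupted"
        self.status = "paused"
```

The SQLite-backed runner applies the same rules through `request_job_pause`, `reconcile_pause_state`, `pause_job_for_recovery`, and `mark_item_interrupted`, so this model doubles as a specification for the runner tests.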
```python
# musicdl/catalogsync/ops/repository.py (methods of OperationsRepository)
def request_job_pause(self, job_id: int) -> None:
    with connect_database(self.db_path) as conn:
        # Soft pause: only flag the job; running items finish and reconcile_pause_state() finalizes it.
        conn.execute(
            "UPDATE job_runs SET status = 'pause_requested' WHERE id = ? AND status IN ('queued', 'running')",
            (job_id,),
        )
        conn.execute(
            "UPDATE job_stages SET status = 'pause_requested' WHERE job_run_id = ? AND status = 'running'",
            (job_id,),
        )

def pause_job_for_recovery(self, job_id: int) -> None:
    with connect_database(self.db_path) as conn:
        conn.execute("UPDATE job_runs SET status = 'paused' WHERE id = ?", (job_id,))
        conn.execute(
            "UPDATE job_stages SET status = 'paused' WHERE job_run_id = ? AND status IN ('running', 'pause_requested')",
            (job_id,),
        )

def mark_item_interrupted(self, item_id: int, last_error: str | None = None) -> None:
    with connect_database(self.db_path) as conn:
        conn.execute(
            "UPDATE job_items SET status = 'interrupted', worker_id = NULL, ended_at = CURRENT_TIMESTAMP, last_error = ? WHERE id = ?",
            (last_error, item_id),
        )
```

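Several workers compete for items, so `claim_item` (Task 4) and `claim_next_runnable_job` need to claim atomically, or two workers can start the same song. One SQLite pattern is sketched below with a hypothetical `claim_next_item` function against a minimal `job_items` table: open the transaction with `BEGIN IMMEDIATE` to take the write lock before reading, and guard the `UPDATE` with the expected status so a lost race changes zero rows. `UPDATE ... RETURNING` is avoided because it needs SQLite 3.35 or newer.

```python
from __future__ import annotations

import sqlite3


def claim_next_item(conn: sqlite3.Connection, stage_id: int, worker_id: int) -> int | None:
    """Atomically move one queued item of a stage to `running`; return its id, or None.

    The connection must be opened with isolation_level=None so the explicit
    BEGIN/COMMIT below control the transaction.
    """
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT id FROM job_items WHERE job_stage_id = ? AND status = 'queued' ORDER BY id LIMIT 1",
            (stage_id,),
        ).fetchone()
        if row is None:
            conn.execute("COMMIT")
            return None
        # The status guard makes the claim a no-op if another worker got there first.
        cursor = conn.execute(
            "UPDATE job_items SET status = 'running', worker_id = ?, started_at = CURRENT_TIMESTAMP "
            "WHERE id = ? AND status = 'queued'",
            (worker_id, row[0]),
        )
        conn.execute("COMMIT")
        return row[0] if cursor.rowcount == 1 else None
    except BaseException:
        conn.execute("ROLLBACK")
        raise
```

When the caller already knows the item id, as `claim_item(item_id=...)` does, the guarded `UPDATE ... WHERE id = ? AND status = 'queued'` alone is enough: check `rowcount` and skip the item if it is 0.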
- [ ] **Step 4: Run the targeted test and verify it passes**

Run: `python -m unittest tests.catalogsync.test_ops_runner -v`

Expected:

- `OK`
- orphaned running items become `interrupted` and their job becomes `paused`
- soft pause waits until no item remains running before closing the stage (cover this with a second test next to the recovery test)

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/runner.py musicdl/catalogsync/ops/models.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_runner.py
git commit -m "feat: add operations runner state machine and recovery"
```

### Task 4: Add Stage Executors And Single-Item Execution Hooks

**Files:**
- Create: `musicdl/catalogsync/ops/executors.py`
- Modify: `musicdl/catalogsync/services.py`
- Modify: `musicdl/catalogsync/downloader.py`
- Modify: `musicdl/catalogsync/uploader.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Test: `tests/catalogsync/test_ops_executors.py`

- [ ] **Step 1: Write the failing executor integration tests**

```python
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch


class StageExecutorTests(unittest.TestCase):
    def test_download_executor_marks_item_succeeded(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.repository import OperationsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            repo = OperationsRepository(db_path)
            job_id = repo.create_job_run("download_only", {"library_root": str(library_root)})
            stage_id = repo.create_job_stage(job_id, "download", 1)
            item_id = repo.create_job_item(stage_id, "song", "song:1", song_id=1, payload_json={"row": {"id": 1, "platform": "qq"}})

            executor = DownloadStageExecutor(db_path=db_path, library_root=library_root, download_sources=["qq"])
            with patch("musicdl.catalogsync.downloader.CatalogDownloader.download_song_row", return_value=True):
                executor.process_item(item_id=item_id, worker_name="download-1")

            item = repo.get_job_item(item_id)

            self.assertEqual("succeeded", item["status"])
```

- [ ] **Step 2: Run the targeted test and verify it fails**

Run: `python -m unittest tests.catalogsync.test_ops_executors -v`

Expected:

- `FAILED (errors=1)` because `musicdl.catalogsync.ops.executors` and the single-item helpers do not exist yet (the import raises `ModuleNotFoundError`)

- [ ] **Step 3: Implement executor adapters and expose item-level hooks**

```python
# musicdl/catalogsync/downloader.py
from pathlib import Path


class CatalogDownloader:
    def download_song_row(
        self,
        row: dict,
        library_root: str | Path,
        download_sources: list[str] | None = None,
        worker_callback=None,
    ) -> bool:
        """Download one song row, reporting the current song through worker_callback first."""
        default_root = Path(library_root).resolve()
        if worker_callback:
            worker_callback(
                current_song_id=int(row["id"]),
                current_playlist_id=row.get("playlist_id"),
                current_display_text=f'{row.get("name", row["id"])} / {row.get("singers", "")}'.strip(" /"),
            )
        # _download_one is assumed to be the existing per-song helper used by the batch download path.
        return self._download_one(row=row, default_root=default_root, download_sources=download_sources)
```

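```python
# musicdl/catalogsync/uploader.py
class CatalogUploader:
    def process_upload_task_row(self, task_row, backend_name: str) -> str:
        """Process one upload task row and return its result status, e.g. "succeeded"."""
        backend = self.get_backend(backend_name)
        # _process_task is assumed to be the existing per-task helper used by the batch upload loop.
        return self._process_task(task_row, backend, uploader=None)
```
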
```python
# musicdl/catalogsync/services.py
class CatalogSyncService:
    def sync_playlist_row(self, playlist_row) -> int:
        song_infos = self.resolve_playlist_song_infos(playlist_row)
        source_pool_ids = self.repository.get_pool_ids_for_playlist(int(playlist_row["id"]))
        linked_count = 0
        for source_pool_id in source_pool_ids:
            linked_count += self.store_playlist_songs(
                playlist_id=int(playlist_row["id"]),
                source_pool_id=source_pool_id,
                song_infos=song_infos,
            )
        return linked_count
```

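```python
# musicdl/catalogsync/ops/executors.py
from pathlib import Path

from ..downloader import CatalogDownloader
from .repository import OperationsRepository


class DownloadStageExecutor:
    def __init__(self, db_path: str | Path, library_root: str | Path, download_sources: list[str] | None = None):
        self.ops_repo = OperationsRepository(db_path)
        # Assumes CatalogDownloader can be built from the database path; adapt to its real constructor.
        self.downloader = CatalogDownloader(db_path)
        self.library_root = Path(library_root)
        self.download_sources = download_sources

    def process_item(self, item_id: int, worker_name: str) -> None:
        self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            row = self.ops_repo.build_download_row(item_id)
            ok = self.downloader.download_song_row(
                row=row,
                library_root=self.library_root,
                download_sources=self.download_sources,
                worker_callback=lambda **state: self.ops_repo.update_worker_state(worker_name=worker_name, **state),
            )
        except Exception as exc:
            # Never leave an item stuck in `running`: record the error so it can be retried.
            self.ops_repo.mark_item_failed(item_id, f"{type(exc).__name__}: {exc}")
            return
        if ok:
            self.ops_repo.mark_item_succeeded(item_id)
        else:
            self.ops_repo.mark_item_failed(item_id, "download returned no file")


class SyncStageExecutor:
    # Assumes __init__ sets self.ops_repo and self.service (a CatalogSyncService).
    def process_item(self, item_id: int, worker_name: str) -> None:
        self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            playlist_row = self.ops_repo.get_playlist_row_for_item(item_id)
            linked_count = self.service.sync_playlist_row(playlist_row)
        except Exception as exc:
            self.ops_repo.mark_item_failed(item_id, f"{type(exc).__name__}: {exc}")
            return
        self.ops_repo.mark_item_succeeded(item_id, result_payload={"linked_count": linked_count})


class UploadStageExecutor:
    # Assumes __init__ sets self.ops_repo, self.uploader (a CatalogUploader), and self.backend_name.
    def process_item(self, item_id: int, worker_name: str) -> None:
        self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            upload_row = self.ops_repo.get_upload_row_for_item(item_id)
            result = self.uploader.process_upload_task_row(upload_row, backend_name=self.backend_name)
        except Exception as exc:
            self.ops_repo.mark_item_failed(item_id, f"{type(exc).__name__}: {exc}")
            return
        if result == "succeeded":
            self.ops_repo.mark_item_succeeded(item_id)
        else:
            self.ops_repo.mark_item_failed(item_id, f"upload result: {result}")
```
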
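Each executor processes one item per `process_item` call; the runner still needs a pool of named workers that keep claiming items until the stage is drained or a soft pause is requested. A minimal thread-based sketch follows. `run_worker_pool` is a hypothetical name, `claim` and `process` stand in for a thread-safe claim method and an executor's `process_item`, and a `threading.Event` plays the role of the pause signal.

```python
from __future__ import annotations

import threading
from collections.abc import Callable


def run_worker_pool(
    worker_count: int,
    claim: Callable[[], int | None],
    process: Callable[[int, str], None],
    pause_requested: threading.Event,
) -> None:
    """Run named worker threads until the stage is drained or a soft pause is requested.

    `claim` must be safe to call from several threads (for example an atomic
    database claim) and return None when nothing is left. `process` should record
    its own failures; an exception escaping it ends that worker thread.
    """

    def worker_loop(worker_name: str) -> None:
        # The pause flag is checked only between items, so an in-flight item always finishes.
        while not pause_requested.is_set():
            item_id = claim()
            if item_id is None:
                return  # nothing left to claim in this stage
            process(item_id, worker_name)

    threads = [
        threading.Thread(target=worker_loop, args=(f"worker-{n}",), name=f"worker-{n}")
        for n in range(1, worker_count + 1)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # returns once every worker has finished its current item
```

Because `join` only returns after each worker finishes its current item, the caller can check for remaining `running` items right afterwards and finalize a pending pause.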
- [ ] **Step 4: Run the targeted test and verify it passes**

Run: `python -m unittest tests.catalogsync.test_ops_executors -v`

Expected:

- `OK`
- a download work item is processed on its own and ends as `succeeded`; add matching tests for the sync and upload executors
- the item records and worker state update correctly

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/downloader.py musicdl/catalogsync/uploader.py musicdl/catalogsync/services.py musicdl/catalogsync/ops/executors.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_ops_executors.py
git commit -m "feat: add stage executors for operations console"
```

### Task 5: Build The FastAPI UI And Management API

**Files:**
- Create: `musicdl/catalogsync/ops/web.py`
- Create: `musicdl/catalogsync/templates/ops/base.html`
- Create: `musicdl/catalogsync/templates/ops/dashboard.html`
- Create: `musicdl/catalogsync/templates/ops/jobs.html`
- Create: `musicdl/catalogsync/templates/ops/job_detail.html`
- Create: `musicdl/catalogsync/templates/ops/playlists.html`
- Create: `musicdl/catalogsync/templates/ops/songs.html`
- Create: `musicdl/catalogsync/templates/ops/logs.html`
- Create: `musicdl/catalogsync/templates/ops/config.html`
- Create: `musicdl/catalogsync/static/ops/app.js`
- Modify: `setup.py`
- Modify: `MANIFEST.in`
- Test: `tests/catalogsync/test_ops_api.py`

- [ ] **Step 1: Write the failing API and page tests**

```python
import tempfile
import unittest
from pathlib import Path

from fastapi.testclient import TestClient


class OperationsApiTests(unittest.TestCase):
    def test_dashboard_and_jobs_endpoints_render(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.web import create_app

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            env_path = Path(tmpdir) / "catalogsync.env"
            env_path.write_text("LIBRARY_DIR=/volume4/Music_Cloud/library\n", encoding="utf-8")
            initialize_database(db_path).close()
            client = TestClient(create_app(db_path=db_path, env_path=env_path))

            dashboard = client.get("/dashboard")
            jobs = client.get("/api/jobs")

            self.assertEqual(200, dashboard.status_code)
            self.assertEqual(200, jobs.status_code)
```

- [ ] **Step 2: Run the targeted test and verify it fails**

Run: `python -m unittest tests.catalogsync.test_ops_api -v`

Expected:

- `FAILED (errors=1)` because `musicdl.catalogsync.ops.web`, its templates, and the API endpoints do not exist yet

- [ ] **Step 3: Implement the FastAPI app, pages, APIs, and SSE**

```python
# musicdl/catalogsync/ops/web.py
import asyncio
import json
from pathlib import Path

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.concurrency import run_in_threadpool

from .config import CatalogsyncEnvManager
from .repository import OperationsRepository

PACKAGE_DIR = Path(__file__).resolve().parents[1]  # musicdl/catalogsync
SSE_INTERVAL_SECONDS = 2.0


def create_app(db_path: str | Path, env_path: str | Path) -> FastAPI:
    app = FastAPI(title="Catalogsync Operations Console")
    repo = OperationsRepository(db_path)
    env_manager = CatalogsyncEnvManager(env_path=env_path, repository=repo)  # used by the config routes
    templates = Jinja2Templates(directory=str(PACKAGE_DIR / "templates"))
    app.mount("/static", StaticFiles(directory=str(PACKAGE_DIR / "static")), name="static")

    @app.get("/dashboard", response_class=HTMLResponse)
    def dashboard(request: Request):
        return templates.TemplateResponse(
            request,
            "ops/dashboard.html",
            {"title": "总览", "summary": repo.get_dashboard_summary()},  # "总览" = "Overview"
        )

    @app.get("/api/jobs")
    def api_jobs():
        return {"items": repo.list_jobs()}

    @app.get("/api/events/stream")
    async def api_events(request: Request):
        async def event_stream():
            # Push a snapshot every few seconds until the browser disconnects; without
            # the sleep, the loop would poll SQLite as fast as it can.
            while not await request.is_disconnected():
                snapshot = await run_in_threadpool(repo.get_live_snapshot)
                yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
                await asyncio.sleep(SSE_INTERVAL_SECONDS)

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    return app
```

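The stream above emits a bare `data:` line per snapshot. If the console later needs named events (for example to tell worker updates apart from log lines) or reconnect resumption, the frame format from the HTML Server-Sent Events specification applies: one `field: value` line per field, a separate `data:` line for each line of the payload, and a blank line to end the event. The `format_sse` helper below is a hypothetical sketch of that framing.

```python
from __future__ import annotations

import json
import re
from typing import Any

_LINE_BREAK = re.compile(r"\r\n|\r|\n")  # the three line endings the SSE format recognizes


def format_sse(
    data: Any,
    event: str | None = None,
    event_id: str | None = None,
    retry_ms: int | None = None,
) -> str:
    """Build one Server-Sent Events frame; non-string `data` is JSON-encoded."""
    payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False)
    lines: list[str] = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Every payload line gets its own `data:` field; the browser rejoins them with "\n".
    lines.extend(f"data: {line}" for line in _LINE_BREAK.split(payload))
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

Inside `event_stream`, `yield format_sse(snapshot, event="snapshot")` would replace the hand-built string; the browser side would then listen with `addEventListener("snapshot", ...)` on its `EventSource` instead of `onmessage`.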
```html
{# musicdl/catalogsync/templates/ops/base.html #}
<!DOCTYPE html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8">
  <title>{{ title or "Catalogsync Console" }}</title>
  <script src="/static/ops/app.js" defer></script>
</head>
<body data-sse-url="/api/events/stream">
  <nav>
    <a href="/dashboard">总览</a>     {# Overview #}
    <a href="/jobs">任务中心</a>      {# Job center #}
    <a href="/playlists">歌单池</a>   {# Playlist pool #}
    <a href="/songs">歌曲处理</a>     {# Song processing #}
    <a href="/logs">日志异常</a>      {# Logs and errors #}
    <a href="/config">配置管理</a>    {# Configuration #}
  </nav>
  {% block content %}{% endblock %}
</body>
</html>
```

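```text
# MANIFEST.in
recursive-include musicdl/catalogsync/templates/ops *.html
recursive-include musicdl/catalogsync/static/ops *.js
```

`MANIFEST.in` only controls what goes into the source distribution. For the templates and static files to be installed with the package (wheels and regular installs), `setup.py` also needs `include_package_data=True` or an explicit `package_data` entry for `musicdl.catalogsync`.
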
- [ ] **Step 4: Run the targeted test and verify it passes**

Run: `python -m unittest tests.catalogsync.test_ops_api -v`

Expected:

- `OK`
- `/dashboard` renders and `/api/jobs` returns `200`
- queue-control and config endpoints return the expected status codes once tests cover them

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/web.py musicdl/catalogsync/templates/ops/base.html musicdl/catalogsync/templates/ops/dashboard.html musicdl/catalogsync/templates/ops/jobs.html musicdl/catalogsync/templates/ops/job_detail.html musicdl/catalogsync/templates/ops/playlists.html musicdl/catalogsync/templates/ops/songs.html musicdl/catalogsync/templates/ops/logs.html musicdl/catalogsync/templates/ops/config.html musicdl/catalogsync/static/ops/app.js setup.py MANIFEST.in tests/catalogsync/test_ops_api.py
git commit -m "feat: add operations console web app"
```

### Task 6: Wire The CLI, Runtime Scripts, Docs, And Final Verification

**Files:**
- Modify: `musicdl/catalogsync/cli.py`
- Modify: `musicdl/catalogsync/runtime.py`
- Modify: `requirements.txt`
- Modify: `setup.py`
- Modify: `scripts/catalogsync/templates/catalogsync.env.example`
- Modify: `scripts/catalogsync/templates/install_runtime.sh`
- Create: `scripts/catalogsync/templates/serve_console.sh`
- Modify: `docs/catalogsync.md`
- Modify: `README.md`
- Modify: `tests/catalogsync/test_cli.py`
- Modify: `tests/catalogsync/test_runtime.py`

- [ ] **Step 1: Write the failing CLI and runtime tests**

```python
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from click.testing import CliRunner


class CatalogConsoleCliTests(unittest.TestCase):
    def test_serve_command_builds_web_app(self):
        from musicdl.catalogsync.cli import cli

        runner = CliRunner()
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            env_path = Path(tmpdir) / "catalogsync.env"
            env_path.write_text("LIBRARY_DIR=/volume4/Music_Cloud/library\n", encoding="utf-8")

            # `serve` imports uvicorn inside the function, so patch `run` on the uvicorn
            # module itself; "musicdl.catalogsync.cli.uvicorn.run" would not resolve.
            with patch("uvicorn.run") as uvicorn_run:
                result = runner.invoke(
                    cli,
                    [
                        "serve",
                        "--db",
                        str(db_path),
                        "--env-file",
                        str(env_path),
                        "--host",
                        "0.0.0.0",
                        "--port",
                        "8421",
                    ],
                )

            self.assertEqual(0, result.exit_code, msg=result.output)
            uvicorn_run.assert_called_once()
```

- [ ] **Step 2: Run the targeted tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_cli tests.catalogsync.test_runtime -v`

Expected:

- `FAIL` because the `serve` command and web runtime fields do not exist yet

- [ ] **Step 3: Implement the serve command, runtime fields, dependencies, and NAS script**

```python
# musicdl/catalogsync/runtime.py
@dataclass
class CatalogSyncRuntimeConfig:
    root_dir: Path
    app_home: Path
    library_dir: Path
    db_path: Path
    input_dir: Path
    log_dir: Path
    python_bin: str
    venv_dir: Path
    download_layout: str
    env_file: Path
    # Defaults match the `serve` command and keep existing constructors working.
    web_host: str = "0.0.0.0"
    web_port: int = 8421
```

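`WEB_PORT` arrives from `catalogsync.env` as a string, so it is worth validating before it reaches uvicorn. The `WEB_HOST`/`WEB_PORT` keys and the `0.0.0.0`/`8421` defaults come from this plan; the `web_settings_from_env` helper and its error messages are a hypothetical sketch.

```python
from collections.abc import Mapping

DEFAULT_WEB_HOST = "0.0.0.0"
DEFAULT_WEB_PORT = 8421


def web_settings_from_env(env: Mapping[str, str]) -> tuple[str, int]:
    """Return (host, port) from env values, falling back to the console defaults."""
    host = env.get("WEB_HOST", "").strip() or DEFAULT_WEB_HOST
    raw_port = env.get("WEB_PORT", "").strip()
    if not raw_port:
        return host, DEFAULT_WEB_PORT
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"WEB_PORT must be an integer, got {raw_port!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"WEB_PORT must be between 1 and 65535, got {port}")
    return host, port
```

Fed with the dict from `CatalogsyncEnvManager.load_current()`, this yields the values for `web_host` and `web_port` and rejects a bad port at startup instead of failing inside the server.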
```python
# musicdl/catalogsync/cli.py
@cli.command("serve")
@click.option("--db", "db_path", required=True, type=click.Path(dir_okay=False))
@click.option("--env-file", required=True, type=click.Path(dir_okay=False, exists=True))
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option("--port", default=8421, type=int, show_default=True)
def serve_command(db_path: str, env_file: str, host: str, port: int):
    """Run the operations console web app."""
    # Imported lazily so the other CLI commands work without the web dependencies.
    import uvicorn

    from .ops.web import create_app

    app = create_app(db_path=db_path, env_path=env_file)
    uvicorn.run(app, host=host, port=port)


# `python -m musicdl.catalogsync.cli ...` (used by serve_console.sh) needs this guard
# at the end of cli.py if it is not there already.
if __name__ == "__main__":
    cli()
```

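```bash
#!/bin/sh
# scripts/catalogsync/templates/serve_console.sh
set -eu

# Assumes VENV_DIR and DB_PATH (and ENV_FILE or CONFIG_FILE) are already set,
# e.g. by sourcing catalogsync.env as the other runtime templates do.
# `exec` replaces the shell so stop signals reach uvicorn directly.
exec "${VENV_DIR}/bin/python" -m musicdl.catalogsync.cli serve \
  --db "${DB_PATH}" \
  --env-file "${ENV_FILE:-${CONFIG_FILE}}" \
  --host "${WEB_HOST:-0.0.0.0}" \
  --port "${WEB_PORT:-8421}"
```
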
- [ ] **Step 4: Update docs and runtime templates**

```text
# scripts/catalogsync/templates/catalogsync.env.example
ENV_FILE=/volume4/Music_Cloud/catalogsync/config/catalogsync.env
WEB_HOST=0.0.0.0
WEB_PORT=8421
```

Document the console workflow and `serve_console.sh` in:

- `docs/catalogsync.md`
- `README.md`

- [ ] **Step 5: Run the full verification suite**

Run: `python -m unittest discover -s tests/catalogsync -v`

Expected:

- `OK`
- all previous catalogsync tests still pass
- the new operations-console tests pass

Run: `python -m musicdl.catalogsync.cli serve --help`

Expected:

- help output lists `--db`, `--env-file`, `--host`, and `--port`

- [ ] **Step 6: Commit**

```bash
git add musicdl/catalogsync/cli.py musicdl/catalogsync/runtime.py requirements.txt setup.py scripts/catalogsync/templates/catalogsync.env.example scripts/catalogsync/templates/install_runtime.sh scripts/catalogsync/templates/serve_console.sh docs/catalogsync.md README.md tests/catalogsync/test_cli.py tests/catalogsync/test_runtime.py
git commit -m "feat: ship operations console runtime and docs"
```