# Task Center Download Lanes Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Turn the operations console into a real task center with lane-aware scheduling, bulk playlist sync, and live download throughput for running download jobs.

**Architecture:** Move job-type rules into one shared `ops` module, extend the SQLite-backed operations repository with dashboard-ready task summaries and worker throughput fields, and refactor `OpsRunner` into a two-lane supervisor that can keep one download-class job plus several general jobs running concurrently. Keep the existing collect/sync/download/upload stage executors intact, reuse `/api/jobs/{id}` as the source of expanded row detail, and compute download speed by monitoring file growth inside `catalogsync` rather than by parsing terminal text.

**Tech Stack:** Python 3, unittest, SQLite, FastAPI, Jinja2, vanilla JavaScript, ThreadPoolExecutor, existing `musicdl.catalogsync` repositories and downloader.
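Computing speed from file growth comes down to polling the partial file's size and dividing the byte delta by the elapsed time. The sketch below illustrates that idea under stated assumptions; it is not the planned `downloader.py` change. `FileGrowthMonitor`, `ThroughputSample`, and the `smoothing` weight are hypothetical names, and the sample fields are chosen to line up with the new `job_workers` throughput columns. The clock and size function are injectable so the logic can be tested without real downloads.

```python
import os
import time
from dataclasses import asdict, dataclass
from typing import Callable, Optional


@dataclass
class ThroughputSample:
    # Hypothetical record; field names mirror the new job_workers columns.
    downloaded_bytes: int
    total_bytes: Optional[int]
    speed_bytes_per_sec: float
    progress_percent: Optional[float]


class FileGrowthMonitor:
    """Hypothetical helper: estimate throughput by polling a file's size on disk."""

    def __init__(
        self,
        path: str,
        total_bytes: Optional[int] = None,
        *,
        clock: Callable[[], float] = time.monotonic,
        size_of: Callable[[str], int] = os.path.getsize,
        smoothing: float = 0.5,
    ) -> None:
        self.path = path
        self.total_bytes = total_bytes
        self._clock = clock
        self._size_of = size_of
        self._smoothing = smoothing  # weight given to the newest interval
        self._last_size = 0
        self._last_time: Optional[float] = None
        self._speed = 0.0

    def sample(self) -> ThroughputSample:
        now = self._clock()
        try:
            size = self._size_of(self.path)
        except OSError:
            size = 0  # the downloader has not created the file yet
        if self._last_time is not None and now > self._last_time:
            # Bytes written since the previous poll divided by elapsed seconds.
            instant = max(size - self._last_size, 0) / (now - self._last_time)
            # Exponential moving average keeps bursty writes from jumping around.
            self._speed = self._smoothing * instant + (1 - self._smoothing) * self._speed
        self._last_size = size
        self._last_time = now
        percent = None
        if self.total_bytes:
            percent = min(100.0, size * 100.0 / self.total_bytes)
        return ThroughputSample(size, self.total_bytes, self._speed, percent)
```

A downloader callback could then forward each poll with something like `repo.update_worker_state(worker_name, **asdict(monitor.sample()))`. The moving average is one possible design choice: it steadies the `MB/s` figure in the task table at the cost of reacting a little more slowly to stalls.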
---

## File Structure

### New files

- `musicdl/catalogsync/ops/jobdefs.py` - canonical job-stage sequences, lane classification, and user-facing job labels shared by the runner, repository, and web layers
- `tests/catalogsync/test_ops_repository.py` - repository-level coverage for task-center summaries, queue labels, scope text, and worker speed aggregation

### Modified files

- `musicdl/catalogsync/db.py` - add `job_workers` throughput columns and an idempotent column-upgrade helper for existing NAS databases
- `musicdl/catalogsync/ops/repository.py` - add queued/active job queries, compare-and-swap (CAS) job claiming by id, task-center summary projection, worker throughput persistence, and worker cleanup
- `musicdl/catalogsync/ops/runner.py` - replace the single-active-job loop with a lane-aware supervisor and per-job execution futures
- `musicdl/catalogsync/ops/web.py` - expose task-center payloads, bulk playlist sync, and dashboard/task summary serialization
- `musicdl/catalogsync/templates/ops/dashboard.html` - replace the old active-job/recent-jobs split with a single inline-expandable task table
- `musicdl/catalogsync/templates/ops/playlists.html` - add a `sync selected playlists` action and lightweight `0 songs, sync recommended` hints
- `musicdl/catalogsync/templates/ops/jobs.html` - simplify the fallback archive page now that `Dashboard` is the main operator surface
- `musicdl/catalogsync/templates/ops/job_detail.html` - keep the deep-link troubleshooting page aligned with the new task-center wording and metrics
- `musicdl/catalogsync/static/ops/app.js` - render task-center rows, keep expanded rows refreshed, send compact job commands, and call the new playlist sync endpoint
- `musicdl/catalogsync/downloader.py` - monitor file growth during downloads and emit structured throughput updates back to the ops layer
- `musicdl/catalogsync/ops/executors.py` - pass structured worker progress from downloader callbacks into `OpsRepository.update_worker_state`
- `tests/catalogsync/test_db.py` - verify column-upgrade behavior for `job_workers`
- `tests/catalogsync/test_ops_runner.py` - prove download-lane serialization and general-lane concurrency
- `tests/catalogsync/test_ops_api.py` - verify new API payload fields, dashboard markup, playlist bulk sync, and speed rendering
- `docs/catalogsync.md` - document the task center, lane semantics, playlist sync action, and speed display rules

## Task 1: Add Shared Job Definitions, Worker Throughput Schema, And Task Summary Queries

**Files:**

- Create: `musicdl/catalogsync/ops/jobdefs.py`
- Modify: `musicdl/catalogsync/db.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Modify: `tests/catalogsync/test_db.py`
- Create: `tests/catalogsync/test_ops_repository.py`

- [ ] **Step 1: Write the failing schema and repository tests**

```python
import sqlite3
import tempfile
import unittest
from contextlib import closing
from pathlib import Path


class DatabaseSchemaTests(unittest.TestCase):
    def test_initialize_database_upgrades_job_workers_with_throughput_columns(self):
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            # Simulate a pre-upgrade NAS database: job_workers without throughput columns.
            with closing(sqlite3.connect(db_path)) as conn:
                conn.execute(
                    """
                    CREATE TABLE job_workers (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        job_run_id INTEGER,
                        job_stage_id INTEGER,
                        worker_name TEXT NOT NULL,
                        status TEXT NOT NULL DEFAULT 'idle',
                        current_job_item_id INTEGER,
                        current_song_id INTEGER,
                        current_playlist_id INTEGER,
                        current_display_text TEXT,
                        heartbeat_at TEXT,
                        last_progress_text TEXT,
                        processed_count INTEGER NOT NULL DEFAULT 0,
                        error_count INTEGER NOT NULL DEFAULT 0
                    )
                    """
                )
                conn.commit()

            initialize_database(db_path).close()

            with closing(sqlite3.connect(db_path)) as conn:
                columns = {
                    row[1]
                    for row in conn.execute("PRAGMA table_info(job_workers)").fetchall()
                }

            self.assertIn("downloaded_bytes", columns)
            self.assertIn("total_bytes", columns)
            self.assertIn("speed_bytes_per_sec", columns)
            self.assertIn("progress_percent", columns)
```

```python
import tempfile
import unittest
from pathlib import Path


class OpsRepositoryTaskCenterTests(unittest.TestCase):
    def setUp(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        self.tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
        self.addCleanup(self.tmpdir.cleanup)
        self.db_path = Path(self.tmpdir.name) / "catalogsync.db"
        initialize_database(self.db_path).close()
        self.repo = OpsRepository(self.db_path)
        self.ItemStatus = ItemStatus
        self.JobStatus = JobStatus
        self.StageStatus = StageStatus

    def test_list_task_center_rows_computes_lane_queue_progress_and_speed(self):
        running_download_job = self.repo.create_job(
            job_type="download_only",
            config_snapshot={},
            status=self.JobStatus.RUNNING,
        )
        queued_download_job = self.repo.create_job(
            job_type="catalog_sync",
            config_snapshot={},
            status=self.JobStatus.QUEUED,
        )
        general_job = self.repo.create_job(
            job_type="sync_only",
            config_snapshot={},
            status=self.JobStatus.RUNNING,
            playlist_scope={"playlist_ids": [11, 12]},
        )
        download_stage = self.repo.create_stage(
            job_run_id=running_download_job,
            stage_type="download",
            seq_no=1,
            status=self.StageStatus.RUNNING,
        )
        self.repo.create_item(
            job_stage_id=download_stage,
            item_type="song_download",
            item_key="song:done",
            song_id=201,
            status=self.ItemStatus.SUCCEEDED,
        )
        running_item = self.repo.create_item(
            job_stage_id=download_stage,
            item_type="song_download",
            item_key="song:running",
            song_id=202,
            status=self.ItemStatus.PENDING,
            payload={"row": {"id": 202, "name": "Song 202"}},
        )
        self.repo.claim_item(item_id=running_item, worker_name="download-1")
        self.repo.update_worker_state(
            "download-1",
            status="running",
            current_job_item_id=running_item,
            current_song_id=202,
            current_display_text="Song 202",
            downloaded_bytes=10 * 1024 * 1024,
            total_bytes=20 * 1024 * 1024,
            speed_bytes_per_sec=3 * 1024 * 1024,
            progress_percent=50.0,
        )

        rows = self.repo.list_task_center_rows(limit=10)
        by_id = {int(row["id"]): row for row in rows}

        self.assertEqual("download", by_id[running_download_job]["lane_type"])
        self.assertEqual("queued #1", by_id[queued_download_job]["queue_label"])
        self.assertEqual("general", by_id[general_job]["lane_type"])
        self.assertEqual("2 playlists", by_id[general_job]["scope_summary"])
        self.assertEqual(1, by_id[running_download_job]["active_worker_count"])
        self.assertEqual(50, by_id[running_download_job]["primary_progress_percent"])
        self.assertEqual(3 * 1024 * 1024, by_id[running_download_job]["speed_bytes_per_sec"])
        self.assertEqual("3.0 MB/s", by_id[running_download_job]["speed_text"])
        self.assertIn("1 / 2 songs", by_id[running_download_job]["primary_progress_text"])
```

- [ ] **Step 2: Run the targeted tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_db.DatabaseSchemaTests.test_initialize_database_upgrades_job_workers_with_throughput_columns -v`

Expected:

- `FAIL`
- the failure shows one or more throughput columns missing from `job_workers`

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_computes_lane_queue_progress_and_speed -v`

Expected:

- `ERROR`
- `OpsRepository` does not yet expose `list_task_center_rows`

- [ ] **Step 3: Implement shared job rules, schema upgrade, and task-center summaries**

```python
# musicdl/catalogsync/ops/jobdefs.py
from __future__ import annotations

DOWNLOAD_LANE = "download"
GENERAL_LANE = "general"

# Ordered stage pipeline for each job type.
JOB_STAGE_SEQUENCES = {
    "catalog_sync": ["collect", "sync", "download"],
    "collect_only": ["collect"],
    "sync_only": ["sync"],
    "sync_download": ["sync", "download"],
    "download_only": ["download"],
    "upload_only": ["upload"],
    "download_upload": ["download", "upload"],
}


def job_has_stage(job_type: str, stage_type: str) -> bool:
    return stage_type in JOB_STAGE_SEQUENCES.get(str(job_type), [])


def job_lane_type(job_type: str) -> str:
    # Any job that contains a download stage is serialized in the download lane.
    return DOWNLOAD_LANE if job_has_stage(job_type, "download") else GENERAL_LANE


def primary_stage_type(job_type: str) -> str | None:
    # The stage whose item counts drive the task-center progress column.
    for stage_type in ("download", "upload", "sync", "collect"):
        if job_has_stage(job_type, stage_type):
            return stage_type
    return None


def display_name(job_type: str, playlist_scope: dict[str, object] | None = None) -> str:
    playlist_ids = (playlist_scope or {}).get("playlist_ids")
    is_scoped = isinstance(playlist_ids, list) and len(playlist_ids) > 0
    mapping = {
        "catalog_sync": "Full Pipeline",
        "collect_only": "Collect",
        "sync_only": "Sync Selected Playlists" if is_scoped else "Sync",
        "sync_download": "Sync Then Download" if is_scoped else "Sync Then Download All",
        "download_only": "Download Selected Playlists" if is_scoped else "Download",
        "upload_only": "Upload",
        "download_upload": "Download Then Upload",
    }
    return mapping.get(str(job_type), str(job_type))
```

```python
# musicdl/catalogsync/db.py
# (sqlite3, Path, connect_database, SCHEMA_STATEMENTS, and
#  ensure_default_local_backend already exist in this module.)

JOB_WORKER_COLUMN_MIGRATIONS = {
    "downloaded_bytes": "INTEGER",
    "total_bytes": "INTEGER",
    "speed_bytes_per_sec": "REAL",
    "progress_percent": "REAL",
}


def _ensure_table_columns(
    conn: sqlite3.Connection,
    *,
    table_name: str,
    required_columns: dict[str, str],
) -> None:
    # Idempotent: only add columns that PRAGMA table_info does not report yet.
    # Column 1 of PRAGMA table_info is the name; indexing works for tuples and sqlite3.Row.
    # table_name and column definitions come from code constants, never from user input.
    existing = {
        str(row[1])
        for row in conn.execute(f"PRAGMA table_info({table_name})").fetchall()
    }
    for column_name, column_type in required_columns.items():
        if column_name in existing:
            continue
        conn.execute(
            f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
        )


def initialize_database(
    db_path: str | Path,
    default_library_root: str | Path | None = None,
) -> sqlite3.Connection:
    conn = connect_database(db_path)
    for statement in SCHEMA_STATEMENTS:
        conn.execute(statement)
    # Upgrade job_workers tables created before the throughput columns existed.
    _ensure_table_columns(
        conn,
        table_name="job_workers",
        required_columns=JOB_WORKER_COLUMN_MIGRATIONS,
    )
    if default_library_root is not None:
        Path(default_library_root).mkdir(parents=True, exist_ok=True)
        ensure_default_local_backend(conn, default_library_root)
    conn.commit()
    return conn
```

```python
# musicdl/catalogsync/ops/repository.py
# Existing imports (Any, JobRun, JobStatus, ...) stay unchanged.
from .jobdefs import DOWNLOAD_LANE, display_name, job_lane_type, primary_stage_type


def _format_speed_text(speed_bytes_per_sec: float | int | None) -> str:
    if not speed_bytes_per_sec:
        return "-"
    # 1 MB is treated as 1024 * 1024 bytes, so 3 * 1024 * 1024 B/s renders as "3.0 MB/s".
    return f"{float(speed_bytes_per_sec) / 1024 / 1024:.1f} MB/s"


class OpsRepository:
    def list_active_jobs(self) -> list[JobRun]:
        rows = self._fetchall(
            """
            SELECT *
            FROM job_runs
            WHERE status IN (?, ?)
            ORDER BY COALESCE(started_at, created_at) ASC, id ASC
            """,
            (JobStatus.RUNNING.value, JobStatus.PAUSE_REQUESTED.value),
        )
        return [self._row_to_job(row) for row in rows]
```

```python
# musicdl/catalogsync/ops/repository.py
class OpsRepository:
    def list_queued_jobs(self, limit: int = 100) -> list[JobRun]:
        rows = self._fetchall(
            """
            SELECT *
            FROM job_runs
            WHERE status = ?
            ORDER BY priority ASC, created_at ASC, id ASC
            LIMIT ?
            """,
            (JobStatus.QUEUED.value, int(limit)),
        )
        return [self._row_to_job(row) for row in rows]

    def list_task_center_rows(self, limit: int = 50) -> list[dict[str, Any]]:
        jobs = [
            self._row_to_job(row)
            for row in self._fetchall(
                "SELECT * FROM job_runs ORDER BY id DESC LIMIT ?",
                (int(limit),),
            )
        ]
        # Number queued download-lane jobs in the order the runner claims them
        # (priority, then age), so "queued #N" matches the real scheduling order
        # even when a queued job falls outside the `limit` most recent rows.
        queued_download_ids = [
            job.id
            for job in self.list_queued_jobs(limit=1000)
            if job_lane_type(job.job_type) == DOWNLOAD_LANE
        ]
        rows: list[dict[str, Any]] = []
        for job in jobs:
            stages = self.list_job_stages(job.id)
            stage_by_type = {stage.stage_type: stage for stage in stages}
            target_stage = stage_by_type.get(primary_stage_type(job.job_type) or "")
            completed = int(target_stage.success_items) if target_stage else 0
            total = int(target_stage.total_items) if target_stage else 0
            workers = self.list_job_workers(job.id, active_only=True, limit=100)
            # A job's speed is the sum of its active workers' reported throughput.
            speed = float(
                sum(float(worker.get("speed_bytes_per_sec") or 0) for worker in workers)
            )
            lane_type = job_lane_type(job.job_type)
            queue_label = lane_type
            if (
                lane_type == DOWNLOAD_LANE
                and job.status == JobStatus.QUEUED
                and job.id in queued_download_ids
            ):
                queue_label = f"queued #{queued_download_ids.index(job.id) + 1}"
            elif job.status == JobStatus.RUNNING:
                queue_label = "running"
            playlist_ids = job.playlist_scope.get("playlist_ids")
            if isinstance(playlist_ids, list) and playlist_ids:
                scope_summary = f"{len(playlist_ids)} playlists"
            elif job.sources:
                scope_summary = f"{len(job.sources)} sources"
            else:
                scope_summary = "All sources"
            progress_text = f"{completed} / {total} songs"
            if speed > 0:
                progress_text = f"{progress_text} | {_format_speed_text(speed)}"
            # _progress_percent(completed, total) is assumed to be an existing helper in this module.
            rows.append(
                {
                    "id": int(job.id),
                    "job_type": str(job.job_type),
                    "display_name": display_name(job.job_type, job.playlist_scope),
                    "status": job.status.value,
                    "lane_type": lane_type,
                    "queue_label": queue_label,
                    "scope_summary": scope_summary,
                    "primary_progress_text": progress_text,
                    "primary_progress_percent": _progress_percent(completed, total),
                    "active_worker_count": len(workers),
                    "speed_bytes_per_sec": speed,
                    "speed_text": _format_speed_text(speed),
                    "can_pause": job.status in {JobStatus.QUEUED, JobStatus.RUNNING},
                    "can_resume": job.status in {JobStatus.PAUSED, JobStatus.PAUSE_REQUESTED},
                    "can_cancel": job.status
                    in {
                        JobStatus.QUEUED,
                        JobStatus.RUNNING,
                        JobStatus.PAUSED,
                        JobStatus.PAUSE_REQUESTED,
                    },
                    "detail_url": f"/api/jobs/{job.id}",
                }
            )
        return rows

    def update_worker_state(self, worker_name: str, **state: Any) -> None:
        normalized_worker = str(worker_name).strip() or "worker"
        with self._connection() as conn:
            # Reuse the newest row for this worker name, or create one on first report.
            row = conn.execute(
                """
                SELECT id
                FROM job_workers
                WHERE worker_name = ?
                ORDER BY id DESC
                LIMIT 1
                """,
                (normalized_worker,),
            ).fetchone()
            if row is None:
                worker_id = int(
                    conn.execute(
                        """
                        INSERT INTO job_workers (worker_name, status, heartbeat_at)
                        VALUES (?, ?, CURRENT_TIMESTAMP)
                        """,
                        (normalized_worker, str(state.get("status") or "running")),
                    ).lastrowid
                )
            else:
                worker_id = int(row["id"])
            updates = ["heartbeat_at = CURRENT_TIMESTAMP"]
            params: list[Any] = []
            # Whitelisted column names only, so the f-string below never interpolates caller input.
            for field_name in (
                "status",
                "current_job_item_id",
                "current_song_id",
                "current_playlist_id",
                "current_display_text",
                "last_progress_text",
                "downloaded_bytes",
                "total_bytes",
                "speed_bytes_per_sec",
                "progress_percent",
            ):
                if field_name in state:
                    updates.append(f"{field_name} = ?")
                    params.append(state[field_name])
            processed_increment = int(state.get("processed_increment") or 0)
            error_increment = int(state.get("error_increment") or 0)
            if processed_increment:
                updates.append("processed_count = processed_count + ?")
                params.append(processed_increment)
            if error_increment:
                updates.append("error_count = error_count + ?")
                params.append(error_increment)
            params.append(worker_id)
            conn.execute(
                f"UPDATE job_workers SET {', '.join(updates)} WHERE id = ?",
                tuple(params),
            )
```

- [ ] **Step 4: Run the targeted tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_db.DatabaseSchemaTests.test_initialize_database_upgrades_job_workers_with_throughput_columns -v`

Expected:

- `OK`
- rerunning `initialize_database()` upgrades an existing `job_workers` table in place

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_computes_lane_queue_progress_and_speed -v`

Expected:

- `OK`
- task-center rows include `lane_type`, `queue_label`, `scope_summary`, `primary_progress_percent`, and `speed_text`

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/jobdefs.py musicdl/catalogsync/db.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_db.py tests/catalogsync/test_ops_repository.py
git commit -m "feat: add task center summary projections"
```

## Task 2: Refactor The Runner Into Download And General Lanes

**Files:**

- Modify: `musicdl/catalogsync/ops/runner.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Modify: `tests/catalogsync/test_ops_runner.py`

- [ ] **Step 1: Write the failing scheduler tests**

```python
import tempfile
import threading
import time
import unittest
from pathlib import Path
from unittest.mock import patch


class OpsRunnerLaneTests(unittest.TestCase):
    def _seed_stage_item(self, repo, *, job_id: int, stage_type: str, item_key: str) -> int:
        stage_id = repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=1)
        return repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download" if stage_type == "download" else "playlist_sync",
            item_key=item_key,
            song_id=101 if stage_type == "download" else None,
            playlist_id=201 if stage_type == "sync" else None,
            payload={"row": {"id": 101, "name": "Task Item"}},
        )

    def test_download_lane_runs_only_one_job_at_a_time(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import JobStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.ops.runner import OpsRunner

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)
            runner = OpsRunner(repository=repo, sleep_seconds=0.01)
            job_a = repo.create_job(job_type="download_only", config_snapshot={})
            job_b = repo.create_job(job_type="catalog_sync", config_snapshot={})
            self._seed_stage_item(repo, job_id=job_a, stage_type="download", item_key="song:a")
            self._seed_stage_item(repo, job_id=job_b, stage_type="download", item_key="song:b")
            gate = threading.Event()

            def slow_download(executor, item_id, worker_name, *, already_claimed=False):
                # Keep job_a busy for up to 0.2s so job_b has to wait behind it.
                gate.wait(0.2)
                executor.ops_repo.mark_item_succeeded(item_id=item_id)

            with patch(
                "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item",
                new=slow_download,
            ):
                runner.loop_once()
                time.sleep(0.05)
                runner.loop_once()
                status_a = repo.get_job(job_a).status
                status_b = repo.get_job(job_b).status
                gate.set()
                time.sleep(0.1)

            self.assertEqual(JobStatus.RUNNING, status_a)
            self.assertEqual(JobStatus.QUEUED, status_b)
```

```python
class OpsRunnerLaneTests(unittest.TestCase):
    def test_general_lane_job_can_run_while_download_lane_job_is_active(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import JobStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.ops.runner import OpsRunner

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)
            runner = OpsRunner(repository=repo, sleep_seconds=0.01)
            download_job = repo.create_job(job_type="download_only", config_snapshot={})
            sync_job = repo.create_job(job_type="sync_only", config_snapshot={})
            self._seed_stage_item(repo, job_id=download_job, stage_type="download", item_key="song:d")
            self._seed_stage_item(repo, job_id=sync_job, stage_type="sync", item_key="playlist:s")
            download_gate = threading.Event()
            sync_seen = threading.Event()

            def slow_download(executor, item_id, worker_name, *, already_claimed=False):
                download_gate.wait(0.2)
                executor.ops_repo.mark_item_succeeded(item_id=item_id)

            def fast_sync(executor, item_id, worker_name, *, already_claimed=False):
                sync_seen.set()
                executor.ops_repo.mark_item_succeeded(item_id=item_id)

            with patch(
                "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item",
                new=slow_download,
            ):
                with patch(
                    "musicdl.catalogsync.ops.executors.SyncStageExecutor.process_item",
                    new=fast_sync,
                ):
                    runner.loop_once()
                    time.sleep(0.05)
                    runner.loop_once()
                    sync_seen.wait(0.2)
                    download_status = repo.get_job(download_job).status
                    sync_status = repo.get_job(sync_job).status
                    download_gate.set()
                    time.sleep(0.1)

            self.assertEqual(JobStatus.RUNNING, download_status)
            self.assertIn(sync_status, {JobStatus.RUNNING, JobStatus.COMPLETED})

    def test_catalog_sync_is_classified_into_download_lane(self):
        from musicdl.catalogsync.ops.jobdefs import DOWNLOAD_LANE, GENERAL_LANE, job_lane_type

        self.assertEqual(DOWNLOAD_LANE, job_lane_type("catalog_sync"))
        self.assertEqual(DOWNLOAD_LANE, job_lane_type("download_upload"))
        self.assertEqual(GENERAL_LANE, job_lane_type("sync_only"))
```

- [ ] **Step 2: Run the targeted tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_runner.OpsRunnerLaneTests -v`

Expected:

- `FAIL`
- the second download job is incorrectly started, or the sync job cannot overlap with the active download job

- [ ] **Step 3: Implement the lane-aware supervisor without rewriting stage executors**

```python
# musicdl/catalogsync/ops/repository.py
class OpsRepository:
    def claim_job_if_queued(self, job_id: int) -> JobRun | None:
        # Compare-and-swap: the UPDATE only matches while the job is still queued,
        # so a job can be claimed at most once even if two supervisors race.
        with self._connection() as conn:
            cursor = conn.execute(
                """
                UPDATE job_runs
                SET status = ?,
                    started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
                    ended_at = NULL
                WHERE id = ? AND status = ?
                """,
                (JobStatus.RUNNING.value, int(job_id), JobStatus.QUEUED.value),
            )
            if cursor.rowcount != 1:
                return None
            row = conn.execute("SELECT * FROM job_runs WHERE id = ?", (int(job_id),)).fetchone()
            return self._row_to_job(row) if row is not None else None
```

```python
# musicdl/catalogsync/ops/runner.py
import threading
from collections import Counter
from concurrent.futures import Future, ThreadPoolExecutor

from .jobdefs import DOWNLOAD_LANE, GENERAL_LANE, JOB_STAGE_SEQUENCES, job_lane_type

# Existing runner imports (Path, CatalogRepository, OpsRepository, JobStatus, ...) stay as they are.


class OpsRunner:
    def __init__(
        self,
        repository: OpsRepository,
        sleep_seconds: float = 1.0,
        *,
        download_lane_concurrency: int = 1,
        general_lane_concurrency: int = 3,
    ):
        self.repository = repository
        self.sleep_seconds = max(float(sleep_seconds), 0.1)
        self.download_lane_concurrency = max(int(download_lane_concurrency), 1)
        self.general_lane_concurrency = max(int(general_lane_concurrency), 1)
        # One pool thread per job slot across both lanes.
        self._job_pool = ThreadPoolExecutor(
            max_workers=self.download_lane_concurrency + self.general_lane_concurrency
        )
        self._futures: dict[int, Future[None]] = {}
        self._futures_lock = threading.Lock()
        self.db_path = Path(self.repository.db_path)
        self.catalog_repo = CatalogRepository(self.db_path)

    def loop_once(self) -> bool:
        had_commands = bool(self.repository.list_pending_commands())
        self.apply_pending_commands()
        finished = self._reap_finished_jobs()
        started = self._start_eligible_jobs()
        return bool(had_commands or finished or started)

    def _reap_finished_jobs(self) -> int:
        finished_count = 0
        with self._futures_lock:
            for job_id, future in list(self._futures.items()):
                if not future.done():
                    continue
                # Drop the future before result() so a crashed job is reported once,
                # not re-raised on every supervisor tick.
                del self._futures[job_id]
                finished_count += 1
                future.result()
        return finished_count

    def _start_eligible_jobs(self) -> int:
        started_count = 0
        # Count occupied slots per lane; PAUSE_REQUESTED jobs still hold their slot.
        active_jobs = self.repository.list_active_jobs()
        lane_counts = Counter(job_lane_type(job.job_type) for job in active_jobs)
        for queued_job in self.repository.list_queued_jobs(limit=100):
            lane_type = job_lane_type(queued_job.job_type)
            lane_limit = (
                self.download_lane_concurrency
                if lane_type == DOWNLOAD_LANE
                else self.general_lane_concurrency
            )
            if lane_counts[lane_type] >= lane_limit:
                continue
            claimed = self.repository.claim_job_if_queued(queued_job.id)
            if claimed is None:
                continue
            with self._futures_lock:
                self._futures[claimed.id] = self._job_pool.submit(self._run_job, claimed.id)
            lane_counts[lane_type] += 1
            started_count += 1
        return started_count

    def _run_job(self, job_id: int) -> None:
        current_job = self.repository.get_job(job_id)
        if current_job is None:
            return
        self._ensure_job_stages(current_job)
        while True:
            current_job = self.repository.get_job(job_id)
            if current_job is None:
                return
            if current_job.status == JobStatus.CANCELED:
                self.repository.finalize_canceled_job(job_id)
                return
            if current_job.status == JobStatus.PAUSE_REQUESTED:
                self.reconcile_pause_state(job_id)
                return
            stage = self._next_runnable_stage(job_id)
            if stage is None:
                self._finalize_job(job_id)
                return
            self._run_stage(current_job, stage)
            self.apply_pending_commands()
```

- [ ] **Step 4: Run the targeted tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_runner.OpsRunnerLaneTests -v`

Expected:

- `OK`
- only one download-class job runs at a time
- a `sync_only` job can overlap with an active download job

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/repository.py musicdl/catalogsync/ops/runner.py tests/catalogsync/test_ops_runner.py
git commit -m "feat: add lane aware ops runner"
```

## Task 3: Extend The API For Task-Center Payloads And Playlist Bulk Sync

**Files:**

- Modify: `musicdl/catalogsync/ops/web.py`
- Modify: `tests/catalogsync/test_ops_api.py`

- [ ] **Step 1: Write the failing API tests**

```python
def test_api_playlists_sync_creates_sync_only_job_with_scope(self):
    client, db_path, _ = self._build_client()
    playlist_a = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="sync-api-a",
        name="Sync API A",
    )
    playlist_b = self._seed_playlist(
        db_path,
        platform="netease",
        pool_kind="playlist_square",
        remote_id="sync-api-b",
        name="Sync API B",
    )

    response = client.post(
        "/api/playlists/sync",
        json={"playlist_ids": [playlist_b, playlist_a]},
    )

    self.assertEqual(201, response.status_code)
    payload = response.json()["job"]
    self.assertEqual("sync_only", payload["job_type"])
    self.assertEqual([playlist_b, playlist_a], payload["playlist_scope"]["playlist_ids"])
```

```python
def test_api_dashboard_returns_task_center_rows_with_lane_fields(self):
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    running_job = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    queued_job = repo.create_job(
        job_type="catalog_sync",
        config_snapshot={},
        status=JobStatus.QUEUED,
    )
    stage_id = repo.create_stage(
        job_run_id=running_job,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )
    repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:done",
        song_id=1,
        status=ItemStatus.SUCCEEDED,
    )
    running_item = repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:running",
        song_id=2,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 2, "name": "Song 2"}},
    )
    repo.claim_item(item_id=running_item, worker_name="download-1")
    repo.update_worker_state(
        "download-1",
        status="running",
        current_job_item_id=running_item,
        current_song_id=2,
        current_display_text="Song 2",
        downloaded_bytes=5 * 1024 * 1024,
        total_bytes=10 * 1024 * 1024,
        speed_bytes_per_sec=2 * 1024 * 1024,
        progress_percent=50.0,
    )

    response = client.get("/api/dashboard")

    self.assertEqual(200, response.status_code)
    payload = response.json()
    self.assertIn("task_rows", payload)
    by_id = {int(row["id"]): row for row in payload["task_rows"]}
    self.assertEqual("download", by_id[running_job]["lane_type"])
    self.assertEqual(
        "queued #1",
        by_id[queued_job]["queue_label"],
    )
    self.assertEqual("2.0 MB/s", by_id[running_job]["speed_text"])
    self.assertIn("queued_download_jobs", payload["summary"])
```

- [ ] **Step 2: Run the targeted API tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_playlists_sync_creates_sync_only_job_with_scope -v`

Expected:

- `FAIL` with a `404` response instead of `201`
- `/api/playlists/sync` does not exist yet

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_returns_task_center_rows_with_lane_fields -v`

Expected:

- `FAIL`
- `/api/dashboard` does not return `task_rows`, lane fields, or queued download counts

- [ ] **Step 3: Implement playlist bulk sync and task-center payloads**

```python
# musicdl/catalogsync/ops/web.py
from typing import Any, Literal

from .jobdefs import DOWNLOAD_LANE, job_lane_type

# Existing web.py names (OpsRepository, CatalogsyncEnvManager, PlaylistBulkRequest,
# HTTPException, the _normalize_* helpers, _serialize_job, ...) are reused unchanged.


def _dashboard_summary(repo: OpsRepository) -> dict[str, int]:
    rows = repo._fetchall(
        """
        SELECT status, COUNT(*) AS count_value
        FROM job_runs
        GROUP BY status
        """
    )
    counts = {str(row["status"]): int(row["count_value"]) for row in rows}
    queued_download_jobs = sum(
        1
        for job in repo.list_queued_jobs(limit=1000)
        if job_lane_type(job.job_type) == DOWNLOAD_LANE
    )
    return {
        "total_jobs": int(sum(counts.values())),
        "queued_jobs": int(counts.get("queued", 0)),
        "queued_download_jobs": int(queued_download_jobs),
        "running_jobs": int(counts.get("running", 0)),
        "paused_jobs": int(counts.get("paused", 0) + counts.get("pause_requested", 0)),
        "failed_jobs": int(counts.get("failed", 0) + counts.get("completed_with_errors", 0)),
    }


def _dashboard_payload(repo: OpsRepository) -> dict[str, Any]:
    payload = {
        "summary": _dashboard_summary(repo),
        "download_stats": _download_stats(repo),
        "playlist_sources": _playlist_source_stats(repo),
        "task_rows": repo.list_task_center_rows(limit=50),
        "workers": _dashboard_workers(repo),
        "running_items": _dashboard_running_items(repo),
    }
    recent_events = repo._fetchall(
        """
        SELECT id, job_run_id, event_type, message, created_at
        FROM job_events
        ORDER BY id
        DESC LIMIT 5
        """
    )
    payload["recent_events"] = [dict(row) for row in recent_events]
    return payload


def _build_playlist_job(
    repo: OpsRepository,
    env_manager: CatalogsyncEnvManager,
    *,
    job_type: Literal["download_only", "sync_download", "sync_only"],
    payload: PlaylistBulkRequest,
) -> dict[str, Any]:
    # _normalize_playlist_ids must preserve request order; the API test checks it.
    playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
    if not playlist_ids:
        raise HTTPException(status_code=422, detail="playlist_ids is required")
    snapshot = env_manager.build_job_snapshot()
    job_id = repo.create_job(
        job_type=job_type,
        requested_by=payload.requested_by,
        config_snapshot=dict(snapshot),
        sources=_normalize_allowed_list(snapshot.get("SOURCES"), ALLOWED_COLLECT_SOURCES),
        download_sources=_normalize_allowed_list(
            snapshot.get("DOWNLOAD_SOURCES"),
            ALLOWED_DOWNLOAD_SOURCES,
        ),
        playlist_scope={"playlist_ids": playlist_ids},
    )
    _ensure_job_stages(repo, job_id, job_type)
    job = repo.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=500, detail="job create failed")
    return {"job": _serialize_job(job)}


# Assumed to be registered inside the existing app factory, where `app`, `repo`,
# and `env_manager` are in scope alongside the other playlist routes.
@app.post("/api/playlists/sync", status_code=201)
def api_sync_selected_playlists(payload: PlaylistBulkRequest):
    return _build_playlist_job(repo, env_manager, job_type="sync_only", payload=payload)
```

- [ ] **Step 4: Run the targeted API tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_playlists_sync_creates_sync_only_job_with_scope -v`

Expected:

- `OK`
- the created job is `sync_only` and keeps the selected `playlist_ids` ordering

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_returns_task_center_rows_with_lane_fields -v`

Expected:

- `OK`
- `/api/dashboard` returns `task_rows`, `lane_type`, `queue_label`, and `queued_download_jobs`

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/web.py tests/catalogsync/test_ops_api.py
git commit -m "feat: add task center dashboard api"
```

## Task 4: Rebuild The Dashboard And Playlist UX Around The Task Center

**Files:**

- Modify: `musicdl/catalogsync/templates/ops/dashboard.html`
- Modify: `musicdl/catalogsync/templates/ops/playlists.html`
- Modify: `musicdl/catalogsync/templates/ops/jobs.html`
- Modify: `musicdl/catalogsync/templates/ops/job_detail.html`
- Modify: `musicdl/catalogsync/static/ops/app.js`
- Modify: `tests/catalogsync/test_ops_api.py`

- [ ] **Step 1: Write the failing page-render tests**

```python
def test_dashboard_page_renders_task_center_table_and_inline_detail_shell(self):
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )

    response = client.get("/dashboard")

    self.assertEqual(200, response.status_code)
    html = response.text
    self.assertIn("data-task-center-table", html)
    self.assertIn(f'data-task-row="{job_id}"', html)
    self.assertIn(f'data-task-row-expand="{job_id}"', html)
    self.assertIn(f'data-task-row-detail="{job_id}"', html)
    self.assertIn("data-task-command-toggle", html)
    self.assertIn("data-task-command-cancel", html)
```

```python
def test_playlists_page_renders_sync_selected_playlists_action(self):
    client, db_path, _ = self._build_client()
    self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="playlist-render-sync",
        name="Playlist Render Sync",
    )

    response = client.get("/playlists")

    self.assertEqual(200, response.status_code)
    html = response.text
    self.assertIn("data-playlists-page", html)
    self.assertIn('data-playlist-action="sync"', html)
    self.assertIn("data-playlist-select-all", html)
```

- [ ] **Step 2: Run the targeted page-render tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_dashboard_page_renders_task_center_table_and_inline_detail_shell -v`

Expected:

- `FAIL`
- the current dashboard still renders separate `Active Job` and `Recent Jobs` sections instead of a single task table

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_playlists_page_renders_sync_selected_playlists_action -v`

Expected:

- `FAIL`
- the playlist page does not yet expose the `sync` bulk action

- [ ] **Step 3: Replace the dashboard shell, add inline expansion, and add playlist bulk sync UI**

```html

Summary

Total Jobs{{ summary.total_jobs }}
Running{{ summary.running_jobs }}
Queued Download Jobs{{ summary.queued_download_jobs }}
Paused{{ summary.paused_jobs }}
Failed / Errors{{ summary.failed_jobs }}
Running Songs{{ download_stats.running_song_items }}

Quick Actions

```

```html

Task Center

{% for row in task_rows %} {% endfor %}
ID Task Status Scope Primary Progress Active Workers Lane Actions
{{ row.id }} {{ row.display_name }} {{ row.status }} {{ row.scope_summary }} {{ row.primary_progress_text }} {{ row.active_worker_count }} {{ row.queue_label }}
```

```html
{% if playlist.song_count == 0 %} 0 songs, sync recommended {% elif playlist.running_download_song_count %} Running {{ playlist.running_download_song_count }} {% else %} Idle {% endif %}
```

```html

Jobs Archive

Use Dashboard for the main task center. This page stays available for fallback browsing.

```

```html

Back to Dashboard

Job {{ job.id }}

```

```javascript
// musicdl/catalogsync/static/ops/app.js
function postJson(path, payload) {
  return window.fetch(path, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  }).then(function (response) {
    return response.json().then(function (data) {
      if (!response.ok) {
        throw new Error(data.detail || "request failed");
      }
      return data;
    });
  });
}

function commandTypeForRow(row) {
  if (row.can_resume) {
    return "resume";
  }
  if (row.can_pause) {
    return "pause";
  }
  return null;
}

function renderTaskCenterRows(rows) {
  var bodyNode = document.querySelector("[data-task-center-body]");
  if (!bodyNode) {
    return;
  }
  bodyNode.innerHTML = (rows || [])
    .map(function (row) {
      return [
        '',
        '',
        "" + escapeHtml(row.id) + "",
        "" + escapeHtml(row.display_name || row.job_type) + "",
        "" + escapeHtml(row.status || "-") + "",
        "" + escapeHtml(row.scope_summary || "-") + "",
        "" + escapeHtml(row.primary_progress_text || "-") + "",
        "" + escapeHtml(row.active_worker_count || 0) + "",
        "" + escapeHtml(row.queue_label || row.lane_type || "-") + "",
        '',
        "",
        'Loading...',
      ].join("");
    })
    .join("");
}

function renderTaskDetail(payload) {
  var playlistRows = (payload.playlist_progress || []).map(function (row) {
    return (
      "" +
      escapeHtml(row.playlist_name) +
      "" +
      escapeHtml((row.downloaded_songs || 0) + " / " + (row.total_songs || 0)) +
      "" +
      escapeHtml((row.progress_percent || 0) + "%") +
      ""
    );
  });
  var workerRows = (payload.workers || []).map(function (worker) {
    return (
      "" +
      escapeHtml(worker.worker_name || "-") +
      "" +
      escapeHtml(worker.display_text || "-") +
      "" +
      escapeHtml(worker.last_progress_text || "-") +
      ""
    );
  });
  return [
    "",
    "Stages" + escapeHtml(JSON.stringify(payload.stages || [], null, 2)) + "",
    "Workers" + workerRows.join("") + "",
    "Playlist Progress" + playlistRows.join("") + "",
    "",
  ].join("");
}
```

```javascript
// musicdl/catalogsync/static/ops/app.js
function bindTaskCenter(payload) {
  document.querySelectorAll("[data-task-row-expand]").forEach(function (button) {
    button.onclick = function () {
      var jobId = button.getAttribute("data-task-row-expand");
      var detailRow = document.querySelector('[data-task-row-detail="' + jobId + '"]');
      var detailBody = document.querySelector('[data-task-row-detail-body="' + jobId + '"]');
      // Skip rows whose detail shell is missing instead of throwing on null.
      if (!detailRow || !detailBody) {
        return;
      }
      var shouldOpen = detailRow.hasAttribute("hidden");
      if (!shouldOpen) {
        detailRow.setAttribute("hidden", "hidden");
        return;
      }
      detailRow.removeAttribute("hidden");
      // Reuse /api/jobs/{id} as the source of the expanded row detail.
      window.fetch("/api/jobs/" + jobId)
        .then(function (response) {
          return response.json();
        })
        .then(function (data) {
          detailBody.innerHTML = renderTaskDetail(data);
        });
    };
  });
  document.querySelectorAll("[data-task-command-toggle]").forEach(function (button) {
    button.onclick = function () {
      var jobId = button.getAttribute("data-task-command-toggle");
      var row = (payload.task_rows || []).find(function (item) {
        return String(item.id) === String(jobId);
      });
      var commandType = row ?
        commandTypeForRow(row) : null;
      if (!commandType) {
        return;
      }
      postJson("/api/jobs/" + jobId + "/commands", { command_type: commandType })
        .then(function () {
          window.location.reload();
        })
        .catch(function (error) {
          showMessage(error.message || "request failed", true);
        });
    };
  });
  document.querySelectorAll("[data-task-command-cancel]").forEach(function (button) {
    button.onclick = function () {
      var jobId = button.getAttribute("data-task-command-cancel");
      postJson("/api/jobs/" + jobId + "/commands", { command_type: "cancel" })
        .then(function () {
          window.location.reload();
        })
        .catch(function (error) {
          showMessage(error.message || "request failed", true);
        });
    };
  });
}

function updateDashboard(payload) {
  if (!payload) {
    return;
  }
  var summary = payload.summary || {};
  Object.keys(summary).forEach(function (key) {
    var node = document.querySelector('[data-summary-field="' + key + '"]');
    if (node) {
      node.textContent = String(summary[key]);
    }
  });
  var downloadStats = payload.download_stats || {};
  Object.keys(downloadStats).forEach(function (key) {
    var node = document.querySelector('[data-download-field="' + key + '"]');
    if (node) {
      node.textContent = String(downloadStats[key]);
    }
  });
  renderTaskCenterRows(payload.task_rows || []);
  bindTaskCenter(payload);
}

function bindPlaylistPage() {
  var root = document.querySelector("[data-playlists-page]");
  if (!root) {
    return;
  }
  var endpointMap = {
    sync: "/api/playlists/sync",
    download: "/api/playlists/download",
    "sync-download": "/api/playlists/sync-download",
    "mark-wanted": "/api/playlists/mark-wanted",
    "unmark-wanted": "/api/playlists/unmark-wanted",
  };
  var checkboxNodes = Array.prototype.slice.call(
    root.querySelectorAll("[data-playlist-checkbox]")
  );
  var selectionCountNode = root.querySelector("[data-playlist-selection-count]");

  function selectedPlaylistIds() {
    return checkboxNodes
      .filter(function (node) {
        return Boolean(node.checked);
      })
      .map(function (node) {
        return Number(node.value);
      })
      .filter(function (value) {
        return Number.isInteger(value) && value > 0;
      });
  }

  function updateSelectionCount() {
    if (selectionCountNode) {
      selectionCountNode.textContent = String(selectedPlaylistIds().length);
    }
  }

  checkboxNodes.forEach(function (node) {
    node.addEventListener("change", updateSelectionCount);
  });

  root.querySelectorAll("[data-playlist-action]").forEach(function (button) {
    button.addEventListener("click", function () {
      var action = button.getAttribute("data-playlist-action");
      var endpoint = endpointMap[action || ""];
      var playlistIds = selectedPlaylistIds();
      if (!endpoint || !playlistIds.length) {
        return;
      }
      postJson(endpoint, { playlist_ids: playlistIds })
        .then(function (data) {
          if (data.job && data.job.id) {
            window.location.href = "/jobs/" + data.job.id;
            return;
          }
          window.location.reload();
        })
        .catch(function (error) {
          showMessage(error.message || "request failed", true);
        });
    });
  });

  updateSelectionCount();
}
```

- [ ] **Step 4: Run the targeted page-render tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_dashboard_page_renders_task_center_table_and_inline_detail_shell -v`

Expected:
- `OK`
- `/dashboard` renders a single task-center table and inline detail placeholders

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_playlists_page_renders_sync_selected_playlists_action -v`

Expected:
- `OK`
- `/playlists` renders the new `sync` bulk action and keeps the selection toolbar

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/templates/ops/dashboard.html musicdl/catalogsync/templates/ops/playlists.html musicdl/catalogsync/templates/ops/jobs.html musicdl/catalogsync/templates/ops/job_detail.html musicdl/catalogsync/static/ops/app.js tests/catalogsync/test_ops_api.py
git commit -m "feat: build task center dashboard ui"
```

## Task 5: Add Structured Download Throughput Reporting

**Files:**
- Modify: `musicdl/catalogsync/downloader.py`
- Modify: `musicdl/catalogsync/ops/executors.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Modify: `musicdl/catalogsync/ops/web.py`
- Modify: `tests/catalogsync/test_ops_repository.py`
- Modify: `tests/catalogsync/test_ops_api.py`

- [ ] **Step 1: Write the failing speed tests**

```python
def test_list_task_center_rows_uses_live_worker_speed_for_download_jobs(self):
    running_job = self.repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=self.JobStatus.RUNNING,
    )
    stage_id = self.repo.create_stage(
        job_run_id=running_job,
        stage_type="download",
        seq_no=1,
        status=self.StageStatus.RUNNING,
    )
    item_id = self.repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:live-speed",
        song_id=999,
        status=self.ItemStatus.PENDING,
        payload={"row": {"id": 999, "name": "Speed Song"}},
    )
    self.repo.claim_item(item_id=item_id, worker_name="download-1")
    self.repo.update_worker_state(
        "download-1",
        status="running",
        current_job_item_id=item_id,
        current_song_id=999,
        current_display_text="Speed Song",
        downloaded_bytes=12 * 1024 * 1024,
        total_bytes=24 * 1024 * 1024,
        speed_bytes_per_sec=4 * 1024 * 1024,
        progress_percent=50.0,
    )

    row = next(
        row
        for row in self.repo.list_task_center_rows(limit=20)
        if int(row["id"]) == running_job
    )

    self.assertEqual("4.0 MB/s", row["speed_text"])
    self.assertIn("4.0 MB/s", row["primary_progress_text"])
```

```python
def test_api_dashboard_exposes_worker_speed_and_bytes_text(self):
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    stage_id = repo.create_stage(
        job_run_id=job_id,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )
    item_id = repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:api-speed",
        song_id=301,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 301, "name": "API Speed"}},
    )
    repo.claim_item(item_id=item_id, worker_name="download-1")
    repo.update_worker_state(
        "download-1",
        status="running",
        current_job_item_id=item_id,
        current_song_id=301,
        current_display_text="API Speed",
        downloaded_bytes=10 * 1024 * 1024,
        total_bytes=20 * 1024 * 1024,
        speed_bytes_per_sec=5 * 1024 * 1024,
        progress_percent=50.0,
    )

    payload = client.get("/api/dashboard").json()
    row = next(row for row in payload["task_rows"] if int(row["id"]) == job_id)
    worker = next(
        worker for worker in payload["workers"] if worker["worker_name"] == "download-1"
    )

    self.assertEqual(5 * 1024 * 1024, worker["speed_bytes_per_sec"])
    self.assertEqual("5.0 MB/s", worker["speed_text"])
    self.assertEqual("10.0 / 20.0 MB", worker["bytes_text"])
    self.assertIn("5.0 MB/s", row["primary_progress_text"])
```

- [ ] **Step 2: Run the targeted speed tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_uses_live_worker_speed_for_download_jobs -v`

Expected:
- `FAIL`
- `primary_progress_text` still contains only song counts and does not append the live rate

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_exposes_worker_speed_and_bytes_text -v`

Expected:
- `FAIL`
- serialized workers do not yet include `speed_text` or `bytes_text`

- [ ] **Step 3: Monitor file growth during downloads and surface the metrics**

```python
# musicdl/catalogsync/downloader.py
import threading
import time
from pathlib import Path


def _progress_percent(downloaded_bytes: int, total_bytes: int | None) -> float | None:
    # No usable expected size means no percentage rather than a guess.
    if not total_bytes or total_bytes <= 0:
        return None
    return round((float(downloaded_bytes) * 100.0) / float(total_bytes), 2)


class CatalogDownloader:
    def _monitor_save_path(
        self,
        *,
        save_path: Path,
        expected_bytes: int | None,
        progress_callback,
        stop_event: threading.Event,
    ) -> None:
        previous_bytes = 0
        previous_at = time.monotonic()
        # Sample the target file every 0.5 s until the download thread sets stop_event.
        while not stop_event.wait(0.5):
            try:
                current_bytes = save_path.stat().st_size
            except OSError:
                # The provider has not created the file yet (or just removed it).
                current_bytes = 0
            current_at = time.monotonic()
            delta_bytes = max(current_bytes - previous_bytes, 0)
            delta_seconds = max(current_at - previous_at, 0.001)
            if progress_callback is not None and current_bytes > 0:
                progress_callback(
                    downloaded_bytes=current_bytes,
                    total_bytes=expected_bytes,
                    speed_bytes_per_sec=float(delta_bytes) / float(delta_seconds),
                    progress_percent=_progress_percent(current_bytes, expected_bytes),
                )
            previous_bytes = current_bytes
            previous_at = current_at
```

```python
# musicdl/catalogsync/downloader.py
class CatalogDownloader:
    def _download_one(
        self,
        row: dict,
        default_root: Path,
        download_sources: list[str] | None = None,
        progress_callback=None,
    ) -> bool:
        metadata = json.loads(row["metadata_json"]) if row.get("metadata_json") else {}
        song_info = deserialize_song_info(metadata.get("snapshot"))
        if song_info is None:
            return False
        song_info = self.resolve_song_info_for_download(
            row=row,
            song_info=song_info,
            download_sources=download_sources,
        )
        download_platform = self._detect_download_platform(song_info, row["platform"])
        client = self.get_client(download_platform)
        target_root = self.ensure_space(
            default_root,
            getattr(song_info, "file_size_bytes", None) or row.get("file_size_bytes"),
        )
        is_default_root = target_root.resolve() == default_root
        backend_id = self.repository.ensure_local_backend(
            target_root,
            name="default-local" if is_default_root else None,
            is_default=is_default_root,
        )
        singers = self._normalize_singers(getattr(song_info, "singers", None)) or self._normalize_singers(
            row.get("singers")
        )
        relative_dir = build_download_relative_dir(platform=download_platform, singers=singers)
        target_dir = target_root / relative_dir
        target_dir.mkdir(parents=True, exist_ok=True)
        song_info.work_dir = str(target_dir)
        if hasattr(song_info, "_save_path"):
            song_info._save_path = None
        save_path = Path(song_info.save_path)
        expected_bytes = int(
            getattr(song_info, "file_size_bytes", None) or row.get("file_size_bytes") or 0
        ) or None

        # Watch the target file in a background thread while the provider downloads it.
        monitor_stop = threading.Event()
        monitor_thread = threading.Thread(
            target=self._monitor_save_path,
            kwargs={
                "save_path": save_path,
                "expected_bytes": expected_bytes,
                "progress_callback": progress_callback,
                "stop_event": monitor_stop,
            },
            daemon=True,
            name=f"download-monitor-{row['id']}",
        )
        monitor_thread.start()
        try:
            downloaded = client.download(
                [song_info],
                num_threadings=1,
                request_overrides=self._request_overrides((10, 60)),
                auto_supplement_song=False,
            )
        except TypeError:
            # Retry without request_overrides for clients whose download() rejects it.
            downloaded = client.download(
                [song_info],
                num_threadings=1,
                auto_supplement_song=False,
            )
        finally:
            monitor_stop.set()
            monitor_thread.join(timeout=1.0)

        # Publish one final sample with the completed size and no live rate.
        if save_path.exists() and progress_callback is not None:
            final_size = save_path.stat().st_size
            progress_callback(
                downloaded_bytes=final_size,
                total_bytes=expected_bytes or final_size,
                speed_bytes_per_sec=None,
                progress_percent=_progress_percent(final_size, expected_bytes or final_size),
            )
        if not downloaded:
            return False
        saved_song = downloaded[0]
        saved_path = Path(saved_song.save_path)
        relative_path = saved_path.relative_to(target_root).as_posix()
        actual_size = saved_path.stat().st_size if saved_path.exists() else row.get("file_size_bytes")
        actual_ext = saved_path.suffix.lstrip(".") or row.get("ext")
        self.repository.record_local_file(
            song_id=int(row["id"]),
            backend_id=backend_id,
            relative_path=relative_path,
            file_size_bytes=actual_size,
            ext=actual_ext,
            quality_label=self._detect_quality_label(song_info, actual_ext, fallback=row.get("quality_label")),
        )
        return True

    def download_song_row(
        self,
        row,
        library_root: str | Path,
        download_sources: list[str] | None = None,
        worker_callback=None,
    ) -> bool:
        row_dict = dict(row)
        default_root = Path(library_root).resolve()
        self._current_library_root = default_root
        self.repository.ensure_local_backend(default_root, name="default-local", is_default=True)
        if worker_callback is not None:
            display_name = str(row_dict.get("name") or row_dict.get("id") or "")
            singers = str(row_dict.get("singers") or "").strip()
            display_text = f"{display_name} / {singers}".strip(" /")
            worker_callback(
                current_song_id=int(row_dict["id"]) if row_dict.get("id") is not None else None,
                current_playlist_id=row_dict.get("playlist_id"),
                current_display_text=display_text,
            )
        return self._download_one(
            row=row_dict,
            default_root=default_root,
            download_sources=download_sources,
            # The same callback also receives byte-level progress from the file monitor.
            progress_callback=worker_callback,
        )
```

```python
# musicdl/catalogsync/ops/executors.py
class DownloadStageExecutor:
    def process_item(self, item_id: int, worker_name: str, *, already_claimed: bool = False) -> None:
        if not already_claimed:
            self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            row = self.ops_repo.build_download_row(item_id=item_id)
            succeeded = self.downloader.download_song_row(
                row=row,
                library_root=self.library_root,
                download_sources=self.download_sources,
                # Forward song identity and byte-level progress into job_workers.
                worker_callback=lambda **state: self.ops_repo.update_worker_state(
                    worker_name=worker_name,
                    current_job_item_id=item_id,
                    status="running",
                    **state,
                ),
            )
            if succeeded:
                _ensure_transition_applied(
                    self.ops_repo.mark_item_succeeded(item_id=item_id),
                    item_id=item_id,
                    action="mark_item_succeeded",
                )
            else:
                _ensure_transition_applied(
                    self.ops_repo.mark_item_failed(
                        item_id=item_id,
                        error_message="download returned no file",
                    ),
                    item_id=item_id,
                    action="mark_item_failed",
                )
        # ... the executor's existing except/finally handling stays unchanged
```

```python
# musicdl/catalogsync/ops/repository.py
class OpsRepository:
    def mark_item_succeeded(self, item_id: int, result_payload: dict[str, Any] | None = None) -> bool:
        with self._connection() as conn:
            row = conn.execute(
                "SELECT job_stage_id, status, worker_id, payload_json FROM job_items WHERE id = ?",
                (item_id,),
            ).fetchone()
            if row is None:
                return False
            from_status = ItemStatus(str(row["status"]))
            if from_status is not ItemStatus.RUNNING:
                return False
            # Capture the claiming worker before the UPDATE below clears job_items.worker_id.
            worker_id = row["worker_id"]
            payload = _json_loads(row["payload_json"])
            if result_payload:
                payload.update(result_payload)
            cursor = conn.execute(
                """
                UPDATE job_items
                SET status = ?,
                    payload_json = ?,
                    worker_id = NULL,
                    ended_at = CURRENT_TIMESTAMP,
                    last_error = NULL,
                    last_error_code = NULL
                WHERE id = ? AND status = ?
                """,
                (
                    ItemStatus.SUCCEEDED.value,
                    _json_dumps(payload),
                    item_id,
                    from_status.value,
                ),
            )
            if cursor.rowcount != 1:
                return False
            if worker_id is not None:
                # Return the worker to idle and clear its throughput fields.
                conn.execute(
                    """
                    UPDATE job_workers
                    SET status = 'idle',
                        current_job_item_id = NULL,
                        current_song_id = NULL,
                        current_playlist_id = NULL,
                        current_display_text = NULL,
                        downloaded_bytes = NULL,
                        total_bytes = NULL,
                        speed_bytes_per_sec = NULL,
                        progress_percent = NULL,
                        processed_count = processed_count + 1,
                        heartbeat_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (int(worker_id),),
                )
            return True
```

```python
# musicdl/catalogsync/ops/web.py
def _format_bytes_text(downloaded_bytes: int | None, total_bytes: int | None) -> str:
    if not downloaded_bytes and not total_bytes:
        return "-"
    downloaded_mb = float(downloaded_bytes or 0) / 1024 / 1024
    total_mb = float(total_bytes or 0) / 1024 / 1024
    if total_bytes:
        return f"{downloaded_mb:.1f} / {total_mb:.1f} MB"
    return f"{downloaded_mb:.1f} MB"


def _serialize_worker_row(row: dict[str, Any]) -> dict[str, Any]:
    payload_text = str(row.get("current_display_text") or "").strip()
    fallback_text = str(row.get("song_name") or row.get("playlist_name") or row.get("item_key") or "").strip()
    speed_bytes_per_sec = row.get("speed_bytes_per_sec")
    downloaded_bytes = row.get("downloaded_bytes")
    total_bytes = row.get("total_bytes")
    return {
        "id": int(row["id"]),
        "job_run_id": row["job_run_id"],
        "job_stage_id": row["job_stage_id"],
        "worker_name": row["worker_name"],
        "status": row["status"],
        "current_job_item_id": row["current_job_item_id"],
        "current_song_id": row["current_song_id"],
        "current_playlist_id": row["current_playlist_id"],
        "display_text": payload_text or fallback_text,
        "stage_type": row.get("stage_type"),
        "item_type": row.get("item_type"),
        "last_progress_text": row.get("last_progress_text"),
        "downloaded_bytes": int(downloaded_bytes) if downloaded_bytes is not None else None,
        "total_bytes": int(total_bytes) if total_bytes is not None else None,
        "speed_bytes_per_sec": float(speed_bytes_per_sec) if speed_bytes_per_sec is not None else None,
        "speed_text": _format_speed_text(speed_bytes_per_sec),
        "bytes_text": _format_bytes_text(downloaded_bytes, total_bytes),
        "progress_percent": float(row["progress_percent"]) if row.get("progress_percent") is not None else None,
        "processed_count": int(row["processed_count"] or 0),
        "error_count": int(row["error_count"] or 0),
        "heartbeat_at": row.get("heartbeat_at"),
    }
```

- [ ] **Step 4: Run the targeted speed tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_uses_live_worker_speed_for_download_jobs -v`

Expected:
- `OK`
- download task summaries append the live transfer rate when a real worker speed exists

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_exposes_worker_speed_and_bytes_text -v`

Expected:
- `OK`
- worker rows include `downloaded_bytes`, `total_bytes`, `speed_bytes_per_sec`, `speed_text`, and `bytes_text`

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/downloader.py musicdl/catalogsync/ops/executors.py musicdl/catalogsync/ops/repository.py musicdl/catalogsync/ops/web.py tests/catalogsync/test_ops_repository.py tests/catalogsync/test_ops_api.py
git commit -m "feat: add download throughput reporting"
```

## Task 6: Update Documentation And Run Regression Coverage

**Files:**
- Modify: `docs/catalogsync.md`

- [ ] **Step 1: Update the operator documentation**

```markdown
## Task Center

- `Dashboard` is now the main operator page.
- Every job appears in one task table with inline expansion.
- Download-class jobs are serialized into one `download` lane:
  - `catalog_sync`
  - `sync_download`
  - `download_only`
  - `download_upload`
- `collect_only` and `sync_only` run in the `general` lane and are not blocked by the active download job.

## Playlist Bulk Actions

- `/playlists` supports:
  - `Sync Selected Playlists`
  - `Download Selected Playlists`
  - `Sync Then Download`
  - `Mark Wanted`
  - `Unmark Wanted`
- When a playlist shows `0 songs, sync recommended`, use `Sync Selected Playlists` first to refresh `playlist_songs` and `song_count`.

## Download Speed

- Running download workers publish structured progress:
  - `downloaded_bytes`
  - `total_bytes`
  - `speed_bytes_per_sec`
  - `progress_percent`
- Task rows aggregate active worker speed and display it as `MB/s`.
- If a provider does not expose live file growth, the UI shows `-` instead of guessing a rate.
```

- [ ] **Step 2: Run focused regression tests**

Run: `python -m unittest tests.catalogsync.test_db tests.catalogsync.test_ops_repository tests.catalogsync.test_ops_runner tests.catalogsync.test_ops_api -v`

Expected:
- `OK`
- schema upgrades, repository summaries, scheduler lanes, API routes, and HTML rendering all pass together

- [ ] **Step 3: Run the full `catalogsync` test suite**

Run: `python -m unittest discover -s tests/catalogsync -v`

Expected:
- `OK`
- no existing `catalogsync` behavior regresses after the task-center refactor

- [ ] **Step 4: Commit**

```bash
git add docs/catalogsync.md
git commit -m "docs: describe task center operations flow"
```
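The operator notes for this task say that task rows aggregate active worker speed, show it as `MB/s`, and fall back to `-` when no live rate exists. The sketch below shows one way that rule could work. `format_speed_text`, `aggregate_speed_text`, and `append_speed` are hypothetical helper names. The plan's own `_format_speed_text` is referenced in Task 5 but not shown here. The 1024-based megabyte is assumed so the output matches `_format_bytes_text` and the `"4.0 MB/s"` test expectation.

```python
from typing import Any, Iterable, Mapping, Optional

# Assumption: binary megabytes, matching the "/ 1024 / 1024" in _format_bytes_text.
BYTES_PER_MB = 1024 * 1024


def format_speed_text(speed_bytes_per_sec: Optional[float]) -> str:
    """Render a byte rate as 'X.Y MB/s', or '-' when no live rate is known."""
    if speed_bytes_per_sec is None:
        return "-"
    return f"{float(speed_bytes_per_sec) / BYTES_PER_MB:.1f} MB/s"


def aggregate_speed_text(workers: Iterable[Mapping[str, Any]]) -> str:
    """Sum the live rate of running workers; idle or rate-less workers are skipped."""
    total = 0.0
    has_rate = False
    for worker in workers:
        speed = worker.get("speed_bytes_per_sec")
        if worker.get("status") != "running" or speed is None:
            continue
        total += float(speed)
        has_rate = True
    # No running worker reported a rate: show '-' instead of a guessed 0.0 MB/s.
    return format_speed_text(total) if has_rate else "-"


def append_speed(progress_text: str, speed_text: str) -> str:
    """Append the rate to a song-count summary only when a real rate exists."""
    if speed_text == "-":
        return progress_text
    return f"{progress_text}, {speed_text}"


if __name__ == "__main__":
    workers = [
        {"status": "running", "speed_bytes_per_sec": 4 * 1024 * 1024},
        {"status": "idle", "speed_bytes_per_sec": None},
    ]
    # "12 / 40 songs" is a made-up summary string for illustration.
    print(append_speed("12 / 40 songs", aggregate_speed_text(workers)))
    # prints "12 / 40 songs, 4.0 MB/s"
```

This sums the per-worker rates rather than averaging them, which matches the "aggregate active worker speed" wording. In `_download_one`, the final monitor sample is published with `speed_bytes_per_sec=None`. If `update_worker_state` stores that value unchanged, a finished song stops contributing to the row total immediately.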