# Task Center Download Lanes Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Turn the operations console into a real task center with lane-aware scheduling, bulk playlist sync, and live download throughput for running download jobs.

**Architecture:** Move job-type rules into one shared `ops` module, extend the SQLite-backed operations repository with dashboard-ready task summaries and worker throughput fields, and refactor `OpsRunner` into a two-lane supervisor that can run one download-class job alongside several general jobs concurrently. Keep the existing collect/sync/download/upload stage executors intact, reuse `/api/jobs/{id}` as the source of expanded row detail, and compute download speed from file-growth monitoring inside `catalogsync` rather than by parsing terminal text.

**Tech Stack:** Python 3.10+ (the tests use `TemporaryDirectory(ignore_cleanup_errors=True)`), unittest, SQLite, FastAPI, Jinja2, vanilla JavaScript, ThreadPoolExecutor, existing `musicdl.catalogsync` repositories and downloader.

---

## File Structure

### New files

- `musicdl/catalogsync/ops/jobdefs.py`
  - canonical job-stage sequences, lane classification, and user-facing job labels shared by the runner, repository, and web layers
- `tests/catalogsync/test_ops_repository.py`
  - repository-level coverage for task-center summaries, queue labels, scope text, and worker speed aggregation

### Modified files

- `musicdl/catalogsync/db.py`
  - add `job_workers` throughput columns and an idempotent column-upgrade helper for existing NAS databases
- `musicdl/catalogsync/ops/repository.py`
  - add queued/active job queries, compare-and-set (CAS) job claiming by id, task-center summary projection, worker throughput persistence, and worker cleanup
- `musicdl/catalogsync/ops/runner.py`
  - replace the single-active-job loop with a lane-aware supervisor and per-job execution futures
- `musicdl/catalogsync/ops/web.py`
  - expose task-center payloads, bulk playlist sync, and dashboard/task summary serialization
- `musicdl/catalogsync/templates/ops/dashboard.html`
  - replace the old active-job/recent-jobs split with a single task table whose rows expand inline
- `musicdl/catalogsync/templates/ops/playlists.html`
  - add a `sync selected playlists` action and a lightweight `0 songs, sync recommended` hint
- `musicdl/catalogsync/templates/ops/jobs.html`
  - simplify the fallback archive page now that `Dashboard` is the main operator surface
- `musicdl/catalogsync/templates/ops/job_detail.html`
  - keep the deep-link troubleshooting page aligned with the new task-center wording and metrics
- `musicdl/catalogsync/static/ops/app.js`
  - render task-center rows, keep expanded rows refreshed, send compact job commands, and call the new playlist sync endpoint
- `musicdl/catalogsync/downloader.py`
  - monitor file growth during downloads and emit structured throughput updates back to the ops layer
- `musicdl/catalogsync/ops/executors.py`
  - pass structured worker progress from downloader callbacks into `OpsRepository.update_worker_state`
- `tests/catalogsync/test_db.py`
  - verify column-upgrade behavior for `job_workers`
- `tests/catalogsync/test_ops_runner.py`
  - prove download-lane serialization and general-lane concurrency
- `tests/catalogsync/test_ops_api.py`
  - verify new API payload fields, dashboard markup, bulk playlist sync, and speed rendering
- `docs/catalogsync.md`
  - document the task center, lane semantics, playlist sync action, and speed display rules
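
The `downloader.py` change listed above derives speed from file growth rather than terminal output. A minimal sketch of that idea, assuming the downloader writes to a known path whose size can be polled and that the expected total size is either known or `None`. The names `FileGrowthMonitor` and `ThroughputSample` are hypothetical; only the field names are chosen to mirror the planned `job_workers` columns.

```python
import os
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ThroughputSample:
    # Hypothetical record; field names mirror the planned job_workers columns.
    downloaded_bytes: int
    total_bytes: Optional[int]
    speed_bytes_per_sec: float
    progress_percent: Optional[float]


class FileGrowthMonitor:
    """Hypothetical helper: derive download speed from how fast a file grows."""

    def __init__(
        self,
        path: str,
        total_bytes: Optional[int] = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.path = path
        self.total_bytes = total_bytes
        self._clock = clock
        self._last_size = 0
        self._last_time = clock()

    def _current_size(self) -> int:
        # A file that does not exist yet has simply not started growing.
        try:
            return os.path.getsize(self.path)
        except FileNotFoundError:
            return 0

    def sample(self) -> ThroughputSample:
        now = self._clock()
        size = self._current_size()
        elapsed = now - self._last_time
        # Speed over the interval since the previous sample; clamped at zero
        # so a truncated or replaced file never reports negative throughput.
        speed = max(size - self._last_size, 0) / elapsed if elapsed > 0 else 0.0
        self._last_size, self._last_time = size, now
        percent = None
        if self.total_bytes:
            percent = min(100.0, size * 100.0 / self.total_bytes)
        return ThroughputSample(size, self.total_bytes, speed, percent)
```

Under this sketch, the downloader would call `sample()` on a timer and forward the fields to `OpsRepository.update_worker_state`. Measuring the delta since the previous sample, rather than a cumulative average, lets the displayed speed drop to zero as soon as a download stalls.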

## Task 1: Add Shared Job Definitions, Worker Throughput Schema, And Task Summary Queries

**Files:**
- Create: `musicdl/catalogsync/ops/jobdefs.py`
- Modify: `musicdl/catalogsync/db.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Modify: `tests/catalogsync/test_db.py`
- Create: `tests/catalogsync/test_ops_repository.py`

- [ ] **Step 1: Write the failing schema and repository tests**

```python
# tests/catalogsync/test_db.py
import sqlite3
import tempfile
import unittest
from contextlib import closing
from pathlib import Path


class DatabaseSchemaTests(unittest.TestCase):
    def test_initialize_database_upgrades_job_workers_with_throughput_columns(self):
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            # Simulate an existing NAS database whose job_workers table predates
            # the throughput columns.
            with closing(sqlite3.connect(db_path)) as conn:
                conn.execute(
                    """
                    CREATE TABLE job_workers (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        job_run_id INTEGER,
                        job_stage_id INTEGER,
                        worker_name TEXT NOT NULL,
                        status TEXT NOT NULL DEFAULT 'idle',
                        current_job_item_id INTEGER,
                        current_song_id INTEGER,
                        current_playlist_id INTEGER,
                        current_display_text TEXT,
                        heartbeat_at TEXT,
                        last_progress_text TEXT,
                        processed_count INTEGER NOT NULL DEFAULT 0,
                        error_count INTEGER NOT NULL DEFAULT 0
                    )
                    """
                )
                conn.commit()

            initialize_database(db_path).close()

            with closing(sqlite3.connect(db_path)) as conn:
                columns = {
                    row[1]
                    for row in conn.execute("PRAGMA table_info(job_workers)").fetchall()
                }

            self.assertIn("downloaded_bytes", columns)
            self.assertIn("total_bytes", columns)
            self.assertIn("speed_bytes_per_sec", columns)
            self.assertIn("progress_percent", columns)
```

```python
# tests/catalogsync/test_ops_repository.py
import tempfile
import unittest
from pathlib import Path


class OpsRepositoryTaskCenterTests(unittest.TestCase):
    def setUp(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        self.tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
        self.addCleanup(self.tmpdir.cleanup)
        self.db_path = Path(self.tmpdir.name) / "catalogsync.db"
        initialize_database(self.db_path).close()
        self.repo = OpsRepository(self.db_path)
        self.ItemStatus = ItemStatus
        self.JobStatus = JobStatus
        self.StageStatus = StageStatus

    def test_list_task_center_rows_computes_lane_queue_progress_and_speed(self):
        running_download_job = self.repo.create_job(
            job_type="download_only",
            config_snapshot={},
            status=self.JobStatus.RUNNING,
        )
        queued_download_job = self.repo.create_job(
            job_type="catalog_sync",
            config_snapshot={},
            status=self.JobStatus.QUEUED,
        )
        general_job = self.repo.create_job(
            job_type="sync_only",
            config_snapshot={},
            status=self.JobStatus.RUNNING,
            playlist_scope={"playlist_ids": [11, 12]},
        )

        # One finished and one in-flight song give 1 / 2 progress on the download stage.
        download_stage = self.repo.create_stage(
            job_run_id=running_download_job,
            stage_type="download",
            seq_no=1,
            status=self.StageStatus.RUNNING,
        )
        self.repo.create_item(
            job_stage_id=download_stage,
            item_type="song_download",
            item_key="song:done",
            song_id=201,
            status=self.ItemStatus.SUCCEEDED,
        )
        running_item = self.repo.create_item(
            job_stage_id=download_stage,
            item_type="song_download",
            item_key="song:running",
            song_id=202,
            status=self.ItemStatus.PENDING,
            payload={"row": {"id": 202, "name": "Song 202"}},
        )
        self.repo.claim_item(item_id=running_item, worker_name="download-1")
        self.repo.update_worker_state(
            "download-1",
            status="running",
            current_job_item_id=running_item,
            current_song_id=202,
            current_display_text="Song 202",
            downloaded_bytes=10 * 1024 * 1024,
            total_bytes=20 * 1024 * 1024,
            speed_bytes_per_sec=3 * 1024 * 1024,
            progress_percent=50.0,
        )

        rows = self.repo.list_task_center_rows(limit=10)
        by_id = {int(row["id"]): row for row in rows}

        self.assertEqual("download", by_id[running_download_job]["lane_type"])
        self.assertEqual("queued #1", by_id[queued_download_job]["queue_label"])
        self.assertEqual("general", by_id[general_job]["lane_type"])
        self.assertEqual("2 playlists", by_id[general_job]["scope_summary"])
        self.assertEqual(1, by_id[running_download_job]["active_worker_count"])
        self.assertEqual(50, by_id[running_download_job]["primary_progress_percent"])
        self.assertEqual(3 * 1024 * 1024, by_id[running_download_job]["speed_bytes_per_sec"])
        self.assertEqual("3.0 MB/s", by_id[running_download_job]["speed_text"])
        self.assertIn("1 / 2 songs", by_id[running_download_job]["primary_progress_text"])
```

- [ ] **Step 2: Run the targeted tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_db.DatabaseSchemaTests.test_initialize_database_upgrades_job_workers_with_throughput_columns -v`

Expected:

- `FAIL`
- the failure shows one or more throughput columns missing from `job_workers`

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_computes_lane_queue_progress_and_speed -v`

Expected:

- `ERROR`
- `OpsRepository` does not yet expose `list_task_center_rows`

- [ ] **Step 3: Implement shared job rules, schema upgrade, and task-center summaries**

```python
# musicdl/catalogsync/ops/jobdefs.py
from __future__ import annotations

DOWNLOAD_LANE = "download"
GENERAL_LANE = "general"

# Ordered stage list per job type; the runner, repository, and web layers all read it.
JOB_STAGE_SEQUENCES = {
    "catalog_sync": ["collect", "sync", "download"],
    "collect_only": ["collect"],
    "sync_only": ["sync"],
    "sync_download": ["sync", "download"],
    "download_only": ["download"],
    "upload_only": ["upload"],
    "download_upload": ["download", "upload"],
}


def job_has_stage(job_type: str, stage_type: str) -> bool:
    return stage_type in JOB_STAGE_SEQUENCES.get(str(job_type), [])


def job_lane_type(job_type: str) -> str:
    # Any job with a download stage shares the download lane; everything else is general.
    return DOWNLOAD_LANE if job_has_stage(job_type, "download") else GENERAL_LANE


def primary_stage_type(job_type: str) -> str | None:
    # The stage whose item counters drive the task-center progress column.
    for stage_type in ("download", "upload", "sync", "collect"):
        if job_has_stage(job_type, stage_type):
            return stage_type
    return None


def display_name(job_type: str, playlist_scope: dict[str, object] | None = None) -> str:
    playlist_ids = (playlist_scope or {}).get("playlist_ids")
    is_scoped = isinstance(playlist_ids, list) and len(playlist_ids) > 0
    mapping = {
        "catalog_sync": "Full Pipeline",
        "collect_only": "Collect",
        "sync_only": "Sync Selected Playlists" if is_scoped else "Sync",
        "sync_download": "Sync Then Download" if is_scoped else "Sync Then Download All",
        "download_only": "Download Selected Playlists" if is_scoped else "Download",
        "upload_only": "Upload",
        "download_upload": "Download Then Upload",
    }
    # Unknown job types fall back to their raw identifier.
    return mapping.get(str(job_type), str(job_type))
```

```python
# musicdl/catalogsync/db.py
JOB_WORKER_COLUMN_MIGRATIONS = {
    "downloaded_bytes": "INTEGER",
    "total_bytes": "INTEGER",
    "speed_bytes_per_sec": "REAL",
    "progress_percent": "REAL",
}


def _ensure_table_columns(
    conn: sqlite3.Connection,
    *,
    table_name: str,
    required_columns: dict[str, str],
) -> None:
    # SQLite has no "ADD COLUMN IF NOT EXISTS", so compare against PRAGMA table_info.
    # Index 1 is the column name; positional access works whether or not the
    # connection uses sqlite3.Row. table_name is an internal constant, never user input.
    existing = {
        str(row[1])
        for row in conn.execute(f"PRAGMA table_info({table_name})").fetchall()
    }
    for column_name, column_type in required_columns.items():
        if column_name in existing:
            continue
        conn.execute(
            f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
        )


def initialize_database(
    db_path: str | Path,
    default_library_root: str | Path | None = None,
) -> sqlite3.Connection:
    conn = connect_database(db_path)
    for statement in SCHEMA_STATEMENTS:
        conn.execute(statement)
    # Upgrade job_workers tables created before the throughput columns existed.
    _ensure_table_columns(
        conn,
        table_name="job_workers",
        required_columns=JOB_WORKER_COLUMN_MIGRATIONS,
    )
    if default_library_root is not None:
        Path(default_library_root).mkdir(parents=True, exist_ok=True)
        ensure_default_local_backend(conn, default_library_root)
    conn.commit()
    return conn
```
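
SQLite's `ALTER TABLE ... ADD COLUMN` has no `IF NOT EXISTS` form, which is why the helper reads `PRAGMA table_info` before altering. The standalone sketch below reproduces the same check against an in-memory database and runs it twice to show the upgrade is idempotent. The trimmed two-column legacy table and the `added` return value are illustrative only, not the real schema or the plan's helper signature.

```python
import sqlite3

JOB_WORKER_COLUMN_MIGRATIONS = {
    "downloaded_bytes": "INTEGER",
    "total_bytes": "INTEGER",
    "speed_bytes_per_sec": "REAL",
    "progress_percent": "REAL",
}


def ensure_table_columns(
    conn: sqlite3.Connection, table_name: str, required_columns: dict[str, str]
) -> list[str]:
    """Add any missing columns and return the names that were added."""
    # Column 1 of each PRAGMA table_info row is the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table_name})")}
    added = []
    for column_name, column_type in required_columns.items():
        if column_name not in existing:
            conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}")
            added.append(column_name)
    return added


conn = sqlite3.connect(":memory:")
# Illustrative legacy table that predates the throughput columns.
conn.execute("CREATE TABLE job_workers (id INTEGER PRIMARY KEY, worker_name TEXT NOT NULL)")
first = ensure_table_columns(conn, "job_workers", JOB_WORKER_COLUMN_MIGRATIONS)
second = ensure_table_columns(conn, "job_workers", JOB_WORKER_COLUMN_MIGRATIONS)
print(first)   # all four throughput columns
print(second)  # [] because nothing is missing on the second run
```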

```python
# musicdl/catalogsync/ops/repository.py
from .jobdefs import GENERAL_LANE, display_name, job_lane_type, primary_stage_type


def _format_speed_text(speed_bytes_per_sec: float | int | None) -> str:
    # Zero or missing speed renders as a dash; otherwise binary megabytes per second.
    if not speed_bytes_per_sec:
        return "-"
    return f"{float(speed_bytes_per_sec) / 1024 / 1024:.1f} MB/s"


class OpsRepository:
    # Addition to the existing class: RUNNING and PAUSE_REQUESTED jobs both occupy a lane slot.
    def list_active_jobs(self) -> list[JobRun]:
        rows = self._fetchall(
            """
            SELECT *
            FROM job_runs
            WHERE status IN (?, ?)
            ORDER BY COALESCE(started_at, created_at) ASC, id ASC
            """,
            (JobStatus.RUNNING.value, JobStatus.PAUSE_REQUESTED.value),
        )
        return [self._row_to_job(row) for row in rows]
```

```python
# musicdl/catalogsync/ops/repository.py
class OpsRepository:
    # Further additions to the existing class.
    def list_queued_jobs(self, limit: int = 100) -> list[JobRun]:
        # Dispatch order used by the runner: priority first, then FIFO.
        rows = self._fetchall(
            """
            SELECT *
            FROM job_runs
            WHERE status = ?
            ORDER BY priority ASC, created_at ASC, id ASC
            LIMIT ?
            """,
            (JobStatus.QUEUED.value, int(limit)),
        )
        return [self._row_to_job(row) for row in rows]

    def list_task_center_rows(self, limit: int = 50) -> list[dict[str, Any]]:
        jobs = [
            self._row_to_job(row)
            for row in self._fetchall(
                "SELECT * FROM job_runs ORDER BY id DESC LIMIT ?",
                (int(limit),),
            )
        ]
        # Number queued download-lane jobs in the runner's dispatch order
        # (priority, created_at, id) rather than display order, and include
        # queued jobs outside the display window, so "queued #N" matches what
        # will actually start next.
        queued_download_ids = [
            job.id
            for job in self.list_queued_jobs(limit=1000)
            if job_lane_type(job.job_type) != GENERAL_LANE
        ]
        rows: list[dict[str, Any]] = []
        for job in jobs:
            stages = self.list_job_stages(job.id)
            stage_by_type = {stage.stage_type: stage for stage in stages}
            target_stage = stage_by_type.get(primary_stage_type(job.job_type) or "")
            completed = int(target_stage.success_items) if target_stage else 0
            total = int(target_stage.total_items) if target_stage else 0
            # Job speed is the sum of the latest throughput of its active workers.
            workers = self.list_job_workers(job.id, active_only=True, limit=100)
            speed = float(
                sum(float(worker.get("speed_bytes_per_sec") or 0) for worker in workers)
            )
            lane_type = job_lane_type(job.job_type)
            queue_label = lane_type
            if lane_type != GENERAL_LANE and job.id in queued_download_ids:
                queue_label = f"queued #{queued_download_ids.index(job.id) + 1}"
            elif job.status == JobStatus.RUNNING:
                queue_label = "running"
            playlist_ids = job.playlist_scope.get("playlist_ids")
            if isinstance(playlist_ids, list) and playlist_ids:
                scope_summary = f"{len(playlist_ids)} playlists"
            elif job.sources:
                scope_summary = f"{len(job.sources)} sources"
            else:
                scope_summary = "All sources"
            progress_text = f"{completed} / {total} songs"
            if speed > 0:
                progress_text = f"{progress_text} | {_format_speed_text(speed)}"
            rows.append(
                {
                    "id": int(job.id),
                    "job_type": str(job.job_type),
                    "display_name": display_name(job.job_type, job.playlist_scope),
                    "status": job.status.value,
                    "lane_type": lane_type,
                    "queue_label": queue_label,
                    "scope_summary": scope_summary,
                    "primary_progress_text": progress_text,
                    "primary_progress_percent": _progress_percent(completed, total),
                    "active_worker_count": len(workers),
                    "speed_bytes_per_sec": speed,
                    "speed_text": _format_speed_text(speed),
                    "can_pause": job.status in {JobStatus.QUEUED, JobStatus.RUNNING},
                    "can_resume": job.status in {JobStatus.PAUSED, JobStatus.PAUSE_REQUESTED},
                    "can_cancel": job.status in {
                        JobStatus.QUEUED,
                        JobStatus.RUNNING,
                        JobStatus.PAUSED,
                        JobStatus.PAUSE_REQUESTED,
                    },
                    "detail_url": f"/api/jobs/{job.id}",
                }
            )
        return rows

    def update_worker_state(self, worker_name: str, **state: Any) -> None:
        # Upsert the latest job_workers row for this worker; only the fields
        # present in `state` are overwritten, and the heartbeat is always refreshed.
        normalized_worker = str(worker_name).strip() or "worker"
        with self._connection() as conn:
            row = conn.execute(
                """
                SELECT id
                FROM job_workers
                WHERE worker_name = ?
                ORDER BY id DESC
                LIMIT 1
                """,
                (normalized_worker,),
            ).fetchone()
            if row is None:
                worker_id = int(
                    conn.execute(
                        """
                        INSERT INTO job_workers (worker_name, status, heartbeat_at)
                        VALUES (?, ?, CURRENT_TIMESTAMP)
                        """,
                        (normalized_worker, str(state.get("status") or "running")),
                    ).lastrowid
                )
            else:
                worker_id = int(row["id"])
            updates = ["heartbeat_at = CURRENT_TIMESTAMP"]
            params: list[Any] = []
            for field_name in (
                "status",
                "current_job_item_id",
                "current_song_id",
                "current_playlist_id",
                "current_display_text",
                "last_progress_text",
                "downloaded_bytes",
                "total_bytes",
                "speed_bytes_per_sec",
                "progress_percent",
            ):
                if field_name in state:
                    updates.append(f"{field_name} = ?")
                    params.append(state[field_name])
            processed_increment = int(state.get("processed_increment") or 0)
            error_increment = int(state.get("error_increment") or 0)
            if processed_increment:
                updates.append("processed_count = processed_count + ?")
                params.append(processed_increment)
            if error_increment:
                updates.append("error_count = error_count + ?")
                params.append(error_increment)
            params.append(worker_id)
            conn.execute(
                f"UPDATE job_workers SET {', '.join(updates)} WHERE id = ?",
                tuple(params),
            )
```

- [ ] **Step 4: Run the targeted tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_db.DatabaseSchemaTests.test_initialize_database_upgrades_job_workers_with_throughput_columns -v`

Expected:

- `OK`
- rerunning `initialize_database()` upgrades an existing `job_workers` table in place

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_computes_lane_queue_progress_and_speed -v`

Expected:

- `OK`
- task-center rows include `lane_type`, `queue_label`, `scope_summary`, `primary_progress_percent`, and `speed_text`

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/jobdefs.py musicdl/catalogsync/db.py musicdl/catalogsync/ops/repository.py tests/catalogsync/test_db.py tests/catalogsync/test_ops_repository.py
git commit -m "feat: add task center summary projections"
```

## Task 2: Refactor The Runner Into Download And General Lanes

**Files:**
- Modify: `musicdl/catalogsync/ops/runner.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Modify: `tests/catalogsync/test_ops_runner.py`

- [ ] **Step 1: Write the failing scheduler tests**

```python
# tests/catalogsync/test_ops_runner.py
import tempfile
import threading
import time
import unittest
from pathlib import Path
from unittest.mock import patch


class OpsRunnerLaneTests(unittest.TestCase):
    def _seed_stage_item(self, repo, *, job_id: int, stage_type: str, item_key: str) -> int:
        stage_id = repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=1)
        return repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download" if stage_type == "download" else "playlist_sync",
            item_key=item_key,
            song_id=101 if stage_type == "download" else None,
            playlist_id=201 if stage_type == "sync" else None,
            payload={"row": {"id": 101, "name": "Task Item"}},
        )

    def test_download_lane_runs_only_one_job_at_a_time(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import JobStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.ops.runner import OpsRunner

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)
            runner = OpsRunner(repository=repo, sleep_seconds=0.01)

            job_a = repo.create_job(job_type="download_only", config_snapshot={})
            job_b = repo.create_job(job_type="catalog_sync", config_snapshot={})
            self._seed_stage_item(repo, job_id=job_a, stage_type="download", item_key="song:a")
            self._seed_stage_item(repo, job_id=job_b, stage_type="download", item_key="song:b")

            gate = threading.Event()

            def slow_download(executor, item_id, worker_name, *, already_claimed=False):
                # Keep the first download job busy until the statuses have been sampled.
                gate.wait(0.2)
                executor.ops_repo.mark_item_succeeded(item_id=item_id)

            with patch(
                "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item",
                new=slow_download,
            ):
                runner.loop_once()
                time.sleep(0.05)
                runner.loop_once()
                status_a = repo.get_job(job_a).status
                status_b = repo.get_job(job_b).status
                gate.set()
                time.sleep(0.1)

            self.assertEqual(JobStatus.RUNNING, status_a)
            self.assertEqual(JobStatus.QUEUED, status_b)
```

```python
# tests/catalogsync/test_ops_runner.py (continued)
# Add these methods to the OpsRunnerLaneTests class above. Redefining the class
# in the same module would replace it and drop _seed_stage_item and the first test.
class OpsRunnerLaneTests(unittest.TestCase):
    def test_general_lane_job_can_run_while_download_lane_job_is_active(self):
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import JobStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.ops.runner import OpsRunner

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)
            runner = OpsRunner(repository=repo, sleep_seconds=0.01)

            download_job = repo.create_job(job_type="download_only", config_snapshot={})
            sync_job = repo.create_job(job_type="sync_only", config_snapshot={})
            self._seed_stage_item(repo, job_id=download_job, stage_type="download", item_key="song:d")
            self._seed_stage_item(repo, job_id=sync_job, stage_type="sync", item_key="playlist:s")

            download_gate = threading.Event()
            sync_seen = threading.Event()

            def slow_download(executor, item_id, worker_name, *, already_claimed=False):
                download_gate.wait(0.2)
                executor.ops_repo.mark_item_succeeded(item_id=item_id)

            def fast_sync(executor, item_id, worker_name, *, already_claimed=False):
                # Signals that the general lane made progress while the download lane was busy.
                sync_seen.set()
                executor.ops_repo.mark_item_succeeded(item_id=item_id)

            with patch(
                "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item",
                new=slow_download,
            ):
                with patch(
                    "musicdl.catalogsync.ops.executors.SyncStageExecutor.process_item",
                    new=fast_sync,
                ):
                    runner.loop_once()
                    time.sleep(0.05)
                    runner.loop_once()
                    sync_seen.wait(0.2)
                    download_status = repo.get_job(download_job).status
                    sync_status = repo.get_job(sync_job).status
                    download_gate.set()
                    time.sleep(0.1)

            self.assertEqual(JobStatus.RUNNING, download_status)
            self.assertIn(sync_status, {JobStatus.RUNNING, JobStatus.COMPLETED})

    def test_catalog_sync_is_classified_into_download_lane(self):
        from musicdl.catalogsync.ops.jobdefs import DOWNLOAD_LANE, GENERAL_LANE, job_lane_type

        self.assertEqual(DOWNLOAD_LANE, job_lane_type("catalog_sync"))
        self.assertEqual(DOWNLOAD_LANE, job_lane_type("download_upload"))
        self.assertEqual(GENERAL_LANE, job_lane_type("sync_only"))
```

- [ ] **Step 2: Run the targeted tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_runner.OpsRunnerLaneTests -v`

Expected:

- `FAIL`
- the second download job is incorrectly started, or the sync job cannot overlap with the active download job (the `jobdefs` classification test already passes after Task 1)

- [ ] **Step 3: Implement the lane-aware supervisor without rewriting stage executors**

```python
# musicdl/catalogsync/ops/repository.py
class OpsRepository:
    def claim_job_if_queued(self, job_id: int) -> JobRun | None:
        # Compare-and-set: only a caller whose UPDATE still sees status = queued
        # wins the job; every other caller gets None and must not start it.
        with self._connection() as conn:
            cursor = conn.execute(
                """
                UPDATE job_runs
                SET
                    status = ?,
                    started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
                    ended_at = NULL
                WHERE id = ? AND status = ?
                """,
                (JobStatus.RUNNING.value, int(job_id), JobStatus.QUEUED.value),
            )
            if cursor.rowcount != 1:
                return None
            row = conn.execute("SELECT * FROM job_runs WHERE id = ?", (int(job_id),)).fetchone()
            return self._row_to_job(row) if row is not None else None
```
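
The `WHERE id = ? AND status = ?` guard is what makes the claim safe: if two supervisors race for the same job, only the first `UPDATE` matches a row and the loser sees `rowcount == 0`. A standalone illustration against an in-memory table; the simplified `job_runs` columns and the `claim_if_queued` name are hypothetical, not the real schema or repository method.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for job_runs; the real table has many more columns.
conn.execute("CREATE TABLE job_runs (id INTEGER PRIMARY KEY, status TEXT NOT NULL, started_at TEXT)")
conn.execute("INSERT INTO job_runs (id, status) VALUES (1, 'queued')")
conn.commit()


def claim_if_queued(conn: sqlite3.Connection, job_id: int) -> bool:
    # The status guard turns the transition into a compare-and-set:
    # once the job is 'running', a later claim matches zero rows.
    cursor = conn.execute(
        """
        UPDATE job_runs
        SET status = 'running', started_at = COALESCE(started_at, CURRENT_TIMESTAMP)
        WHERE id = ? AND status = 'queued'
        """,
        (job_id,),
    )
    conn.commit()
    return cursor.rowcount == 1


first_claim = claim_if_queued(conn, 1)   # wins: queued -> running
second_claim = claim_if_queued(conn, 1)  # loses: the job is no longer queued
```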

```python
# musicdl/catalogsync/ops/runner.py
import logging
import threading
from collections import Counter
from concurrent.futures import Future, ThreadPoolExecutor

from .jobdefs import DOWNLOAD_LANE, GENERAL_LANE, JOB_STAGE_SEQUENCES, job_lane_type

logger = logging.getLogger(__name__)


class OpsRunner:
    def __init__(
        self,
        repository: OpsRepository,
        sleep_seconds: float = 1.0,
        *,
        download_lane_concurrency: int = 1,
        general_lane_concurrency: int = 3,
    ):
        self.repository = repository
        self.sleep_seconds = max(float(sleep_seconds), 0.1)
        self.download_lane_concurrency = max(int(download_lane_concurrency), 1)
        self.general_lane_concurrency = max(int(general_lane_concurrency), 1)
        # One pool thread per lane slot, so every claimed job can start immediately.
        self._job_pool = ThreadPoolExecutor(
            max_workers=self.download_lane_concurrency + self.general_lane_concurrency
        )
        self._futures: dict[int, Future[None]] = {}
        self._futures_lock = threading.Lock()
        self.db_path = Path(self.repository.db_path)
        self.catalog_repo = CatalogRepository(self.db_path)

    def loop_once(self) -> bool:
        had_commands = bool(self.repository.list_pending_commands())
        self.apply_pending_commands()
        finished = self._reap_finished_jobs()
        started = self._start_eligible_jobs()
        return bool(had_commands or finished or started)

    def _reap_finished_jobs(self) -> int:
        finished_count = 0
        with self._futures_lock:
            for job_id, future in list(self._futures.items()):
                if not future.done():
                    continue
                # Always drop the finished future. Re-raising via future.result()
                # here would leave it in the map and crash every later loop_once().
                del self._futures[job_id]
                finished_count += 1
                error = future.exception()
                if error is not None:
                    logger.error("ops job %s crashed", job_id, exc_info=error)
        return finished_count

    def _start_eligible_jobs(self) -> int:
        started_count = 0
        # Lane occupancy comes from the database, so RUNNING and PAUSE_REQUESTED
        # jobs both count against their lane's limit.
        active_jobs = self.repository.list_active_jobs()
        lane_counts = Counter(job_lane_type(job.job_type) for job in active_jobs)
        for queued_job in self.repository.list_queued_jobs(limit=100):
            lane_type = job_lane_type(queued_job.job_type)
            lane_limit = (
                self.download_lane_concurrency
                if lane_type == DOWNLOAD_LANE
                else self.general_lane_concurrency
            )
            if lane_counts[lane_type] >= lane_limit:
                continue
            claimed = self.repository.claim_job_if_queued(queued_job.id)
            if claimed is None:
                continue
            with self._futures_lock:
                self._futures[claimed.id] = self._job_pool.submit(self._run_job, claimed.id)
            lane_counts[lane_type] += 1
            started_count += 1
        return started_count

    def _run_job(self, job_id: int) -> None:
        # Runs on a pool thread: drive one job stage by stage until it finishes,
        # is canceled, or is paused.
        current_job = self.repository.get_job(job_id)
        if current_job is None:
            return
        self._ensure_job_stages(current_job)
        while True:
            current_job = self.repository.get_job(job_id)
            if current_job is None:
                return
            if current_job.status == JobStatus.CANCELED:
                self.repository.finalize_canceled_job(job_id)
                return
            if current_job.status == JobStatus.PAUSE_REQUESTED:
                self.reconcile_pause_state(job_id)
                return
            stage = self._next_runnable_stage(job_id)
            if stage is None:
                self._finalize_job(job_id)
                return
            self._run_stage(current_job, stage)
            self.apply_pending_commands()
```
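
Stripped of threads and SQLite, `_start_eligible_jobs` is a greedy admission pass: walk the queue in dispatch order and admit each job whose lane still has a free slot. The pure-function sketch below (hypothetical names `lane_of` and `plan_admissions`; the stage table is copied from `JOB_STAGE_SEQUENCES`) reproduces that rule so lane semantics can be checked in isolation. Note that a full download lane does not block general-lane jobs queued behind it.

```python
from collections import Counter
from typing import Iterable

DOWNLOAD_LANE = "download"
GENERAL_LANE = "general"

# Copy of JOB_STAGE_SEQUENCES: a job is download-class iff it has a download stage.
STAGES = {
    "catalog_sync": ["collect", "sync", "download"],
    "collect_only": ["collect"],
    "sync_only": ["sync"],
    "sync_download": ["sync", "download"],
    "download_only": ["download"],
    "upload_only": ["upload"],
    "download_upload": ["download", "upload"],
}


def lane_of(job_type: str) -> str:
    return DOWNLOAD_LANE if "download" in STAGES.get(job_type, []) else GENERAL_LANE


def plan_admissions(
    active: Iterable[tuple[int, str]],
    queued: Iterable[tuple[int, str]],
    limits: dict[str, int],
) -> list[int]:
    """Return the ids of queued (id, job_type) pairs to start, in queue order."""
    lane_counts = Counter(lane_of(job_type) for _, job_type in active)
    admitted = []
    for job_id, job_type in queued:
        lane = lane_of(job_type)
        if lane_counts[lane] >= limits[lane]:
            continue  # this lane is full; jobs in the other lane may still start
        lane_counts[lane] += 1
        admitted.append(job_id)
    return admitted
```

With one `download_only` job already active and limits of 1 download / 3 general, a queue of `catalog_sync`, `sync_only`, `collect_only` admits only the last two, which is exactly the behavior the two scheduler tests pin down.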

- [ ] **Step 4: Run the targeted tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_runner.OpsRunnerLaneTests -v`

Expected:

- `OK`
- only one download-class job runs at once
- a `sync_only` job can overlap with an active download job

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/repository.py musicdl/catalogsync/ops/runner.py tests/catalogsync/test_ops_runner.py
git commit -m "feat: add lane aware ops runner"
```

## Task 3: Extend The API For Task-Center Payloads And Playlist Bulk Sync

**Files:**
- Modify: `musicdl/catalogsync/ops/web.py`
- Modify: `tests/catalogsync/test_ops_api.py`

- [ ] **Step 1: Write the failing API tests**

```python
# tests/catalogsync/test_ops_api.py: new method on OperationsApiTests
def test_api_playlists_sync_creates_sync_only_job_with_scope(self):
    client, db_path, _ = self._build_client()
    playlist_a = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="sync-api-a",
        name="Sync API A",
    )
    playlist_b = self._seed_playlist(
        db_path,
        platform="netease",
        pool_kind="playlist_square",
        remote_id="sync-api-b",
        name="Sync API B",
    )

    response = client.post(
        "/api/playlists/sync",
        json={"playlist_ids": [playlist_b, playlist_a]},
    )
    self.assertEqual(201, response.status_code)
    payload = response.json()["job"]
    self.assertEqual("sync_only", payload["job_type"])
    # The request order must survive normalization.
    self.assertEqual([playlist_b, playlist_a], payload["playlist_scope"]["playlist_ids"])
```

```python
# tests/catalogsync/test_ops_api.py: new method on OperationsApiTests
def test_api_dashboard_returns_task_center_rows_with_lane_fields(self):
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)

    running_job = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    queued_job = repo.create_job(
        job_type="catalog_sync",
        config_snapshot={},
        status=JobStatus.QUEUED,
    )
    stage_id = repo.create_stage(
        job_run_id=running_job,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )
    repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:done",
        song_id=1,
        status=ItemStatus.SUCCEEDED,
    )
    running_item = repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:running",
        song_id=2,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 2, "name": "Song 2"}},
    )
    repo.claim_item(item_id=running_item, worker_name="download-1")
    repo.update_worker_state(
        "download-1",
        status="running",
        current_job_item_id=running_item,
        current_song_id=2,
        current_display_text="Song 2",
        downloaded_bytes=5 * 1024 * 1024,
        total_bytes=10 * 1024 * 1024,
        speed_bytes_per_sec=2 * 1024 * 1024,
        progress_percent=50.0,
    )

    response = client.get("/api/dashboard")
    self.assertEqual(200, response.status_code)
    payload = response.json()
    self.assertIn("task_rows", payload)
    by_id = {int(row["id"]): row for row in payload["task_rows"]}
    self.assertEqual("download", by_id[running_job]["lane_type"])
    self.assertEqual("queued #1", by_id[queued_job]["queue_label"])
    self.assertEqual("2.0 MB/s", by_id[running_job]["speed_text"])
    self.assertIn("queued_download_jobs", payload["summary"])
```
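
The playlist-sync test posts `[playlist_b, playlist_a]` and expects the job's `playlist_scope` to keep that order, so whatever normalization `_build_playlist_job` applies through `_normalize_playlist_ids` (not shown in this plan) has to preserve request order. One possible shape, written as a hypothetical `normalize_playlist_ids`; dropping duplicates, non-positive ids, booleans, and non-numeric values are assumptions about the desired rules, not confirmed behavior.

```python
from typing import Iterable, Optional


def normalize_playlist_ids(raw_ids: Optional[Iterable[object]]) -> list[int]:
    """Hypothetical normalizer: deduplicate ids while keeping first-seen order."""
    seen: set[int] = set()
    result: list[int] = []
    for raw in raw_ids or []:
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(raw, bool):
            continue
        try:
            playlist_id = int(raw)
        except (TypeError, ValueError):
            continue
        if playlist_id <= 0 or playlist_id in seen:
            continue
        seen.add(playlist_id)
        result.append(playlist_id)
    return result
```

Tracking seen ids in a set while appending to a list keeps the check O(1) per id without reordering, which is what the `[playlist_b, playlist_a]` assertion relies on.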

- [ ] **Step 2: Run the targeted API tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_playlists_sync_creates_sync_only_job_with_scope -v`

Expected:

- `FAIL` with `201 != 404`
- `/api/playlists/sync` does not exist yet, so the request returns `404`

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_returns_task_center_rows_with_lane_fields -v`

Expected:

- `FAIL`
- `/api/dashboard` does not return `task_rows`, lane fields, or queued download counts

- [ ] **Step 3: Implement playlist bulk sync and task-center payloads**
|
|
|
|
```python
# musicdl/catalogsync/ops/web.py
from .jobdefs import job_lane_type


def _dashboard_summary(repo: OpsRepository) -> dict[str, int]:
    rows = repo._fetchall(
        """
        SELECT status, COUNT(*) AS count_value
        FROM job_runs
        GROUP BY status
        """
    )
    counts = {str(row["status"]): int(row["count_value"]) for row in rows}
    # Only queued jobs that will compete for the single download lane count here.
    queued_download_jobs = sum(
        1
        for job in repo.list_queued_jobs(limit=1000)
        if job_lane_type(job.job_type) == "download"
    )
    return {
        "total_jobs": int(sum(counts.values())),
        "queued_jobs": int(counts.get("queued", 0)),
        "queued_download_jobs": int(queued_download_jobs),
        "running_jobs": int(counts.get("running", 0)),
        "paused_jobs": int(counts.get("paused", 0) + counts.get("pause_requested", 0)),
        "failed_jobs": int(counts.get("failed", 0) + counts.get("completed_with_errors", 0)),
    }


def _dashboard_payload(repo: OpsRepository) -> dict[str, Any]:
    payload = {
        "summary": _dashboard_summary(repo),
        "download_stats": _download_stats(repo),
        "playlist_sources": _playlist_source_stats(repo),
        "task_rows": repo.list_task_center_rows(limit=50),
        "workers": _dashboard_workers(repo),
        "running_items": _dashboard_running_items(repo),
    }
    recent_events = repo._fetchall(
        """
        SELECT id, job_run_id, event_type, message, created_at
        FROM job_events
        ORDER BY id DESC
        LIMIT 5
        """
    )
    payload["recent_events"] = [dict(row) for row in recent_events]
    return payload


def _build_playlist_job(
    repo: OpsRepository,
    env_manager: CatalogsyncEnvManager,
    *,
    job_type: Literal["download_only", "sync_download", "sync_only"],
    payload: PlaylistBulkRequest,
) -> dict[str, Any]:
    playlist_ids = _normalize_playlist_ids(payload.playlist_ids)
    if not playlist_ids:
        raise HTTPException(status_code=422, detail="playlist_ids is required")
    snapshot = env_manager.build_job_snapshot()
    job_id = repo.create_job(
        job_type=job_type,
        requested_by=payload.requested_by,
        config_snapshot=dict(snapshot),
        sources=_normalize_allowed_list(snapshot.get("SOURCES"), ALLOWED_COLLECT_SOURCES),
        download_sources=_normalize_allowed_list(
            snapshot.get("DOWNLOAD_SOURCES"),
            ALLOWED_DOWNLOAD_SOURCES,
        ),
        # The scope keeps the caller's playlist ordering.
        playlist_scope={"playlist_ids": playlist_ids},
    )
    _ensure_job_stages(repo, job_id, job_type)
    job = repo.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=500, detail="job create failed")
    return {"job": _serialize_job(job)}


# Register this route alongside the existing /api/playlists/download route,
# where `app`, `repo`, and `env_manager` are in scope.
@app.post("/api/playlists/sync", status_code=201)
def api_sync_selected_playlists(payload: PlaylistBulkRequest):
    return _build_playlist_job(repo, env_manager, job_type="sync_only", payload=payload)
```
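
`job_lane_type` comes from the new `jobdefs` module, whose body this step does not show. The sketch below is one possible shape under two assumptions: a job belongs to the download lane whenever its stage sequence contains a `download` stage, and the stage sequences listed are illustrative rather than copied from the codebase (in particular, `catalog_sync` is assumed to be the full collect/sync/download/upload pipeline).

```python
# Hypothetical contents for musicdl/catalogsync/ops/jobdefs.py.
# Job types come from this plan; the exact stage sequences are assumptions.
JOB_STAGE_SEQUENCES: dict[str, tuple[str, ...]] = {
    "collect_only": ("collect",),
    "sync_only": ("sync",),
    "download_only": ("download",),
    "sync_download": ("sync", "download"),
    "catalog_sync": ("collect", "sync", "download", "upload"),
}


def job_stage_sequence(job_type: str) -> tuple[str, ...]:
    """Return the ordered stage types for a job type, rejecting unknown types."""
    try:
        return JOB_STAGE_SEQUENCES[job_type]
    except KeyError:
        raise ValueError(f"unknown job_type: {job_type!r}") from None


def job_lane_type(job_type: str) -> str:
    """Any job with a download stage competes for the single download lane."""
    return "download" if "download" in job_stage_sequence(job_type) else "general"
```

Deriving the lane from the stage list keeps runner, repository, and web code from maintaining separate hard-coded sets of "download-class" job types.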

- [ ] **Step 4: Run the targeted API tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_playlists_sync_creates_sync_only_job_with_scope -v`

Expected:

- `OK`
- the created job is `sync_only` and keeps the selected `playlist_ids` ordering

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_returns_task_center_rows_with_lane_fields -v`

Expected:

- `OK`
- `/api/dashboard` returns `task_rows`, `lane_type`, `queue_label`, and `queued_download_jobs`
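
The ordering check in Step 4 depends on `_normalize_playlist_ids`, which this plan calls but does not define. A minimal sketch of the behavior the test implies, assuming the helper drops invalid and duplicate ids while keeping first-seen order (`normalize_playlist_ids` is a hypothetical stand-in):

```python
from __future__ import annotations

from collections.abc import Iterable


def normalize_playlist_ids(raw_ids: Iterable[object] | None) -> list[int]:
    """Keep positive integer ids in first-seen order, skipping duplicates and junk."""
    seen: set[int] = set()
    result: list[int] = []
    for raw in raw_ids or ():
        try:
            value = int(raw)
        except (TypeError, ValueError):
            continue  # e.g. None or non-numeric strings
        if value <= 0 or value in seen:
            continue
        seen.add(value)
        result.append(value)
    return result
```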

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/ops/web.py tests/catalogsync/test_ops_api.py
git commit -m "feat: add task center dashboard api"
```

## Task 4: Rebuild The Dashboard And Playlist UX Around The Task Center

**Files:**
- Modify: `musicdl/catalogsync/templates/ops/dashboard.html`
- Modify: `musicdl/catalogsync/templates/ops/playlists.html`
- Modify: `musicdl/catalogsync/templates/ops/jobs.html`
- Modify: `musicdl/catalogsync/templates/ops/job_detail.html`
- Modify: `musicdl/catalogsync/static/ops/app.js`
- Modify: `tests/catalogsync/test_ops_api.py`

- [ ] **Step 1: Write the failing page-render tests**

```python
def test_dashboard_page_renders_task_center_table_and_inline_detail_shell(self):
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )

    response = client.get("/dashboard")
    self.assertEqual(200, response.status_code)
    html = response.text
    self.assertIn("data-task-center-table", html)
    self.assertIn(f'data-task-row="{job_id}"', html)
    self.assertIn(f'data-task-row-expand="{job_id}"', html)
    self.assertIn(f'data-task-row-detail="{job_id}"', html)
    self.assertIn('data-task-command-toggle', html)
    self.assertIn('data-task-command-cancel', html)
```

```python
def test_playlists_page_renders_sync_selected_playlists_action(self):
    client, db_path, _ = self._build_client()
    self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="playlist-render-sync",
        name="Playlist Render Sync",
    )

    response = client.get("/playlists")
    self.assertEqual(200, response.status_code)
    html = response.text
    self.assertIn('data-playlists-page', html)
    self.assertIn('data-playlist-action="sync"', html)
    self.assertIn('data-playlist-select-all', html)
```

- [ ] **Step 2: Run the targeted page-render tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_dashboard_page_renders_task_center_table_and_inline_detail_shell -v`

Expected:

- `FAIL`
- the current dashboard still renders separate `Active Job` and `Recent Jobs` sections instead of a single task table

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_playlists_page_renders_sync_selected_playlists_action -v`

Expected:

- `FAIL`
- the playlist page does not yet expose the `sync` bulk action

- [ ] **Step 3: Replace the dashboard shell, add inline expansion, and add playlist bulk sync UI**

```html
<!-- musicdl/catalogsync/templates/ops/dashboard.html -->
<div class="grid">
  <div class="card">
    <h2>Summary</h2>
    <table>
      <tr><th>Total Jobs</th><td data-summary-field="total_jobs">{{ summary.total_jobs }}</td></tr>
      <tr><th>Running</th><td data-summary-field="running_jobs">{{ summary.running_jobs }}</td></tr>
      <tr><th>Queued Download Jobs</th><td data-summary-field="queued_download_jobs">{{ summary.queued_download_jobs }}</td></tr>
      <tr><th>Paused</th><td data-summary-field="paused_jobs">{{ summary.paused_jobs }}</td></tr>
      <tr><th>Failed / Errors</th><td data-summary-field="failed_jobs">{{ summary.failed_jobs }}</td></tr>
      <tr><th>Running Songs</th><td data-download-field="running_song_items">{{ download_stats.running_song_items }}</td></tr>
    </table>
  </div>

  <div class="card">
    <h2>Quick Actions</h2>
    <div class="button-grid">
      <form action="/api/jobs" method="post" data-json-form data-success="reload">
        <input type="hidden" name="job_type" value="catalog_sync" />
        <input type="hidden" name="requested_by" value="ops-console" />
        <input type="hidden" name="sources" value="{{ default_sources }}" />
        <input type="hidden" name="download_sources" value="{{ default_download_sources }}" />
        <button type="submit">Full Pipeline</button>
      </form>
      <form action="/api/jobs" method="post" data-json-form data-success="reload">
        <input type="hidden" name="job_type" value="collect_only" />
        <input type="hidden" name="requested_by" value="ops-console" />
        <input type="hidden" name="sources" value="{{ default_sources }}" />
        <button type="submit">Collect</button>
      </form>
      <form action="/api/jobs" method="post" data-json-form data-success="reload">
        <input type="hidden" name="job_type" value="sync_only" />
        <input type="hidden" name="requested_by" value="ops-console" />
        <input type="hidden" name="sources" value="{{ default_sources }}" />
        <button type="submit">Sync</button>
      </form>
      <form action="/api/jobs" method="post" data-json-form data-success="reload">
        <input type="hidden" name="job_type" value="download_only" />
        <input type="hidden" name="requested_by" value="ops-console" />
        <input type="hidden" name="download_sources" value="{{ default_download_sources }}" />
        <button type="submit">Download</button>
      </form>
    </div>
  </div>
</div>
```

```html
<!-- musicdl/catalogsync/templates/ops/dashboard.html -->
<div class="card">
  <h2>Task Center</h2>
  <table data-task-center-table>
    <thead>
      <tr>
        <th></th>
        <th>ID</th>
        <th>Task</th>
        <th>Status</th>
        <th>Scope</th>
        <th>Primary Progress</th>
        <th>Active Workers</th>
        <th>Lane</th>
        <th>Actions</th>
      </tr>
    </thead>
    <tbody data-task-center-body>
      {% for row in task_rows %}
        <tr data-task-row="{{ row.id }}">
          <td><button type="button" data-task-row-expand="{{ row.id }}">+</button></td>
          <td>{{ row.id }}</td>
          <td>{{ row.display_name }}</td>
          <td>{{ row.status }}</td>
          <td>{{ row.scope_summary }}</td>
          <td>{{ row.primary_progress_text }}</td>
          <td>{{ row.active_worker_count }}</td>
          <td>{{ row.queue_label }}</td>
          <td>
            <button type="button" data-task-command-toggle="{{ row.id }}">
              {% if row.can_resume %}>{% else %}||{% endif %}
            </button>
            <button type="button" data-task-command-cancel="{{ row.id }}">x</button>
          </td>
        </tr>
        <tr data-task-row-detail="{{ row.id }}" hidden>
          <td colspan="9">
            <div data-task-row-detail-body="{{ row.id }}">Loading...</div>
          </td>
        </tr>
      {% endfor %}
    </tbody>
  </table>
</div>
```

```html
<!-- musicdl/catalogsync/templates/ops/playlists.html: bulk-action toolbar -->
<div class="button-grid" style="margin-top: 0.8rem;">
  <button type="button" data-playlist-action="sync">Sync Selected Playlists</button>
  <button type="button" data-playlist-action="download">Download Selected Playlists</button>
  <button type="button" data-playlist-action="sync-download">Sync Then Download</button>
  <button type="button" class="secondary" data-playlist-action="mark-wanted">Mark Wanted</button>
  <button type="button" class="secondary" data-playlist-action="unmark-wanted">Unmark Wanted</button>
</div>

<!-- musicdl/catalogsync/templates/ops/playlists.html: per-playlist status note, inside the playlist loop -->
<div class="progress-note muted">
  {% if playlist.song_count == 0 %}
    0 songs, sync recommended
  {% elif playlist.running_download_song_count %}
    Running {{ playlist.running_download_song_count }}
  {% else %}
    Idle
  {% endif %}
</div>
```

```html
<!-- musicdl/catalogsync/templates/ops/jobs.html -->
<h1>Jobs Archive</h1>
<p class="muted">Use <a href="/dashboard">Dashboard</a> for the main task center. This page remains available for fallback browsing.</p>
```

```html
<!-- musicdl/catalogsync/templates/ops/job_detail.html -->
<p><a href="/dashboard">Back to Dashboard</a></p>
<h1>Job {{ job.id }}</h1>
```

```javascript
// musicdl/catalogsync/static/ops/app.js
// escapeHtml() and showMessage() are assumed to be the existing helpers in app.js.
function postJson(path, payload) {
  return window.fetch(path, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  }).then(function (response) {
    // Error pages are not always JSON, so fall back to an empty object.
    return response.json().catch(function () { return {}; }).then(function (data) {
      if (!response.ok) {
        // FastAPI validation errors return `detail` as a list, not a string.
        var detail = typeof data.detail === "string" ? data.detail : null;
        throw new Error(detail || "request failed (HTTP " + response.status + ")");
      }
      return data;
    });
  });
}

function commandTypeForRow(row) {
  // Resume takes priority, so a paused job never offers "pause".
  if (row.can_resume) {
    return "resume";
  }
  if (row.can_pause) {
    return "pause";
  }
  return null;
}

function renderTaskCenterRows(rows) {
  var bodyNode = document.querySelector("[data-task-center-body]");
  if (!bodyNode) {
    return;
  }
  bodyNode.innerHTML = (rows || [])
    .map(function (row) {
      return [
        '<tr data-task-row="' + row.id + '">',
        '<td><button type="button" data-task-row-expand="' + row.id + '">+</button></td>',
        "<td>" + escapeHtml(row.id) + "</td>",
        "<td>" + escapeHtml(row.display_name || row.job_type) + "</td>",
        "<td>" + escapeHtml(row.status || "-") + "</td>",
        "<td>" + escapeHtml(row.scope_summary || "-") + "</td>",
        "<td>" + escapeHtml(row.primary_progress_text || "-") + "</td>",
        "<td>" + escapeHtml(row.active_worker_count || 0) + "</td>",
        "<td>" + escapeHtml(row.queue_label || row.lane_type || "-") + "</td>",
        '<td><button type="button" data-task-command-toggle="' + row.id + '">' +
          escapeHtml(row.can_resume ? ">" : "||") +
          '</button><button type="button" data-task-command-cancel="' + row.id + '">x</button></td>',
        "</tr>",
        '<tr data-task-row-detail="' + row.id + '" hidden><td colspan="9"><div data-task-row-detail-body="' + row.id + '">Loading...</div></td></tr>',
      ].join("");
    })
    .join("");
}

function renderTaskDetail(payload) {
  var playlistRows = (payload.playlist_progress || []).map(function (row) {
    return (
      "<tr><td>" +
      escapeHtml(row.playlist_name) +
      "</td><td>" +
      escapeHtml((row.downloaded_songs || 0) + " / " + (row.total_songs || 0)) +
      "</td><td>" +
      escapeHtml((row.progress_percent || 0) + "%") +
      "</td></tr>"
    );
  });
  var workerRows = (payload.workers || []).map(function (worker) {
    return (
      "<tr><td>" +
      escapeHtml(worker.worker_name || "-") +
      "</td><td>" +
      escapeHtml(worker.display_text || "-") +
      "</td><td>" +
      escapeHtml(worker.last_progress_text || "-") +
      "</td></tr>"
    );
  });
  return [
    "<div class=\"grid\">",
    "<div><h3>Stages</h3><pre>" + escapeHtml(JSON.stringify(payload.stages || [], null, 2)) + "</pre></div>",
    "<div><h3>Workers</h3><table><tbody>" + workerRows.join("") + "</tbody></table></div>",
    "<div><h3>Playlist Progress</h3><table><tbody>" + playlistRows.join("") + "</tbody></table></div>",
    "</div>",
  ].join("");
}
```
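
`commandTypeForRow` only reads `can_resume` and `can_pause`; the plan does not show where the server derives them. The sketch below mirrors that client logic in Python and assumes a status-to-flag mapping built from the status strings the summary query already counts. Which transitions the runner actually permits is an assumption, and `row_command_flags` / `command_type_for_row` are hypothetical names.

```python
from __future__ import annotations

# Assumed transition rules; adjust to whatever the runner really accepts.
PAUSABLE_STATUSES = frozenset({"queued", "running"})
RESUMABLE_STATUSES = frozenset({"paused"})
CANCELLABLE_STATUSES = frozenset({"queued", "running", "paused", "pause_requested"})


def row_command_flags(status: str) -> dict[str, bool]:
    """Derive per-row action flags from a job status string."""
    return {
        "can_pause": status in PAUSABLE_STATUSES,
        "can_resume": status in RESUMABLE_STATUSES,
        "can_cancel": status in CANCELLABLE_STATUSES,
    }


def command_type_for_row(row: dict[str, bool]) -> str | None:
    """Python mirror of commandTypeForRow in app.js: resume wins over pause."""
    if row.get("can_resume"):
        return "resume"
    if row.get("can_pause"):
        return "pause"
    return None
```

Keeping `pause_requested` out of both pausable and resumable sets stops the toggle from firing a second command while the runner is still winding a job down.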

```javascript
// musicdl/catalogsync/static/ops/app.js
function bindTaskCenter(payload) {
  // Handlers are assigned via onclick so re-binding after each poll never stacks listeners.
  document.querySelectorAll("[data-task-row-expand]").forEach(function (button) {
    button.onclick = function () {
      var jobId = button.getAttribute("data-task-row-expand");
      var detailRow = document.querySelector('[data-task-row-detail="' + jobId + '"]');
      var detailBody = document.querySelector('[data-task-row-detail-body="' + jobId + '"]');
      if (!detailRow || !detailBody) {
        return;
      }
      if (!detailRow.hasAttribute("hidden")) {
        detailRow.setAttribute("hidden", "hidden");
        return;
      }
      detailRow.removeAttribute("hidden");
      // /api/jobs/{id} remains the single source of expanded-row detail.
      window.fetch("/api/jobs/" + jobId)
        .then(function (response) {
          if (!response.ok) {
            throw new Error("HTTP " + response.status);
          }
          return response.json();
        })
        .then(function (data) { detailBody.innerHTML = renderTaskDetail(data); })
        .catch(function (error) {
          detailBody.textContent = "Failed to load job detail: " + (error.message || "request failed");
        });
    };
  });

  document.querySelectorAll("[data-task-command-toggle]").forEach(function (button) {
    button.onclick = function () {
      var jobId = button.getAttribute("data-task-command-toggle");
      var row = (payload.task_rows || []).find(function (item) { return String(item.id) === String(jobId); });
      var commandType = row ? commandTypeForRow(row) : null;
      if (!commandType) {
        return;
      }
      postJson("/api/jobs/" + jobId + "/commands", { command_type: commandType })
        .then(function () { window.location.reload(); })
        .catch(function (error) { showMessage(error.message || "request failed", true); });
    };
  });

  document.querySelectorAll("[data-task-command-cancel]").forEach(function (button) {
    button.onclick = function () {
      var jobId = button.getAttribute("data-task-command-cancel");
      postJson("/api/jobs/" + jobId + "/commands", { command_type: "cancel" })
        .then(function () { window.location.reload(); })
        .catch(function (error) { showMessage(error.message || "request failed", true); });
    };
  });
}

function updateDashboard(payload) {
  if (!payload) {
    return;
  }
  var summary = payload.summary || {};
  Object.keys(summary).forEach(function (key) {
    var node = document.querySelector('[data-summary-field="' + key + '"]');
    if (node) {
      node.textContent = String(summary[key]);
    }
  });
  var downloadStats = payload.download_stats || {};
  Object.keys(downloadStats).forEach(function (key) {
    var node = document.querySelector('[data-download-field="' + key + '"]');
    if (node) {
      node.textContent = String(downloadStats[key]);
    }
  });
  renderTaskCenterRows(payload.task_rows || []);
  bindTaskCenter(payload);
}

function bindPlaylistPage() {
  var root = document.querySelector("[data-playlists-page]");
  if (!root) {
    return;
  }
  var endpointMap = {
    sync: "/api/playlists/sync",
    download: "/api/playlists/download",
    "sync-download": "/api/playlists/sync-download",
    "mark-wanted": "/api/playlists/mark-wanted",
    "unmark-wanted": "/api/playlists/unmark-wanted",
  };
  var checkboxNodes = Array.prototype.slice.call(
    root.querySelectorAll("[data-playlist-checkbox]")
  );
  var selectionCountNode = root.querySelector("[data-playlist-selection-count]");
  function selectedPlaylistIds() {
    return checkboxNodes
      .filter(function (node) { return Boolean(node.checked); })
      .map(function (node) { return Number(node.value); })
      .filter(function (value) { return Number.isInteger(value) && value > 0; });
  }
  function updateSelectionCount() {
    if (selectionCountNode) {
      selectionCountNode.textContent = String(selectedPlaylistIds().length);
    }
  }
  checkboxNodes.forEach(function (node) {
    node.addEventListener("change", updateSelectionCount);
  });
  root.querySelectorAll("[data-playlist-action]").forEach(function (button) {
    button.addEventListener("click", function () {
      var action = button.getAttribute("data-playlist-action");
      var endpoint = endpointMap[action || ""];
      var playlistIds = selectedPlaylistIds();
      if (!endpoint || !playlistIds.length) {
        return;
      }
      // Disable the button while the request is in flight so a double click
      // cannot create two identical jobs.
      button.disabled = true;
      postJson(endpoint, { playlist_ids: playlistIds })
        .then(function (data) {
          if (data.job && data.job.id) {
            window.location.href = "/jobs/" + data.job.id;
            return;
          }
          window.location.reload();
        })
        .catch(function (error) {
          button.disabled = false;
          showMessage(error.message || "request failed", true);
        });
    });
  });
  updateSelectionCount();
}
```

- [ ] **Step 4: Run the targeted page-render tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_dashboard_page_renders_task_center_table_and_inline_detail_shell -v`

Expected:

- `OK`
- `/dashboard` renders a single task-center table and inline detail placeholders

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_playlists_page_renders_sync_selected_playlists_action -v`

Expected:

- `OK`
- `/playlists` renders the new `sync` bulk action and keeps the selection toolbar

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/templates/ops/dashboard.html musicdl/catalogsync/templates/ops/playlists.html musicdl/catalogsync/templates/ops/jobs.html musicdl/catalogsync/templates/ops/job_detail.html musicdl/catalogsync/static/ops/app.js tests/catalogsync/test_ops_api.py
git commit -m "feat: build task center dashboard ui"
```

## Task 5: Add Structured Download Throughput Reporting

**Files:**
- Modify: `musicdl/catalogsync/downloader.py`
- Modify: `musicdl/catalogsync/ops/executors.py`
- Modify: `musicdl/catalogsync/ops/repository.py`
- Modify: `musicdl/catalogsync/ops/web.py`
- Modify: `tests/catalogsync/test_ops_repository.py`
- Modify: `tests/catalogsync/test_ops_api.py`

- [ ] **Step 1: Write the failing speed tests**

```python
def test_list_task_center_rows_uses_live_worker_speed_for_download_jobs(self):
    running_job = self.repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=self.JobStatus.RUNNING,
    )
    stage_id = self.repo.create_stage(
        job_run_id=running_job,
        stage_type="download",
        seq_no=1,
        status=self.StageStatus.RUNNING,
    )
    item_id = self.repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:live-speed",
        song_id=999,
        status=self.ItemStatus.PENDING,
        payload={"row": {"id": 999, "name": "Speed Song"}},
    )
    self.repo.claim_item(item_id=item_id, worker_name="download-1")
    self.repo.update_worker_state(
        "download-1",
        status="running",
        current_job_item_id=item_id,
        current_song_id=999,
        current_display_text="Speed Song",
        downloaded_bytes=12 * 1024 * 1024,
        total_bytes=24 * 1024 * 1024,
        speed_bytes_per_sec=4 * 1024 * 1024,
        progress_percent=50.0,
    )

    row = next(
        row for row in self.repo.list_task_center_rows(limit=20)
        if int(row["id"]) == running_job
    )

    self.assertEqual("4.0 MB/s", row["speed_text"])
    self.assertIn("4.0 MB/s", row["primary_progress_text"])
```
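
Both speed tests expect rates rendered like `4.0 MB/s`, and the web layer calls `_format_speed_text`, whose body is not shown in this plan. A possible implementation, assuming 1024-based units labeled `KB/MB/GB` (matching how `_format_bytes_text` divides by 1024) and `-` as the placeholder for unknown rates; `format_speed_text` is a hypothetical stand-in:

```python
from __future__ import annotations


def format_speed_text(speed_bytes_per_sec: float | None) -> str:
    """Render a byte rate with one decimal, e.g. 4 * 1024 * 1024 -> "4.0 MB/s"."""
    if speed_bytes_per_sec is None or speed_bytes_per_sec <= 0:
        return "-"
    value = float(speed_bytes_per_sec)
    for unit in ("B/s", "KB/s", "MB/s"):
        if value < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} GB/s"
```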

```python
def test_api_dashboard_exposes_worker_speed_and_bytes_text(self):
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)

    job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    stage_id = repo.create_stage(
        job_run_id=job_id,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )
    item_id = repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:api-speed",
        song_id=301,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 301, "name": "API Speed"}},
    )
    repo.claim_item(item_id=item_id, worker_name="download-1")
    repo.update_worker_state(
        "download-1",
        status="running",
        current_job_item_id=item_id,
        current_song_id=301,
        current_display_text="API Speed",
        downloaded_bytes=10 * 1024 * 1024,
        total_bytes=20 * 1024 * 1024,
        speed_bytes_per_sec=5 * 1024 * 1024,
        progress_percent=50.0,
    )

    payload = client.get("/api/dashboard").json()
    row = next(row for row in payload["task_rows"] if int(row["id"]) == job_id)
    worker = next(worker for worker in payload["workers"] if worker["worker_name"] == "download-1")

    self.assertEqual(5 * 1024 * 1024, worker["speed_bytes_per_sec"])
    self.assertEqual("5.0 MB/s", worker["speed_text"])
    self.assertEqual("10.0 / 20.0 MB", worker["bytes_text"])
    self.assertIn("5.0 MB/s", row["primary_progress_text"])
```

- [ ] **Step 2: Run the targeted speed tests and verify they fail**

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_uses_live_worker_speed_for_download_jobs -v`

Expected:

- `FAIL`
- `primary_progress_text` still contains only song counts and does not append the live rate

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_exposes_worker_speed_and_bytes_text -v`

Expected:

- `FAIL`
- serialized workers do not yet include `speed_text` or `bytes_text`

- [ ] **Step 3: Monitor file growth during downloads and surface the metrics**

```python
# musicdl/catalogsync/downloader.py
import threading
import time
from pathlib import Path


def _progress_percent(downloaded_bytes: int, total_bytes: int | None) -> float | None:
    # An unknown or non-positive total cannot produce a meaningful percentage.
    if not total_bytes or total_bytes <= 0:
        return None
    return round((float(downloaded_bytes) * 100.0) / float(total_bytes), 2)


class CatalogDownloader:
    def _monitor_save_path(
        self,
        *,
        save_path: Path,
        expected_bytes: int | None,
        progress_callback,
        stop_event: threading.Event,
    ) -> None:
        """Sample the target file every 0.5 s and report size growth as throughput."""
        previous_bytes = 0
        previous_at = time.monotonic()
        # Event.wait() returns True as soon as stop_event is set, ending the loop.
        while not stop_event.wait(0.5):
            try:
                current_bytes = save_path.stat().st_size
            except FileNotFoundError:
                # The client has not created the file yet.
                current_bytes = 0
            current_at = time.monotonic()
            delta_bytes = max(current_bytes - previous_bytes, 0)
            delta_seconds = max(current_at - previous_at, 0.001)
            if progress_callback is not None and current_bytes > 0:
                try:
                    progress_callback(
                        downloaded_bytes=current_bytes,
                        total_bytes=expected_bytes,
                        speed_bytes_per_sec=float(delta_bytes) / float(delta_seconds),
                        progress_percent=_progress_percent(current_bytes, expected_bytes),
                    )
                except Exception:
                    # A failed progress write (e.g. a locked SQLite file) must not
                    # kill the monitor or affect the download itself.
                    pass
            previous_bytes = current_bytes
            previous_at = current_at
```
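
The monitor reports the raw delta of each 0.5 s tick, so the displayed speed can jump to zero whenever a tick happens to see no flushed bytes. If that proves noisy on the dashboard, one optional refinement (not part of this plan) is an exponential moving average over the per-tick rates; `SpeedSmoother` is a hypothetical helper and `alpha=0.3` an arbitrary starting value.

```python
from __future__ import annotations


class SpeedSmoother:
    """Exponential moving average over instantaneous byte rates."""

    def __init__(self, alpha: float = 0.3) -> None:
        if not 0.0 < alpha <= 1.0:
            raise ValueError("alpha must be in (0, 1]")
        self.alpha = alpha
        self.value: float | None = None

    def update(self, delta_bytes: int, delta_seconds: float) -> float:
        """Fold one sample into the average and return the smoothed rate."""
        instant = float(delta_bytes) / max(delta_seconds, 0.001)
        if self.value is None:
            self.value = instant  # first sample seeds the average
        else:
            self.value = self.alpha * instant + (1.0 - self.alpha) * self.value
        return self.value
```

Inside `_monitor_save_path`, one smoother per call would replace the direct `delta_bytes / delta_seconds` division; higher `alpha` tracks changes faster, lower `alpha` is steadier.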

```python
# musicdl/catalogsync/downloader.py
class CatalogDownloader:
    def _download_one(
        self,
        row: dict,
        default_root: Path,
        download_sources: list[str] | None = None,
        progress_callback=None,
    ) -> bool:
        metadata = json.loads(row["metadata_json"]) if row.get("metadata_json") else {}
        song_info = deserialize_song_info(metadata.get("snapshot"))
        if song_info is None:
            return False
        song_info = self.resolve_song_info_for_download(
            row=row,
            song_info=song_info,
            download_sources=download_sources,
        )
        download_platform = self._detect_download_platform(song_info, row["platform"])
        client = self.get_client(download_platform)
        target_root = self.ensure_space(
            default_root,
            getattr(song_info, "file_size_bytes", None) or row.get("file_size_bytes"),
        )
        is_default_root = target_root.resolve() == default_root
        backend_id = self.repository.ensure_local_backend(
            target_root,
            name="default-local" if is_default_root else None,
            is_default=is_default_root,
        )
        singers = self._normalize_singers(getattr(song_info, "singers", None)) or self._normalize_singers(
            row.get("singers")
        )
        relative_dir = build_download_relative_dir(platform=download_platform, singers=singers)
        target_dir = target_root / relative_dir
        target_dir.mkdir(parents=True, exist_ok=True)
        song_info.work_dir = str(target_dir)
        if hasattr(song_info, "_save_path"):
            song_info._save_path = None
        # Resolve the final path before downloading so the monitor can watch it grow.
        save_path = Path(song_info.save_path)
        expected_bytes = int(
            getattr(song_info, "file_size_bytes", None) or row.get("file_size_bytes") or 0
        ) or None
        monitor_stop = threading.Event()
        monitor_thread = threading.Thread(
            target=self._monitor_save_path,
            kwargs={
                "save_path": save_path,
                "expected_bytes": expected_bytes,
                "progress_callback": progress_callback,
                "stop_event": monitor_stop,
            },
            daemon=True,
            name=f"download-monitor-{row['id']}",
        )
        monitor_thread.start()
        try:
            downloaded = client.download(
                [song_info],
                num_threadings=1,
                request_overrides=self._request_overrides((10, 60)),
                auto_supplement_song=False,
            )
        except TypeError:
            # Fallback for clients whose download() rejects request_overrides.
            downloaded = client.download(
                [song_info],
                num_threadings=1,
                auto_supplement_song=False,
            )
        finally:
            # Stop sampling whether the download succeeded or raised.
            monitor_stop.set()
            monitor_thread.join(timeout=1.0)
        # Publish the final size once so the worker row does not stay at the last sample.
        if save_path.exists() and progress_callback is not None:
            final_size = save_path.stat().st_size
            progress_callback(
                downloaded_bytes=final_size,
                total_bytes=expected_bytes or final_size,
                speed_bytes_per_sec=None,
                progress_percent=_progress_percent(final_size, expected_bytes or final_size),
            )
        if not downloaded:
            return False
        saved_song = downloaded[0]
        saved_path = Path(saved_song.save_path)
        relative_path = saved_path.relative_to(target_root).as_posix()
        actual_size = saved_path.stat().st_size if saved_path.exists() else row.get("file_size_bytes")
        actual_ext = saved_path.suffix.lstrip(".") or row.get("ext")
        self.repository.record_local_file(
            song_id=int(row["id"]),
            backend_id=backend_id,
            relative_path=relative_path,
            file_size_bytes=actual_size,
            ext=actual_ext,
            quality_label=self._detect_quality_label(song_info, actual_ext, fallback=row.get("quality_label")),
        )
        return True

    def download_song_row(
        self,
        row,
        library_root: str | Path,
        download_sources: list[str] | None = None,
        worker_callback=None,
    ) -> bool:
        row_dict = dict(row)
        default_root = Path(library_root).resolve()
        self._current_library_root = default_root
        self.repository.ensure_local_backend(default_root, name="default-local", is_default=True)
        if worker_callback is not None:
            # Announce which song this worker is on before any bytes arrive.
            display_name = str(row_dict.get("name") or row_dict.get("id") or "")
            singers = str(row_dict.get("singers") or "").strip()
            display_text = f"{display_name} / {singers}".strip(" /")
            worker_callback(
                current_song_id=int(row_dict["id"]) if row_dict.get("id") is not None else None,
                current_playlist_id=row_dict.get("playlist_id"),
                current_display_text=display_text,
            )
        # The same callback receives the byte/speed samples from the monitor thread.
        return self._download_one(
            row=row_dict,
            default_root=default_root,
            download_sources=download_sources,
            progress_callback=worker_callback,
        )
```

```python
# musicdl/catalogsync/ops/executors.py
class DownloadStageExecutor:
    def process_item(self, item_id: int, worker_name: str, *, already_claimed: bool = False) -> None:
        if not already_claimed:
            self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name)
        try:
            row = self.ops_repo.build_download_row(item_id=item_id)
            succeeded = self.downloader.download_song_row(
                row=row,
                library_root=self.library_root,
                download_sources=self.download_sources,
                # Receives both the song/display update and the monitor's byte samples;
                # note the samples arrive from the monitor thread, not this one.
                worker_callback=lambda **state: self.ops_repo.update_worker_state(
                    worker_name=worker_name,
                    current_job_item_id=item_id,
                    status="running",
                    **state,
                ),
            )
            if succeeded:
                _ensure_transition_applied(
                    self.ops_repo.mark_item_succeeded(item_id=item_id),
                    item_id=item_id,
                    action="mark_item_succeeded",
                )
            else:
                _ensure_transition_applied(
                    self.ops_repo.mark_item_failed(
                        item_id=item_id,
                        error_message="download returned no file",
                    ),
                    item_id=item_id,
                    action="mark_item_failed",
                )
        except Exception as exc:
            # Assumed failure path so the snippet is complete; keep the
            # executor's existing handler instead if it already has one.
            self.ops_repo.mark_item_failed(item_id=item_id, error_message=str(exc))
            raise
```
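
With several download workers each writing a sample every 0.5 s, `update_worker_state` can become a steady stream of SQLite writes. If that contends with job-state updates, one option (not part of this plan) is to throttle only the intermediate samples: calls without a live `speed_bytes_per_sec`, which covers the initial song/display update and the final size report, always pass through. `throttled` is a hypothetical helper; the injectable `clock` exists only to make it testable.

```python
from __future__ import annotations

import time
from collections.abc import Callable
from typing import Any


def throttled(
    callback: Callable[..., Any],
    min_interval: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[..., None]:
    """Drop live-speed samples that arrive within min_interval of the last forwarded call."""
    last_sent: float | None = None

    def wrapper(**state: Any) -> None:
        nonlocal last_sent
        now = clock()
        is_live_sample = state.get("speed_bytes_per_sec") is not None
        if is_live_sample and last_sent is not None and now - last_sent < min_interval:
            return  # skip this sample; the next one carries fresher numbers anyway
        last_sent = now
        callback(**state)

    return wrapper
```

In `process_item` the lambda passed as `worker_callback` could be wrapped as `throttled(lambda **state: ..., min_interval=1.0)`.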

```python
# musicdl/catalogsync/ops/repository.py
class OpsRepository:
    def mark_item_succeeded(self, item_id: int, result_payload: dict[str, Any] | None = None) -> bool:
        with self._connection() as conn:
            row = conn.execute(
                "SELECT job_stage_id, status, worker_id, payload_json FROM job_items WHERE id = ?",
                (item_id,),
            ).fetchone()
            if row is None:
                return False
            from_status = ItemStatus(str(row["status"]))
            if from_status is not ItemStatus.RUNNING:
                return False
            # Capture the owning worker before the UPDATE below clears job_items.worker_id.
            worker_id = row["worker_id"]
            payload = _json_loads(row["payload_json"])
            if result_payload:
                payload.update(result_payload)
            # Compare-and-set: only succeed if the item is still in the status we read.
            cursor = conn.execute(
                """
                UPDATE job_items
                SET
                    status = ?,
                    payload_json = ?,
                    worker_id = NULL,
                    ended_at = CURRENT_TIMESTAMP,
                    last_error = NULL,
                    last_error_code = NULL
                WHERE id = ? AND status = ?
                """,
                (
                    ItemStatus.SUCCEEDED.value,
                    _json_dumps(payload),
                    item_id,
                    from_status.value,
                ),
            )
            if cursor.rowcount != 1:
                return False
            if worker_id is not None:
                # Release the worker and clear its throughput columns so the
                # dashboard does not keep showing a finished song's speed.
                conn.execute(
                    """
                    UPDATE job_workers
                    SET
                        status = 'idle',
                        current_job_item_id = NULL,
                        current_song_id = NULL,
                        current_playlist_id = NULL,
                        current_display_text = NULL,
                        downloaded_bytes = NULL,
                        total_bytes = NULL,
                        speed_bytes_per_sec = NULL,
                        progress_percent = NULL,
                        processed_count = processed_count + 1,
                        heartbeat_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (int(worker_id),),
                )
            return True
```
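
The guard in `mark_item_succeeded` is a compare-and-set: `UPDATE ... WHERE id = ? AND status = ?` only changes the row if nothing moved it since the `SELECT`, and `cursor.rowcount` tells the caller whether it won. The self-contained sketch below reproduces that pattern against an in-memory SQLite database. It uses a trimmed, hypothetical schema (only the columns it needs), and the plain strings `'running'`, `'succeeded'`, and `'idle'` stand in for the real status values. It also shows why the worker id has to be read before the item row is updated.

```python
import sqlite3


def finish_item(conn: sqlite3.Connection, item_id: int) -> bool:
    """Mark a running item succeeded and release its worker; False if the transition loses."""
    row = conn.execute(
        "SELECT status, worker_id FROM job_items WHERE id = ?", (item_id,)
    ).fetchone()
    if row is None or row[0] != "running":
        return False
    worker_id = row[1]  # read now: the UPDATE below sets job_items.worker_id to NULL
    cursor = conn.execute(
        "UPDATE job_items SET status = 'succeeded', worker_id = NULL "
        "WHERE id = ? AND status = 'running'",
        (item_id,),
    )
    if cursor.rowcount != 1:
        return False  # another writer changed the row between SELECT and UPDATE
    if worker_id is not None:
        conn.execute(
            "UPDATE job_workers SET status = 'idle', processed_count = processed_count + 1 "
            "WHERE id = ?",
            (worker_id,),
        )
    return True


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE job_workers (id INTEGER PRIMARY KEY, status TEXT, processed_count INTEGER DEFAULT 0);
    CREATE TABLE job_items (id INTEGER PRIMARY KEY, status TEXT, worker_id INTEGER);
    INSERT INTO job_workers (id, status) VALUES (1, 'running');
    INSERT INTO job_items (id, status, worker_id) VALUES (10, 'running', 1);
    """
)
first = finish_item(conn, 10)
second = finish_item(conn, 10)  # already succeeded, so the transition is rejected
worker = conn.execute("SELECT status, processed_count FROM job_workers WHERE id = 1").fetchone()
print(first, second, worker)  # True False ('idle', 1)
```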

```python
# musicdl/catalogsync/ops/web.py
def _format_bytes_text(downloaded_bytes: int | None, total_bytes: int | None) -> str:
    # No byte counters yet (or both zero): render a placeholder instead of "0.0 MB".
    if not downloaded_bytes and not total_bytes:
        return "-"
    downloaded_mb = float(downloaded_bytes or 0) / 1024 / 1024
    total_mb = float(total_bytes or 0) / 1024 / 1024
    if total_bytes:
        return f"{downloaded_mb:.1f} / {total_mb:.1f} MB"
    return f"{downloaded_mb:.1f} MB"


def _serialize_worker_row(row: dict[str, Any]) -> dict[str, Any]:
    payload_text = str(row.get("current_display_text") or "").strip()
    # Fall back to song, playlist, or item key when the worker published no display text.
    fallback_text = str(row.get("song_name") or row.get("playlist_name") or row.get("item_key") or "").strip()
    speed_bytes_per_sec = row.get("speed_bytes_per_sec")
    downloaded_bytes = row.get("downloaded_bytes")
    total_bytes = row.get("total_bytes")
    return {
        "id": int(row["id"]),
        "job_run_id": row["job_run_id"],
        "job_stage_id": row["job_stage_id"],
        "worker_name": row["worker_name"],
        "status": row["status"],
        "current_job_item_id": row["current_job_item_id"],
        "current_song_id": row["current_song_id"],
        "current_playlist_id": row["current_playlist_id"],
        "display_text": payload_text or fallback_text,
        "stage_type": row.get("stage_type"),
        "item_type": row.get("item_type"),
        "last_progress_text": row.get("last_progress_text"),
        # Raw throughput fields for the UI, plus preformatted text variants.
        "downloaded_bytes": int(downloaded_bytes) if downloaded_bytes is not None else None,
        "total_bytes": int(total_bytes) if total_bytes is not None else None,
        "speed_bytes_per_sec": float(speed_bytes_per_sec) if speed_bytes_per_sec is not None else None,
        "speed_text": _format_speed_text(speed_bytes_per_sec),
        "bytes_text": _format_bytes_text(downloaded_bytes, total_bytes),
        "progress_percent": float(row["progress_percent"]) if row.get("progress_percent") is not None else None,
        "processed_count": int(row["processed_count"] or 0),
        "error_count": int(row["error_count"] or 0),
        "heartbeat_at": row.get("heartbeat_at"),
    }
```

- [ ] **Step 4: Run the targeted speed tests and verify they pass**

Run: `python -m unittest tests.catalogsync.test_ops_repository.OpsRepositoryTaskCenterTests.test_list_task_center_rows_uses_live_worker_speed_for_download_jobs -v`

Expected:

- `OK`
- download task summaries append the live transfer rate when a real worker speed exists

Run: `python -m unittest tests.catalogsync.test_ops_api.OperationsApiTests.test_api_dashboard_exposes_worker_speed_and_bytes_text -v`

Expected:

- `OK`
- worker rows include `downloaded_bytes`, `total_bytes`, `speed_bytes_per_sec`, `speed_text`, and `bytes_text`

- [ ] **Step 5: Commit**

```bash
git add musicdl/catalogsync/downloader.py musicdl/catalogsync/ops/executors.py musicdl/catalogsync/ops/repository.py musicdl/catalogsync/ops/web.py tests/catalogsync/test_ops_repository.py tests/catalogsync/test_ops_api.py
git commit -m "feat: add download throughput reporting"
```

## Task 6: Update Documentation And Run Regression Coverage

**Files:**
- Modify: `docs/catalogsync.md`

- [ ] **Step 1: Update the operator documentation**

```markdown
## Task Center

- `Dashboard` is now the main operator page.
- Every job appears in one task table with inline expansion.
- Download-class jobs are serialized into one `download` lane:
  - `catalog_sync`
  - `sync_download`
  - `download_only`
  - `download_upload`
- `collect_only` and `sync_only` run in the `general` lane and are not blocked by the active download job.

## Playlist Bulk Actions

- `/playlists` supports:
  - `Sync Selected Playlists`
  - `Download Selected Playlists`
  - `Sync Then Download`
  - `Mark Wanted`
  - `Unmark Wanted`
- When a playlist shows `0 songs, sync recommended`, use `Sync Selected Playlists` first to refresh `playlist_songs` and `song_count`.

## Download Speed

- Running download workers publish structured progress:
  - `downloaded_bytes`
  - `total_bytes`
  - `speed_bytes_per_sec`
  - `progress_percent`
- Task rows aggregate active worker speed and display it as `MB/s`.
- If a provider does not expose live file growth, the UI shows `-` instead of guessing a rate.
```

- [ ] **Step 2: Run focused regression tests**

Run: `python -m unittest tests.catalogsync.test_db tests.catalogsync.test_ops_repository tests.catalogsync.test_ops_runner tests.catalogsync.test_ops_api -v`

Expected:

- `OK`
- schema upgrades, repository summaries, scheduler lanes, API routes, and HTML rendering all pass together

- [ ] **Step 3: Run the full `catalogsync` test suite**

Run: `python -m unittest discover -s tests/catalogsync -v`

Expected:

- `OK`
- no existing `catalogsync` behavior regresses after the task-center refactor

- [ ] **Step 4: Commit**

```bash
git add docs/catalogsync.md
git commit -m "docs: describe task center operations flow"
```