import json
import tempfile
import io
import time
import unittest
import warnings
import zipfile
from pathlib import Path
from unittest.mock import patch
from fastapi.testclient import TestClient
class OperationsApiTests(unittest.TestCase):
def _build_client(self) -> tuple[TestClient, Path, Path]:
    """Create an app and test client backed by a fresh temporary directory.

    Returns:
        ``(client, db_path, env_path)``. The temporary directory and the
        client are both released through ``addCleanup``.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    workdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
    self.addCleanup(workdir.cleanup)
    base = Path(workdir.name)
    db_path, env_path = base / "catalogsync.db", base / "catalogsync.env"
    env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8")

    # The object returned by initialize_database is not used here; close it at once.
    handle = initialize_database(db_path)
    handle.close()

    client = TestClient(create_app(db_path=db_path, env_path=env_path))
    # Cleanups run last-in-first-out, so the client closes before the directory goes away.
    self.addCleanup(client.close)
    return client, db_path, env_path
def test_create_app_start_runner_emits_no_deprecation_warning(self):
    """Calling ``create_app`` with ``start_runner=True`` records no DeprecationWarning."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        base = Path(tmpdir)
        db_path = base / "catalogsync.db"
        env_path = base / "catalogsync.env"
        env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8")
        initialize_database(db_path).close()

        with warnings.catch_warnings(record=True) as recorded:
            # "always" defeats the once-per-location default so nothing is hidden.
            warnings.simplefilter("always", DeprecationWarning)
            app_kwargs = {
                "db_path": db_path,
                "env_path": env_path,
                "start_runner": True,
                "runner_sleep_seconds": 0.01,
            }
            create_app(**app_kwargs)

        deprecations = []
        for entry in recorded:
            if issubclass(entry.category, DeprecationWarning):
                deprecations.append(entry)
        self.assertEqual([], deprecations)
def test_create_app_initializes_resolver_stats_side_db(self):
    """After ``create_app`` runs, the default resolver-stats database path exists."""
    from musicdl.catalogsync.ops.web import create_app
    from musicdl.catalogsync.resolver_stats import default_resolver_stats_db_path

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        base = Path(tmpdir)
        db_path, env_path = base / "catalogsync.db", base / "catalogsync.env"
        env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8")

        create_app(db_path=db_path, env_path=env_path)

        # Checked inside the ``with`` because the directory is removed on exit.
        side_db = default_resolver_stats_db_path(db_path)
        self.assertTrue(side_db.exists())
def test_events_stream_interval_is_tuned_for_more_realtime_snapshots(self):
    """The live dashboard snapshot interval constant is at most 0.5 seconds."""
    from musicdl.catalogsync.ops import web as ops_web

    interval_seconds = ops_web.LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS
    self.assertLessEqual(interval_seconds, 0.5)
def _seed_playlist(
    self,
    db_path: Path,
    *,
    platform: str,
    pool_kind: str,
    remote_id: str,
    name: str,
    play_count: int | None = None,
    collected_song_count: int | None = None,
) -> int:
    """Upsert a playlist and a playlist pool, then link the two.

    Args:
        db_path: Database file handed to ``CatalogRepository``.
        platform: Platform value used for both the playlist and the pool.
        pool_kind: Pool kind used for the playlist, the pool and its identifiers.
        remote_id: Remote playlist id; also embedded in the generated URLs.
        name: Playlist name.
        play_count: Optional play count forwarded to the candidate.
        collected_song_count: Optional song count forwarded to the candidate.

    Returns:
        The id returned by ``upsert_playlist``.
    """
    from musicdl.catalogsync.models import PlaylistCandidate
    from musicdl.catalogsync.repository import CatalogRepository

    catalog = CatalogRepository(db_path)

    candidate = PlaylistCandidate(
        platform=platform,
        pool_kind=pool_kind,
        remote_id=remote_id,
        name=name,
        url=f"https://example.invalid/{platform}/{remote_id}",
        play_count=play_count,
        collected_song_count=collected_song_count,
    )
    playlist_id = catalog.upsert_playlist(candidate)

    pool_fields = {
        "platform": platform,
        "pool_kind": pool_kind,
        "external_id": f"{pool_kind}:{remote_id}",
        "name": f"{pool_kind}-{platform}",
        "url": f"https://example.invalid/pool/{pool_kind}/{remote_id}",
    }
    pool_id = catalog.upsert_playlist_pool(**pool_fields)

    catalog.link_pool_playlist(pool_id, playlist_id)
    return playlist_id
def _seed_song(
    self,
    db_path: Path,
    *,
    platform: str,
    remote_id: str,
    name: str,
    singers: str = "Singer A",
    metadata: dict | None = None,
) -> int:
    """Upsert one ``CatalogSong`` with fixed mp3 attributes and return the result.

    A falsy ``metadata`` argument is replaced by an empty dict. The return
    value is whatever ``upsert_song`` returns.
    """
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.repository import CatalogRepository

    song = CatalogSong(
        platform=platform,
        remote_song_id=remote_id,
        name=name,
        singers=singers,
        ext="mp3",
        file_size_bytes=128,
        quality_label="standard",
        metadata=metadata or {},
    )
    catalog = CatalogRepository(db_path)
    return catalog.upsert_song(song)
def _link_playlist_song(self, db_path: Path, *, playlist_id: int, song_id: int, position: int) -> None:
    """Attach ``song_id`` to ``playlist_id`` at ``position`` via ``CatalogRepository``."""
    from musicdl.catalogsync.repository import CatalogRepository

    CatalogRepository(db_path).link_playlist_song(playlist_id, song_id, position)
def _mark_local_downloaded(self, db_path: Path, *, song_id: int, relative_path: str) -> None:
    """Record a local mp3 file for ``song_id`` on the ``default-local`` backend.

    The backend root is the ``library`` directory next to the database file.
    """
    from musicdl.catalogsync.repository import CatalogRepository

    catalog = CatalogRepository(db_path)
    library_root = Path(db_path).parent / "library"
    backend_id = catalog.ensure_local_backend(
        library_root,
        name="default-local",
        is_default=True,
    )
    file_fields = {
        "song_id": song_id,
        "backend_id": backend_id,
        "relative_path": relative_path,
        "file_size_bytes": 128,
        "ext": "mp3",
        "quality_label": "standard",
    }
    catalog.record_local_file(**file_fields)
def _mark_remote_uploaded(
    self,
    db_path: Path,
    *,
    song_id: int,
    relative_path: str,
    public_url: str,
    download_url: str | None = None,
) -> None:
    """Record a local file for ``song_id`` and a remote file that points at it.

    The local file goes on the ``default-local`` backend. The remote file
    goes on the ``catalog-cloud`` object-storage backend and uses
    ``relative_path`` as its locator.
    """
    from musicdl.catalogsync.repository import CatalogRepository

    catalog = CatalogRepository(db_path)

    # Local side: backend plus the file asset that the remote record references.
    local_backend_id = catalog.ensure_local_backend(
        Path(db_path).parent / "library",
        name="default-local",
        is_default=True,
    )
    asset_id = catalog.record_local_file(
        song_id=song_id,
        backend_id=local_backend_id,
        relative_path=relative_path,
        file_size_bytes=128,
        ext="mp3",
        quality_label="standard",
    )

    # Remote side: object-storage backend, then the remote file itself.
    bucket = "music-bucket"
    remote_backend_id = catalog.upsert_object_storage_backend(
        name="catalog-cloud",
        container_name=bucket,
        endpoint="https://s3.example.invalid",
        region=None,
        base_prefix="catalogsync",
        credential_env_prefix="CATALOGSYNC_TEST",
        public_base_url="https://cdn.example.invalid",
    )
    catalog.record_remote_file(
        file_asset_id=asset_id,
        backend_id=remote_backend_id,
        container_name=bucket,
        locator=relative_path,
        public_url=public_url,
        download_url=download_url,
    )
def test_pages_and_jobs_endpoint_return_200(self):
    """HTML pages and the jobs API answer 200; ``limit=0`` is rejected with 422."""
    client, _, _ = self._build_client()

    # Only the dashboard page is expected to carry the SSE url attribute.
    dashboard_page = client.get("/dashboard")
    self.assertEqual(200, dashboard_page.status_code)
    self.assertIn("data-sse-url", dashboard_page.text)

    jobs_page = client.get("/jobs")
    self.assertEqual(200, jobs_page.status_code)
    self.assertNotIn("data-sse-url", jobs_page.text)

    remaining_pages = ("/playlists", "/songs", "/logs", "/config")
    for page_path in remaining_pages:
        page = client.get(page_path)
        self.assertEqual(200, page.status_code)

    jobs_api = client.get("/api/jobs")
    self.assertEqual(200, jobs_api.status_code)
    listing = jobs_api.json()
    self.assertIn("items", listing)
    self.assertIsInstance(listing["items"], list)

    zero_limit = client.get("/api/jobs?limit=0")
    self.assertEqual(422, zero_limit.status_code)
def test_create_job_get_job_and_create_command(self):
    """Cover job creation, lookup, command posting, 422 validation and source de-duplication."""
    client, _, _ = self._build_client()

    # Create a job and read its id from the response.
    new_job_body = {
        "job_type": "catalog_sync",
        "requested_by": "unittest",
        "sources": ["qq", "netease"],
    }
    create_response = client.post("/api/jobs", json=new_job_body)
    self.assertEqual(201, create_response.status_code)
    created = create_response.json()
    self.assertIn("job", created)
    job_id = int(created["job"]["id"])

    # Fetch the same job back.
    get_response = client.get(f"/api/jobs/{job_id}")
    self.assertEqual(200, get_response.status_code)
    fetched_job = get_response.json()["job"]
    self.assertEqual(job_id, int(fetched_job["id"]))
    self.assertEqual("catalog_sync", fetched_job["job_type"])
    self.assertEqual(["qq", "netease"], fetched_job["sources"])

    # A valid command is accepted.
    commands_url = f"/api/jobs/{job_id}/commands"
    command_response = client.post(
        commands_url,
        json={"command_type": "pause", "payload": {"reason": "unit-test"}},
    )
    self.assertEqual(201, command_response.status_code)
    command_payload = command_response.json()
    self.assertEqual(job_id, int(command_payload["job_id"]))
    self.assertIn("command_id", command_payload)

    # An unknown command type, then ``retry_item`` with no payload, are both rejected.
    rejected_bodies = (
        {"command_type": "invalid-command"},
        {"command_type": "retry_item"},
    )
    for rejected_body in rejected_bodies:
        rejected = client.post(commands_url, json=rejected_body)
        self.assertEqual(422, rejected.status_code)

    # Repeated sources come back once each, in first-seen order.
    repeated_sources = ["qq", "qq", "netease"]
    dedupe_response = client.post(
        "/api/jobs",
        json={
            "job_type": "collect_only",
            "requested_by": "unittest",
            "sources": repeated_sources,
            "download_sources": list(repeated_sources),
        },
    )
    self.assertEqual(201, dedupe_response.status_code)
    deduped_job = dedupe_response.json()["job"]
    for field in ("sources", "download_sources"):
        self.assertEqual(["qq", "netease"], deduped_job[field])
def test_job_detail_page_exposes_job_command_entry(self):
    """The job detail page contains the commands URL and the cancel/pause form values."""
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    job_id = OpsRepository(db_path).create_job(job_type="catalog_sync", config_snapshot={})

    page = client.get(f"/jobs/{job_id}")
    self.assertEqual(200, page.status_code)

    expected_fragments = (
        f"/api/jobs/{job_id}/commands",
        'name="command_type"',
        'value="cancel"',
        'value="pause"',
    )
    for fragment in expected_fragments:
        self.assertIn(fragment, page.text)
def test_job_detail_and_dashboard_snapshot_expose_worker_and_download_stats(self):
    """Job detail and the SSE snapshot both expose worker and download statistics.

    Seeds one running download stage with a single claimed item, then checks
    the keys present in ``/api/jobs/{id}`` and in the first ``data: `` line of
    ``/api/events/stream?once=true``.
    """
    from musicdl.catalogsync.ops.models import StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    job_id = repo.create_job(
        job_type="catalog_sync",
        config_snapshot={},
    )
    stage_id = repo.create_stage(
        job_run_id=job_id,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )
    item_id = repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:101",
        song_id=101,
        payload={"row": {"id": 101, "platform": "qq", "name": "Song 101"}},
    )
    repo.claim_item(item_id=item_id, worker_name="download-1")
    repo.update_worker_state(
        "download-1",
        current_song_id=101,
        current_display_text="Song 101",
        last_progress_text="downloading",
    )

    detail_response = client.get(f"/api/jobs/{job_id}")
    self.assertEqual(200, detail_response.status_code)
    detail_payload = detail_response.json()
    for key in ("workers", "running_items", "download_stats"):
        self.assertIn(key, detail_payload)

    stream_response = client.get("/api/events/stream?once=true")
    self.assertEqual(200, stream_response.status_code)
    # A default keeps a missing data line from surfacing as a bare
    # StopIteration error; the assertion below reports it as a failure.
    data_line = next(
        (line for line in stream_response.text.splitlines() if line.startswith("data: ")),
        None,
    )
    self.assertIsNotNone(data_line, "SSE response contained no 'data: ' line")
    snapshot_payload = json.loads(data_line.removeprefix("data: "))
    for key in (
        "workers",
        "running_items",
        "download_stats",
        "playlist_sources",
        "active_job",
    ):
        self.assertIn(key, snapshot_payload)
def test_dashboard_exposes_resolver_and_downloader_workers_during_download_stage(self):
    """``/api/dashboard`` lists both seeded workers and reports the download speed."""
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    mebibyte = 1024 * 1024

    job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    stage_id = repo.create_stage(
        job_run_id=job_id,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )

    # Two download items, created in this order.
    item_a = repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:201",
        song_id=201,
        payload={"row": {"id": 201, "platform": "qq", "name": "Song A / Singer A"}},
    )
    item_b = repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key="song:202",
        song_id=202,
        payload={"row": {"id": 202, "platform": "qq", "name": "Song B / Singer B"}},
    )

    # First item: claimed and reported by the same resolver worker.
    repo.claim_item(item_id=item_a, worker_name="resolve-1")
    resolver_state = {
        "worker_name": "resolve-1",
        "current_job_item_id": item_a,
        "status": "running",
        "current_song_id": 201,
        "current_display_text": "Song A / Singer A",
        "last_progress_text": "resolving source qq (1/6)",
    }
    repo.update_worker_state(**resolver_state)

    # NOTE(review): item_b is claimed by "resolve-2" while its progress is
    # reported by "download-1" — confirm this hand-off is intentional.
    repo.claim_item(item_id=item_b, worker_name="resolve-2")
    downloader_state = {
        "worker_name": "download-1",
        "current_job_item_id": item_b,
        "status": "running",
        "current_song_id": 202,
        "current_display_text": "Song B / Singer B",
        "last_progress_text": "12.00MB/48.00MB",
        "downloaded_bytes": 12 * mebibyte,
        "total_bytes": 48 * mebibyte,
        "speed_bytes_per_sec": 3 * mebibyte,
        "progress_percent": 25,
    }
    repo.update_worker_state(**downloader_state)

    response = client.get("/api/dashboard?include_task_rows=false")
    self.assertEqual(200, response.status_code)
    payload = response.json()

    worker_names = []
    for worker in payload["workers"]:
        worker_names.append(worker["worker_name"])
    self.assertIn("resolve-1", worker_names)
    self.assertIn("download-1", worker_names)
    self.assertEqual(3 * mebibyte, payload["transfer_stats"]["download_speed_bytes_per_sec"])
def test_dashboard_page_renders_task_tree_shell_without_detail_tables(self):
    """The dashboard HTML has both task-tree roots and a node and toggle per job.

    It also checks that the command controls and inline meta marker are
    present, and that the subtitle, playlist toggle and detail heading
    strings are absent.
    """
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    doing_job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    done_job_id = repo.create_job(
        job_type="sync_only",
        config_snapshot={},
        status=JobStatus.COMPLETED,
    )

    response = client.get("/dashboard")
    self.assertEqual(200, response.status_code)
    html = response.text

    self.assertIn('data-task-tree-root="doing"', html)
    self.assertIn('data-task-tree-root="done"', html)
    self.assertIn(f'data-task-node="{doing_job_id}"', html)
    self.assertIn(f'data-task-toggle="{doing_job_id}"', html)
    self.assertIn(f'data-task-node="{done_job_id}"', html)
    self.assertIn(f'data-task-toggle="{done_job_id}"', html)
    self.assertIn("data-task-command-toggle", html)
    self.assertIn("data-task-command-cancel", html)
    self.assertIn("data-task-meta-inline", html)

    self.assertNotIn("data-task-subtitle", html)
    self.assertNotIn("data-playlist-toggle", html)

    # NOTE(review): these four literals were split across physical lines in
    # the source, which is an unterminated-string syntax error. They are
    # rejoined here with explicit "\n" escapes so the exact characters are
    # kept. They look like heading markup whose tags were lost — TODO confirm
    # against the dashboard template and restore the intended tags.
    self.assertNotIn("\nSummary\n", html)
    self.assertNotIn("Stages\n", html)
    self.assertNotIn("Workers\n", html)
    self.assertNotIn("Running Items\n", html)
def test_dashboard_page_orders_done_tree_by_latest_finished_at(self):
    """The job with the later ``ended_at`` appears earlier in the dashboard HTML."""
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)
    older_job_id = repo.create_job(
        job_type="sync_only",
        config_snapshot={},
        status=JobStatus.COMPLETED,
    )
    newer_job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.COMPLETED,
    )

    # Set the end timestamps directly so the ordering is deterministic.
    end_times = (
        ("2026-04-18 09:00:00", older_job_id),
        ("2026-04-18 12:00:00", newer_job_id),
    )
    with repo._connection() as conn:
        for params in end_times:
            conn.execute("UPDATE job_runs SET ended_at = ? WHERE id = ?", params)

    page = client.get("/dashboard")
    self.assertEqual(200, page.status_code)
    markup = page.text
    newer_at = markup.index(f'data-task-node="{newer_job_id}"')
    older_at = markup.index(f'data-task-node="{older_job_id}"')
    self.assertLess(newer_at, older_at)
def test_dashboard_page_limits_done_tree_to_recent_ten(self):
    """With twelve completed jobs, the dashboard shows ten nodes and omits the two oldest."""
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    repo = OpsRepository(db_path)

    created_ids: list[int] = []
    for hour in range(1, 13):
        job_id = repo.create_job(
            job_type="sync_only",
            config_snapshot={},
            status=JobStatus.COMPLETED,
        )
        created_ids.append(job_id)
        # Each job ends one hour after the previous one.
        ended_at = f"2026-04-18 {hour:02d}:00:00"
        with repo._connection() as conn:
            conn.execute(
                "UPDATE job_runs SET ended_at = ? WHERE id = ?",
                (ended_at, job_id),
            )

    page = client.get("/dashboard")
    self.assertEqual(200, page.status_code)
    markup = page.text
    self.assertEqual(10, markup.count('data-task-node="'))
    for dropped_id in created_ids[:2]:
        self.assertNotIn(f'data-task-node="{dropped_id}"', markup)
    self.assertIn(f'data-task-node="{created_ids[-1]}"', markup)
def test_dashboard_page_renders_local_duplicate_maintenance_controls(self):
    """The dashboard HTML contains the local-duplicates panel, its actions and status hooks."""
    client, _, _ = self._build_client()

    page = client.get("/dashboard")
    self.assertEqual(200, page.status_code)

    required_markers = (
        'data-maintenance-panel="local-duplicates"',
        'data-maintenance-action="scan"',
        'data-maintenance-action="dedupe"',
        "data-maintenance-status",
        "data-maintenance-result",
    )
    markup = page.text
    for marker in required_markers:
        self.assertIn(marker, markup)
def test_config_env_revision_list_and_apply_flow(self):
    """Two env saves create distinct revisions; applying the first restores its content."""
    client, _, _ = self._build_client()
    first_content = "ROOT_DIR=/music-a\nDOWNLOAD_SOURCES=qq,netease\n"
    second_content = "ROOT_DIR=/music-b\nDOWNLOAD_SOURCES=kuwo\n"

    # Save two versions of the env file.
    first_save = client.put("/api/config/env", json={"content": first_content, "note": "first"})
    self.assertEqual(200, first_save.status_code)
    first_revision_id = int(first_save.json()["revision"]["id"])

    second_save = client.put("/api/config/env", json={"content": second_content, "note": "second"})
    self.assertEqual(200, second_save.status_code)
    second_revision_id = int(second_save.json()["revision"]["id"])
    self.assertNotEqual(first_revision_id, second_revision_id)

    # Both revisions are listed.
    listing = client.get("/api/config/revisions")
    self.assertEqual(200, listing.status_code)
    listed_ids = set()
    for entry in listing.json()["items"]:
        listed_ids.add(int(entry["id"]))
    self.assertIn(first_revision_id, listed_ids)
    self.assertIn(second_revision_id, listed_ids)

    # Re-applying the first revision brings its content back.
    applied = client.post(f"/api/config/revisions/{first_revision_id}/apply")
    self.assertEqual(200, applied.status_code)
    current_env = client.get("/api/config/env")
    self.assertEqual(200, current_env.status_code)
    env_payload = current_env.json()
    self.assertEqual(first_content, env_payload["content"])
    self.assertEqual("/music-a", env_payload["parsed"]["ROOT_DIR"])

    # An unknown revision id yields 404.
    missing = client.post("/api/config/revisions/999999/apply")
    self.assertEqual(404, missing.status_code)
def test_events_stream_returns_sse_content_type(self):
    """A one-shot stream request is served as ``text/event-stream`` with a snapshot event."""
    client, _, _ = self._build_client()

    stream = client.get("/api/events/stream?once=true")
    self.assertEqual(200, stream.status_code)

    content_type = stream.headers.get("content-type", "")
    self.assertIn("text/event-stream", content_type)
    for fragment in ("event: snapshot", "data:"):
        self.assertIn(fragment, stream.text)
def test_events_stream_snapshot_omits_task_rows(self):
    """The JSON in the first ``data: `` line of the one-shot stream has no ``task_rows`` key."""
    client, _, _ = self._build_client()
    response = client.get("/api/events/stream?once=true")
    self.assertEqual(200, response.status_code)
    # A default keeps a missing data line from surfacing as a bare
    # StopIteration error; the assertion below reports it as a failure.
    data_line = next(
        (line for line in response.text.splitlines() if line.startswith("data: ")),
        None,
    )
    self.assertIsNotNone(data_line, "SSE response contained no 'data: ' line")
    payload = json.loads(data_line.removeprefix("data: "))
    self.assertNotIn("task_rows", payload)
def test_api_playlists_mark_wanted_and_filter_wanted_only(self):
    """A playlist marked as wanted is the only result of the ``wanted_only=1`` listing."""
    client, db_path, _ = self._build_client()
    seed = {
        "platform": "qq",
        "pool_kind": "manual_file",
        "remote_id": "wanted-api-1",
        "name": "Wanted API Playlist",
    }
    playlist_id = self._seed_playlist(db_path, **seed)

    marked = client.post(
        "/api/playlists/mark-wanted",
        json={"playlist_ids": [playlist_id], "marked_by": "api-test"},
    )
    self.assertEqual(200, marked.status_code)

    listing = client.get("/api/playlists?wanted_only=1")
    self.assertEqual(200, listing.status_code)
    wanted_payload = listing.json()
    self.assertEqual(1, wanted_payload["total_count"])
    self.assertEqual(playlist_id, wanted_payload["items"][0]["id"])
    self.assertEqual(1, wanted_payload["items"][0]["is_wanted"])
    self.assertEqual("api-test", wanted_payload["items"][0]["marked_by"])
def test_api_playlists_unmark_wanted(self):
    """Marking then unmarking a playlist leaves the ``wanted_only=1`` listing empty."""
    client, db_path, _ = self._build_client()
    seed = {
        "platform": "qq",
        "pool_kind": "manual_file",
        "remote_id": "wanted-api-2",
        "name": "Wanted API Playlist 2",
    }
    playlist_id = self._seed_playlist(db_path, **seed)

    marked = client.post(
        "/api/playlists/mark-wanted",
        json={"playlist_ids": [playlist_id], "marked_by": "api-test"},
    )
    self.assertEqual(200, marked.status_code)

    unmarked = client.post(
        "/api/playlists/unmark-wanted",
        json={"playlist_ids": [playlist_id]},
    )
    self.assertEqual(200, unmarked.status_code)

    listing = client.get("/api/playlists?wanted_only=1")
    self.assertEqual(200, listing.status_code)
    wanted_payload = listing.json()
    self.assertEqual(0, wanted_payload["total_count"])
    self.assertEqual([], wanted_payload["items"])
def test_api_playlists_download_creates_download_only_job_with_playlist_scope(self):
    """Posting mixed-form ids yields one ``download_only`` job scoped to the two playlists.

    The request repeats an id, sends one id as a string, and includes ``-1``
    and ``0``. The expected scope is the two seeded ids in first-seen order.
    """
    client, db_path, _ = self._build_client()

    # Seed order is kept: both playlists, then both songs, then the links.
    playlist_a = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="download-api-a",
        name="Download API A",
    )
    playlist_b = self._seed_playlist(
        db_path,
        platform="netease",
        pool_kind="playlist_square",
        remote_id="download-api-b",
        name="Download API B",
    )
    song_a = self._seed_song(
        db_path,
        platform="qq",
        remote_id="download-api-song-a",
        name="Download API Song A",
    )
    song_b = self._seed_song(
        db_path,
        platform="netease",
        remote_id="download-api-song-b",
        name="Download API Song B",
    )
    for playlist_id, song_id in ((playlist_a, song_a), (playlist_b, song_b)):
        self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1)

    requested_ids = [playlist_a, str(playlist_b), playlist_a, -1, 0]
    response = client.post(
        "/api/playlists/download",
        json={"playlist_ids": requested_ids, "requested_by": "api-test"},
    )
    self.assertEqual(201, response.status_code)

    job = response.json()["job"]
    self.assertEqual("download_only", job["job_type"])
    self.assertEqual([playlist_a, playlist_b], job["playlist_scope"]["playlist_ids"])
def test_api_playlists_download_adaptive_routes_mixed_states_to_two_jobs(self):
    """A playlist with a linked song and one without are split across two jobs.

    The response is expected to carry ``download_job`` for the first,
    ``sync_download_job`` for the second, and no ``job`` value.
    """
    client, db_path, _ = self._build_client()

    ready_for_download_playlist_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="download-adaptive-ready",
        name="Download Adaptive Ready",
    )
    # This playlist gets no linked songs.
    unsynced_playlist_id = self._seed_playlist(
        db_path,
        platform="netease",
        pool_kind="playlist_square",
        remote_id="download-adaptive-unsynced",
        name="Download Adaptive Unsynced",
    )
    ready_song_id = self._seed_song(
        db_path,
        platform="qq",
        remote_id="download-adaptive-song-ready",
        name="Download Adaptive Song Ready",
    )
    self._link_playlist_song(
        db_path,
        playlist_id=ready_for_download_playlist_id,
        song_id=ready_song_id,
        position=1,
    )

    request_body = {
        "playlist_ids": [ready_for_download_playlist_id, unsynced_playlist_id],
        "requested_by": "api-test",
    }
    response = client.post("/api/playlists/download", json=request_body)
    self.assertEqual(201, response.status_code)

    payload = response.json()
    download_scope = payload["download_job"]["playlist_scope"]["playlist_ids"]
    self.assertEqual([ready_for_download_playlist_id], download_scope)
    sync_scope = payload["sync_download_job"]["playlist_scope"]["playlist_ids"]
    self.assertEqual([unsynced_playlist_id], sync_scope)
    self.assertIsNone(payload.get("job"))
def test_api_playlists_download_returns_404_when_any_playlist_id_is_missing(self):
    """Requesting a known id together with an unknown one returns 404 naming the unknown id."""
    client, db_path, _ = self._build_client()
    unknown_id = 999999

    playlist_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="download-missing-known",
        name="Download Missing Known",
    )
    song_id = self._seed_song(
        db_path,
        platform="qq",
        remote_id="download-missing-song",
        name="Download Missing Song",
    )
    self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1)

    response = client.post(
        "/api/playlists/download",
        json={"playlist_ids": [playlist_id, unknown_id]},
    )
    self.assertEqual(404, response.status_code)
    detail = response.json()["detail"]
    self.assertEqual([unknown_id], detail["missing_playlist_ids"])
def test_api_playlists_sync_download_creates_job_with_playlist_scope(self):
    """``/api/playlists/sync-download`` creates a ``sync_download`` job that keeps request order."""
    client, db_path, _ = self._build_client()
    playlist_a = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="sync-download-api-a",
        name="Sync Download API A",
    )
    playlist_b = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="playlist_square",
        remote_id="sync-download-api-b",
        name="Sync Download API B",
    )

    # Ids are sent in reverse seeding order on purpose.
    requested_ids = [playlist_b, playlist_a]
    response = client.post("/api/playlists/sync-download", json={"playlist_ids": requested_ids})
    self.assertEqual(201, response.status_code)

    job = response.json()["job"]
    self.assertEqual("sync_download", job["job_type"])
    self.assertEqual([playlist_b, playlist_a], job["playlist_scope"]["playlist_ids"])
def test_api_playlists_sync_creates_sync_only_job_with_scope(self):
    """``/api/playlists/sync`` creates a ``sync_only`` job that keeps request order."""
    client, db_path, _ = self._build_client()
    playlist_a = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="sync-api-a",
        name="Sync API A",
    )
    playlist_b = self._seed_playlist(
        db_path,
        platform="netease",
        pool_kind="playlist_square",
        remote_id="sync-api-b",
        name="Sync API B",
    )

    # Ids are sent in reverse seeding order on purpose.
    requested_ids = [playlist_b, playlist_a]
    response = client.post("/api/playlists/sync", json={"playlist_ids": requested_ids})
    self.assertEqual(201, response.status_code)

    job = response.json()["job"]
    self.assertEqual("sync_only", job["job_type"])
    self.assertEqual([playlist_b, playlist_a], job["playlist_scope"]["playlist_ids"])
def test_api_playlists_export_routes_selected_playlists_by_state(self):
    """Export handles three playlists differently depending on what was seeded for each.

    Expected outcome: the playlist with a local file is exported to disk, the
    playlist with a linked but not-downloaded song lands in ``download_job``,
    and the playlist with no songs lands in ``sync_download_job``.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        root = Path(tmpdir)
        db_path = root / "catalogsync.db"
        env_path = root / "catalogsync.env"
        env_text = f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq,kuwo\n"
        env_path.write_text(env_text, encoding="utf-8")
        initialize_database(db_path, default_library_root=root / "library").close()

        # 1) Playlist whose single song has a recorded local file.
        downloaded_playlist_id = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="export-downloaded",
            name="Export Downloaded Playlist",
        )
        downloaded_song_id = self._seed_song(
            db_path,
            platform="qq",
            remote_id="export-downloaded-song-1",
            name="Export Downloaded Song 1",
        )
        self._link_playlist_song(
            db_path,
            playlist_id=downloaded_playlist_id,
            song_id=downloaded_song_id,
            position=1,
        )
        self._mark_local_downloaded(
            db_path,
            song_id=downloaded_song_id,
            relative_path="qq/Singer A/export-downloaded-song-1.mp3",
        )

        # 2) Playlist with a linked song but no local file.
        download_playlist_id = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="export-not-downloaded",
            name="Export Not Downloaded Playlist",
        )
        download_song_id = self._seed_song(
            db_path,
            platform="qq",
            remote_id="export-not-downloaded-song-1",
            name="Export Not Downloaded Song 1",
        )
        self._link_playlist_song(
            db_path,
            playlist_id=download_playlist_id,
            song_id=download_song_id,
            position=1,
        )

        # 3) Playlist with no songs at all.
        sync_download_playlist_id = self._seed_playlist(
            db_path,
            platform="netease",
            pool_kind="playlist_square",
            remote_id="export-unsynced",
            name="Export Unsynced Playlist",
        )

        client = TestClient(create_app(db_path=db_path, env_path=env_path))
        self.addCleanup(client.close)
        request_body = {
            "playlist_ids": [
                sync_download_playlist_id,
                downloaded_playlist_id,
                download_playlist_id,
            ],
            "requested_by": "api-test",
        }
        response = client.post("/api/playlists/export", json=request_body)

        # Record filesystem state while the temporary directory still exists.
        playlist_dir = root / "playlists" / f"Export Downloaded Playlist_{downloaded_playlist_id}"
        playlist_dir_exists = playlist_dir.exists()
        playlist_yaml_exists = (playlist_dir / "playlist.yaml").exists()

    self.assertEqual(200, response.status_code)
    payload = response.json()
    self.assertEqual([downloaded_playlist_id], payload["exported_playlist_ids"])
    self.assertEqual(1, payload["exported_count"])
    download_scope = payload["download_job"]["playlist_scope"]["playlist_ids"]
    self.assertEqual([download_playlist_id], download_scope)
    sync_scope = payload["sync_download_job"]["playlist_scope"]["playlist_ids"]
    self.assertEqual([sync_download_playlist_id], sync_scope)
    self.assertTrue(playlist_dir_exists)
    self.assertTrue(playlist_yaml_exists)
def test_api_playlist_export_zip_returns_zip_for_downloaded_playlist(self):
    """``export.zip`` for a playlist with a local file returns the expected archive.

    The archive must contain ``playlist.yaml``, ``.playlist_meta.json`` and the
    cover file placed in the playlist directory beforehand.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        root = Path(tmpdir)
        db_path = root / "catalogsync.db"
        env_path = root / "catalogsync.env"
        env_text = f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n"
        env_path.write_text(env_text, encoding="utf-8")
        initialize_database(db_path, default_library_root=root / "library").close()

        playlist_id = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="playlist-export-zip-ready",
            name="Playlist Export Zip Ready",
        )
        song_id = self._seed_song(
            db_path,
            platform="qq",
            remote_id="playlist-export-zip-song-1",
            name="Playlist Export Zip Song 1",
        )
        self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1)
        self._mark_local_downloaded(
            db_path,
            song_id=song_id,
            relative_path="qq/Singer A/playlist-export-zip-song-1.mp3",
        )

        # Place a cover file in the playlist directory before requesting the export.
        covers_dir = root / "playlists" / f"Playlist Export Zip Ready_{playlist_id}" / "covers"
        covers_dir.mkdir(parents=True, exist_ok=True)
        cover_file = covers_dir / "playlist-cover.jpg"
        cover_file.write_bytes(b"fake-cover")

        client = TestClient(create_app(db_path=db_path, env_path=env_path))
        self.addCleanup(client.close)
        response = client.get(f"/api/playlists/{playlist_id}/export.zip")

        self.assertEqual(200, response.status_code)
        self.assertIn("application/zip", response.headers.get("content-type", ""))
        self.assertIn(".zip", response.headers.get("content-disposition", ""))

        with zipfile.ZipFile(io.BytesIO(response.content), "r") as archive:
            members = set(archive.namelist())
        expected_suffixes = (
            "/playlist.yaml",
            "/.playlist_meta.json",
            "/covers/playlist-cover.jpg",
        )
        for suffix in expected_suffixes:
            self.assertTrue(any(name.endswith(suffix) for name in members))
def test_api_playlist_export_zip_returns_409_payload_for_unsynced_playlist(self):
    """``export.zip`` for a playlist with no songs returns 409 and an ``unsynced`` payload."""
    client, db_path, _ = self._build_client()
    seed = {
        "platform": "qq",
        "pool_kind": "manual_file",
        "remote_id": "playlist-export-zip-unsynced",
        "name": "Playlist Export Zip Unsynced",
    }
    playlist_id = self._seed_playlist(db_path, **seed)

    response = client.get(f"/api/playlists/{playlist_id}/export.zip")
    self.assertEqual(409, response.status_code)

    body = response.json()
    self.assertEqual("unsynced", body["state_code"])
    self.assertEqual(playlist_id, body["playlist_id"])
    self.assertIn("message", body)
def test_api_export_zip_returns_download_url_and_bundle_downloads_when_all_ready(self):
    """When both playlists have local files, ``export-zip`` is ``ready`` and the bundle downloads.

    The returned ``download_url`` is fetched and must yield a zip that holds
    a ``playlists/`` entry and a ``playlist.yaml``.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        root = Path(tmpdir)
        db_path = root / "catalogsync.db"
        env_path = root / "catalogsync.env"
        env_text = f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n"
        env_path.write_text(env_text, encoding="utf-8")
        initialize_database(db_path, default_library_root=root / "library").close()

        # Seed order is kept: playlists, songs, links, then local files.
        playlist_a = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="export-zip-all-ready-a",
            name="Export Zip All Ready A",
        )
        playlist_b = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="export-zip-all-ready-b",
            name="Export Zip All Ready B",
        )
        song_a = self._seed_song(
            db_path,
            platform="qq",
            remote_id="export-zip-all-ready-song-a",
            name="Export Zip All Ready Song A",
        )
        song_b = self._seed_song(
            db_path,
            platform="qq",
            remote_id="export-zip-all-ready-song-b",
            name="Export Zip All Ready Song B",
        )
        for playlist_id, song_id in ((playlist_a, song_a), (playlist_b, song_b)):
            self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1)
        local_files = (
            (song_a, "qq/Singer A/export-zip-all-ready-song-a.mp3"),
            (song_b, "qq/Singer A/export-zip-all-ready-song-b.mp3"),
        )
        for song_id, relative_path in local_files:
            self._mark_local_downloaded(db_path, song_id=song_id, relative_path=relative_path)

        client = TestClient(create_app(db_path=db_path, env_path=env_path))
        self.addCleanup(client.close)

        # Both requests are made while the temporary directory still exists.
        prepare_response = client.post(
            "/api/playlists/export-zip",
            json={"playlist_ids": [playlist_a, playlist_b]},
        )
        prepare_payload = prepare_response.json()
        download_url = prepare_payload["download_url"]
        download_response = client.get(download_url)

    self.assertEqual(200, prepare_response.status_code)
    self.assertEqual("ready", prepare_payload["status"])
    self.assertEqual([playlist_a, playlist_b], prepare_payload["playlist_ids"])
    self.assertTrue(str(download_url).startswith("/api/exports/bundles/"))

    self.assertEqual(200, download_response.status_code)
    self.assertIn("application/zip", download_response.headers.get("content-type", ""))
    with zipfile.ZipFile(io.BytesIO(download_response.content), "r") as archive:
        members = set(archive.namelist())
    self.assertTrue(any(name.startswith("playlists/") for name in members))
    self.assertTrue(any(name.endswith("/playlist.yaml") for name in members))
def test_api_export_zip_returns_queued_when_selection_contains_unsynced_playlist(self):
    """``export-zip`` reports ``queued`` when one selected playlist has no songs.

    The playlist with a local file is listed as ready. The one without songs
    is listed as blocked and is scoped into ``sync_download_job``.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        root = Path(tmpdir)
        db_path = root / "catalogsync.db"
        env_path = root / "catalogsync.env"
        env_text = f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n"
        env_path.write_text(env_text, encoding="utf-8")
        initialize_database(db_path, default_library_root=root / "library").close()

        # Playlist whose single song has a recorded local file.
        ready_playlist_id = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="export-zip-queued-ready",
            name="Export Zip Queued Ready",
        )
        ready_song_id = self._seed_song(
            db_path,
            platform="qq",
            remote_id="export-zip-queued-song-ready",
            name="Export Zip Queued Song Ready",
        )
        self._link_playlist_song(
            db_path,
            playlist_id=ready_playlist_id,
            song_id=ready_song_id,
            position=1,
        )
        self._mark_local_downloaded(
            db_path,
            song_id=ready_song_id,
            relative_path="qq/Singer A/export-zip-queued-song-ready.mp3",
        )

        # Playlist with no songs at all.
        unsynced_playlist_id = self._seed_playlist(
            db_path,
            platform="netease",
            pool_kind="playlist_square",
            remote_id="export-zip-queued-unsynced",
            name="Export Zip Queued Unsynced",
        )

        client = TestClient(create_app(db_path=db_path, env_path=env_path))
        self.addCleanup(client.close)
        selection = [ready_playlist_id, unsynced_playlist_id]
        response = client.post("/api/playlists/export-zip", json={"playlist_ids": selection})

        self.assertEqual(200, response.status_code)
        payload = response.json()
        self.assertEqual("queued", payload["status"])
        self.assertEqual([ready_playlist_id], payload["ready_playlist_ids"])
        self.assertEqual([unsynced_playlist_id], payload["blocked_playlist_ids"])
        self.assertIsNotNone(payload["sync_download_job"])
        sync_scope = payload["sync_download_job"]["playlist_scope"]["playlist_ids"]
        self.assertEqual([unsynced_playlist_id], sync_scope)
def test_api_export_zip_returns_404_when_any_playlist_id_is_missing(self):
    """Export-zip answers 404 and names the playlist ids it could not find.

    A seeded playlist (with one linked, locally-downloaded song) is requested
    together with id 999999, which this test never creates.
    """
    unknown_playlist_id = 999999
    client, db_path, _ = self._build_client()

    known_playlist_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="export-zip-missing-known",
        name="Export Zip Missing Known",
    )
    known_song_id = self._seed_song(
        db_path,
        platform="qq",
        remote_id="export-zip-missing-song",
        name="Export Zip Missing Song",
    )
    self._link_playlist_song(
        db_path,
        playlist_id=known_playlist_id,
        song_id=known_song_id,
        position=1,
    )
    self._mark_local_downloaded(
        db_path,
        song_id=known_song_id,
        relative_path="qq/Singer A/export-zip-missing-song.mp3",
    )

    request_body = {"playlist_ids": [known_playlist_id, unknown_playlist_id]}
    response = client.post("/api/playlists/export-zip", json=request_body)

    self.assertEqual(404, response.status_code)
    detail = response.json()["detail"]
    self.assertEqual([unknown_playlist_id], detail["missing_playlist_ids"])
def test_api_export_bundle_download_returns_404_for_missing_bundle(self):
    """Downloading a bundle name that was never produced yields 404."""
    client = self._build_client()[0]
    bundle_url = "/api/exports/bundles/non-existent-bundle.zip"
    result = client.get(bundle_url)
    self.assertEqual(404, result.status_code)
def test_api_dashboard_returns_task_center_rows_with_lane_fields(self):
    """Dashboard API exposes per-job task rows with lane, queue and speed fields.

    Creates a running ``download_only`` job (one succeeded item, one item
    claimed by worker ``download-1`` reporting 2 MiB/s) and a queued
    ``catalog_sync`` job, then checks the ``task_rows`` and ``summary``
    sections of ``/api/dashboard``.
    """
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    mib = 1024 * 1024
    client, db_path, _ = self._build_client()
    ops = OpsRepository(db_path)

    running_job = ops.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    queued_job = ops.create_job(
        job_type="catalog_sync",
        config_snapshot={},
        status=JobStatus.QUEUED,
    )
    download_stage = ops.create_stage(
        job_run_id=running_job,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )
    # An already-finished item on the same stage.
    ops.create_item(
        job_stage_id=download_stage,
        item_type="song_download",
        item_key="song:done",
        song_id=1,
        status=ItemStatus.SUCCEEDED,
    )
    # The item the worker is currently transferring.
    active_item = ops.create_item(
        job_stage_id=download_stage,
        item_type="song_download",
        item_key="song:running",
        song_id=2,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 2, "name": "Song 2"}},
    )
    ops.claim_item(item_id=active_item, worker_name="download-1")
    ops.update_worker_state(
        "download-1",
        status="running",
        current_job_item_id=active_item,
        current_song_id=2,
        current_display_text="Song 2",
        downloaded_bytes=5 * mib,
        total_bytes=10 * mib,
        speed_bytes_per_sec=2 * mib,
        progress_percent=50.0,
    )

    response = client.get("/api/dashboard")
    self.assertEqual(200, response.status_code)
    body = response.json()
    self.assertIn("task_rows", body)

    rows_by_job = {}
    for task_row in body["task_rows"]:
        rows_by_job[int(task_row["id"])] = task_row

    self.assertEqual("download", rows_by_job[running_job]["lane_type"])
    self.assertEqual("queued #1", rows_by_job[queued_job]["queue_label"])
    self.assertEqual("2.0 MB/s", rows_by_job[running_job]["speed_text"])

    summary = body["summary"]
    self.assertIn("queued_download_jobs", summary)
    self.assertEqual(1, summary["queued_download_jobs"])
def test_dashboard_transfer_stats_exposes_download_speed_and_upload_placeholder(self):
    """Transfer stats show the live download speed and ``-`` for upload.

    A single worker reports 2 MiB/s on a claimed item of a running download
    job. Both the JSON endpoint and the rendered ``/dashboard`` page are
    checked for the formatted speed and the upload placeholder.
    """
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    mib = 1024 * 1024
    worker_name = "download-bandwidth-1"
    client, db_path, _ = self._build_client()
    ops = OpsRepository(db_path)

    job = ops.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    stage = ops.create_stage(
        job_run_id=job,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )
    item = ops.create_item(
        job_stage_id=stage,
        item_type="song_download",
        item_key="song:bandwidth",
        song_id=9,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 9, "name": "Bandwidth Song"}},
    )
    ops.claim_item(item_id=item, worker_name=worker_name)
    ops.update_worker_state(
        worker_name,
        status="running",
        current_job_item_id=item,
        current_song_id=9,
        current_display_text="Bandwidth Song",
        downloaded_bytes=4 * mib,
        total_bytes=8 * mib,
        speed_bytes_per_sec=2 * mib,
        progress_percent=50.0,
    )

    # JSON view.
    api_response = client.get("/api/dashboard")
    self.assertEqual(200, api_response.status_code)
    transfer_stats = api_response.json()["transfer_stats"]
    self.assertEqual("2.0 MB/s", transfer_stats["download_speed_text"])
    self.assertEqual("-", transfer_stats["upload_speed_text"])

    # Rendered HTML view.
    page_response = client.get("/dashboard")
    self.assertEqual(200, page_response.status_code)
    rendered = page_response.text
    for expected_text in ("Task Center", "Down 2.0 MB/s", "Up -"):
        self.assertIn(expected_text, rendered)
def test_dashboard_transfer_stats_reset_when_worker_claims_new_item(self):
    """Speed figures drop to ``0 B/s`` once a worker claims its next item.

    Worker ``download-1`` reports 2 MiB/s on a first item, that item is marked
    succeeded, and the same worker then claims a second item without sending
    any new progress. The dashboard must show zero speed both in the overall
    transfer stats and on the worker row, which must point at the second item.
    """
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    mib = 1024 * 1024
    worker_name = "download-1"
    client, db_path, _ = self._build_client()
    ops = OpsRepository(db_path)

    job = ops.create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )
    stage = ops.create_stage(
        job_run_id=job,
        stage_type="download",
        seq_no=1,
        status=StageStatus.RUNNING,
    )

    # First item: claimed, progress reported, then finished.
    first_item = ops.create_item(
        job_stage_id=stage,
        item_type="song_download",
        item_key="song:first-speed",
        song_id=19,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 19, "name": "First Speed Song"}},
    )
    ops.claim_item(item_id=first_item, worker_name=worker_name)
    ops.update_worker_state(
        worker_name,
        status="running",
        current_job_item_id=first_item,
        current_song_id=19,
        current_display_text="First Speed Song",
        downloaded_bytes=4 * mib,
        total_bytes=8 * mib,
        speed_bytes_per_sec=2 * mib,
        progress_percent=50.0,
    )
    first_finished = ops.mark_item_succeeded(first_item)
    self.assertTrue(first_finished)

    # Second item: claimed only, no progress update follows.
    second_item = ops.create_item(
        job_stage_id=stage,
        item_type="song_download",
        item_key="song:second-speed",
        song_id=20,
        status=ItemStatus.PENDING,
        payload={"row": {"id": 20, "name": "Second Speed Song"}},
    )
    ops.claim_item(item_id=second_item, worker_name=worker_name)

    api_response = client.get("/api/dashboard")
    self.assertEqual(200, api_response.status_code)
    body = api_response.json()
    self.assertEqual("0 B/s", body["transfer_stats"]["download_speed_text"])

    matching_workers = (entry for entry in body["workers"] if entry["worker_name"] == worker_name)
    worker_row = next(matching_workers)
    self.assertEqual(second_item, int(worker_row["current_job_item_id"]))
    self.assertEqual("0 B/s", worker_row["speed_text"])
def test_dashboard_transfer_stats_ignore_stale_historical_workers(self):
# Purpose: the dashboard must report only the worker attached to a live job.
# A second worker row is forced to look "running" at 96 MiB/s even though its
# item already succeeded and its job/stage are COMPLETED; the expected total
# speed (2.0 MB/s) and worker list must reflect the live worker alone.
from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
from musicdl.catalogsync.ops.repository import OpsRepository
client, db_path, _ = self._build_client()
repo = OpsRepository(db_path)
# --- Live side: running job/stage with one claimed item at 2 MiB/s. ---
live_job_id = repo.create_job(
job_type="download_only",
config_snapshot={},
status=JobStatus.RUNNING,
)
live_stage_id = repo.create_stage(
job_run_id=live_job_id,
stage_type="download",
seq_no=1,
status=StageStatus.RUNNING,
)
live_item_id = repo.create_item(
job_stage_id=live_stage_id,
item_type="song_download",
item_key="song:live-bandwidth",
song_id=91,
status=ItemStatus.PENDING,
payload={"row": {"id": 91, "name": "Live Speed Song"}},
)
repo.claim_item(item_id=live_item_id, worker_name="download-live-1")
repo.update_worker_state(
"download-live-1",
status="running",
current_job_item_id=live_item_id,
current_song_id=91,
current_display_text="Live Speed Song",
downloaded_bytes=4 * 1024 * 1024,
total_bytes=8 * 1024 * 1024,
speed_bytes_per_sec=2 * 1024 * 1024,
progress_percent=50.0,
)
# --- Stale side: job and stage are created already COMPLETED. ---
stale_job_id = repo.create_job(
job_type="download_only",
config_snapshot={},
status=JobStatus.COMPLETED,
)
stale_stage_id = repo.create_stage(
job_run_id=stale_job_id,
stage_type="download",
seq_no=1,
status=StageStatus.COMPLETED,
)
stale_item_id = repo.create_item(
job_stage_id=stale_stage_id,
item_type="song_download",
item_key="song:stale-bandwidth",
song_id=92,
status=ItemStatus.PENDING,
payload={"row": {"id": 92, "name": "Stale Speed Song"}},
)
repo.claim_item(item_id=stale_item_id, worker_name="download-stale-1")
repo.update_worker_state(
"download-stale-1",
status="running",
current_job_item_id=stale_item_id,
current_song_id=92,
current_display_text="Stale Speed Song",
downloaded_bytes=80 * 1024 * 1024,
total_bytes=80 * 1024 * 1024,
speed_bytes_per_sec=96 * 1024 * 1024,
progress_percent=100.0,
)
self.assertTrue(repo.mark_item_succeeded(stale_item_id))
# Rewrite the worker row directly with the same "running" values after the
# item has succeeded. NOTE(review): presumably mark_item_succeeded clears or
# alters the worker state, which is why raw SQL is needed here - confirm.
with repo._connection() as conn:
conn.execute(
"""
UPDATE job_workers
SET
status = 'running',
current_job_item_id = ?,
current_song_id = ?,
current_display_text = ?,
downloaded_bytes = ?,
total_bytes = ?,
speed_bytes_per_sec = ?,
progress_percent = ?
WHERE worker_name = ?
""",
(
stale_item_id,
92,
"Stale Speed Song",
80 * 1024 * 1024,
80 * 1024 * 1024,
96 * 1024 * 1024,
100.0,
"download-stale-1",
),
)
api_response = client.get("/api/dashboard")
self.assertEqual(200, api_response.status_code)
payload = api_response.json()
# Only the live worker's 2 MiB/s may be counted; the stale 96 MiB/s must not.
self.assertEqual("2.0 MB/s", payload["transfer_stats"]["download_speed_text"])
# The stale worker must not be listed at all.
self.assertEqual(
["download-live-1"],
[worker["worker_name"] for worker in payload["workers"]],
)
def test_api_dashboard_includes_paused_jobs_in_task_center_rows(self):
    """A job created in the PAUSED state appears in ``task_rows`` as ``paused``."""
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    job_id = OpsRepository(db_path).create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.PAUSED,
        playlist_scope={"playlist_ids": [77]},
    )

    response = client.get("/api/dashboard")
    self.assertEqual(200, response.status_code)
    body = response.json()
    self.assertIn("task_rows", body)

    rows_by_job = {}
    for task_row in body["task_rows"]:
        rows_by_job[int(task_row["id"])] = task_row
    self.assertIn(job_id, rows_by_job)
    self.assertEqual("paused", rows_by_job[job_id]["status"])
def test_api_dashboard_includes_completed_jobs_in_task_center_rows(self):
    """A job created in the COMPLETED state appears in ``task_rows`` as ``completed``."""
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    job_id = OpsRepository(db_path).create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.COMPLETED,
        playlist_scope={"playlist_ids": [88]},
    )

    response = client.get("/api/dashboard")
    self.assertEqual(200, response.status_code)
    body = response.json()
    self.assertIn("task_rows", body)

    rows_by_job = {}
    for task_row in body["task_rows"]:
        rows_by_job[int(task_row["id"])] = task_row
    self.assertIn(job_id, rows_by_job)
    self.assertEqual("completed", rows_by_job[job_id]["status"])
def test_api_dashboard_can_omit_task_rows_for_lightweight_refresh(self):
    """``include_task_rows=false`` drops ``task_rows`` but keeps summary and workers."""
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    # A running job exists, so task rows would have content if requested.
    OpsRepository(db_path).create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )

    response = client.get("/api/dashboard?include_task_rows=false")
    self.assertEqual(200, response.status_code)
    body = response.json()
    self.assertNotIn("task_rows", body)
    for required_key in ("summary", "workers"):
        self.assertIn(required_key, body)
def test_api_local_duplicates_scan_returns_summary_and_groups(self):
    """Scanning reports one duplicate group with a keep row and one duplicate.

    The same song is recorded at two locators on the default local backend,
    and both files are written to disk with identical 5-byte contents. The
    scan endpoint must keep the un-suffixed file and flag the ``(1)`` copy.
    """
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.repository import CatalogRepository

    kept_locator = "Singer API/API Duplicate Song.flac"
    extra_locator = "Singer API/API Duplicate Song (1).flac"

    client, db_path, _ = self._build_client()
    catalog = CatalogRepository(db_path)
    song = CatalogSong(
        platform="qq",
        remote_song_id="api-dup-song",
        name="API Duplicate Song",
        singers="Singer API",
        ext="flac",
        file_size_bytes=5,
        quality_label="lossless",
        metadata={},
    )
    song_id = catalog.upsert_song(song)

    library_root = Path(db_path).parent / "library"
    backend_id = catalog.ensure_local_backend(library_root, name="default-local", is_default=True)

    # Register both locations in the catalog first, in keep-then-duplicate order.
    for locator in (kept_locator, extra_locator):
        catalog.record_local_file(
            song_id=song_id,
            backend_id=backend_id,
            relative_path=locator,
            file_size_bytes=5,
            ext="flac",
            quality_label="lossless",
        )

    # Then materialise the two files on disk.
    singer_dir = library_root / "Singer API"
    singer_dir.mkdir(parents=True, exist_ok=True)
    for file_name in ("API Duplicate Song.flac", "API Duplicate Song (1).flac"):
        (singer_dir / file_name).write_bytes(b"12345")

    response = client.get("/api/maintenance/local-duplicates")
    self.assertEqual(200, response.status_code)
    body = response.json()

    summary = body["summary"]
    self.assertEqual(1, summary["duplicate_group_count"])
    self.assertEqual(1, summary["duplicate_location_count"])
    self.assertEqual(1, len(body["groups"]))

    only_group = body["groups"][0]
    self.assertEqual(song_id, only_group["song_id"])
    self.assertEqual(kept_locator, only_group["keep"]["locator"])
    self.assertEqual(extra_locator, only_group["duplicates"][0]["locator"])
def test_api_local_duplicates_dedupe_inactivates_duplicates_and_redirects_references(self):
# Purpose: the dedupe endpoint must (a) mark the duplicate file location
# inactive, (b) delete the duplicate file from disk, and (c) repoint an
# upload task and a job item that referenced the duplicate location to the
# canonical location instead.
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.ops.models import JobStatus
from musicdl.catalogsync.ops.repository import OpsRepository
from musicdl.catalogsync.repository import CatalogRepository
client, db_path, _ = self._build_client()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="api-dup-song-dedupe",
name="API Dedupe Song",
singers="Singer API",
ext="flac",
file_size_bytes=6,
quality_label="lossless",
metadata={},
)
)
library_root = Path(db_path).parent / "library"
backend_id = repo.ensure_local_backend(library_root, name="default-local", is_default=True)
# The return value of the first record_local_file call is used below as the
# file asset id of the upload task.
asset_id = repo.record_local_file(
song_id=song_id,
backend_id=backend_id,
relative_path="Singer API/API Dedupe Song.flac",
file_size_bytes=6,
ext="flac",
quality_label="lossless",
)
repo.record_local_file(
song_id=song_id,
backend_id=backend_id,
relative_path="Singer API/API Dedupe Song (1).flac",
file_size_bytes=6,
ext="flac",
quality_label="lossless",
)
# Write both files with identical 6-byte contents.
(library_root / "Singer API").mkdir(parents=True, exist_ok=True)
(library_root / "Singer API" / "API Dedupe Song.flac").write_bytes(b"123456")
duplicate_path = library_root / "Singer API" / "API Dedupe Song (1).flac"
duplicate_path.write_bytes(b"123456")
# Look up the location row ids for both locators so references can be checked.
canonical_location = repo._fetchone(
"SELECT id FROM file_locations WHERE locator = ?",
("Singer API/API Dedupe Song.flac",),
)
duplicate_location = repo._fetchone(
"SELECT id FROM file_locations WHERE locator = ?",
("Singer API/API Dedupe Song (1).flac",),
)
remote_backend_id = repo.upsert_object_storage_backend(
name="test-bucket",
container_name="music",
endpoint="https://s3.example.invalid",
region=None,
base_prefix="catalogsync",
credential_env_prefix="CATALOGSYNC_TEST",
public_base_url="https://cdn.example.invalid",
)
# Reference #1 to the duplicate location: an upload task's source location.
upload_task_id = repo.enqueue_upload_task(
file_asset_id=asset_id,
source_location_id=int(duplicate_location["id"]),
target_backend_id=remote_backend_id,
target_container_name="music",
target_locator="Singer API/API Dedupe Song.flac",
)
ops_repo = OpsRepository(db_path)
# The job is QUEUED, not RUNNING, so it does not trigger the 409 rejection
# covered by the sibling "rejects_while_jobs_are_running" test.
job_id = ops_repo.create_job(
job_type="upload_only",
config_snapshot={},
status=JobStatus.QUEUED,
)
stage_id = ops_repo.create_stage(job_run_id=job_id, stage_type="upload", seq_no=1)
# Reference #2 to the duplicate location: a job item's file location.
item_id = ops_repo.create_item(
job_stage_id=stage_id,
item_type="song_upload",
item_key="upload:api-dedupe-song",
song_id=song_id,
file_location_id=int(duplicate_location["id"]),
)
response = client.post("/api/maintenance/local-duplicates/dedupe")
self.assertEqual(200, response.status_code)
payload = response.json()
# After dedupe no duplicate groups remain; execution counters are one each.
self.assertEqual(0, payload["summary"]["duplicate_group_count"])
self.assertEqual(1, payload["execution"]["inactive_location_count"])
self.assertEqual(1, payload["execution"]["deleted_file_count"])
self.assertEqual(1, payload["execution"]["repointed_upload_task_count"])
self.assertEqual(1, payload["execution"]["repointed_job_item_count"])
duplicate_row = repo._fetchone(
"SELECT status FROM file_locations WHERE id = ?",
(int(duplicate_location["id"]),),
)
self.assertEqual("inactive", duplicate_row["status"])
# Both references must now point at the canonical location.
upload_row = repo._fetchone(
"SELECT source_location_id FROM upload_tasks WHERE id = ?",
(upload_task_id,),
)
self.assertEqual(int(canonical_location["id"]), int(upload_row["source_location_id"]))
job_item = ops_repo._fetchone(
"SELECT file_location_id FROM job_items WHERE id = ?",
(item_id,),
)
self.assertEqual(int(canonical_location["id"]), int(job_item["file_location_id"]))
# The duplicate file itself must be gone from disk.
self.assertFalse(duplicate_path.exists())
def test_api_local_duplicates_dedupe_rejects_while_jobs_are_running(self):
    """Dedupe is refused with 409 while a job is in the RUNNING state.

    A duplicate pair is set up exactly as for a successful dedupe, but a
    running ``download_only`` job is created before the request. The error
    detail must mention ``running``.
    """
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.repository import CatalogRepository

    client, db_path, _ = self._build_client()
    catalog = CatalogRepository(db_path)
    song = CatalogSong(
        platform="qq",
        remote_song_id="api-dup-song-blocked",
        name="API Blocked Song",
        singers="Singer API",
        ext="flac",
        file_size_bytes=4,
        quality_label="lossless",
        metadata={},
    )
    song_id = catalog.upsert_song(song)

    library_root = Path(db_path).parent / "library"
    backend_id = catalog.ensure_local_backend(library_root, name="default-local", is_default=True)

    file_names = ("API Blocked Song.flac", "API Blocked Song (1).flac")
    # Catalog rows first, then the files on disk.
    for file_name in file_names:
        catalog.record_local_file(
            song_id=song_id,
            backend_id=backend_id,
            relative_path=f"Singer API/{file_name}",
            file_size_bytes=4,
            ext="flac",
            quality_label="lossless",
        )
    singer_dir = library_root / "Singer API"
    singer_dir.mkdir(parents=True, exist_ok=True)
    for file_name in file_names:
        (singer_dir / file_name).write_bytes(b"1234")

    # The running job is what should block the maintenance action.
    OpsRepository(db_path).create_job(
        job_type="download_only",
        config_snapshot={},
        status=JobStatus.RUNNING,
    )

    response = client.post("/api/maintenance/local-duplicates/dedupe")
    self.assertEqual(409, response.status_code)
    detail = response.json()["detail"]
    self.assertIn("running", detail)
def test_api_playlists_supports_pagination_and_filter_validation(self):
    """Playlist listing paginates, tolerates empty filters, and rejects bad ones.

    Three playlists are seeded. The test checks the paged payload shape, that
    an empty ``status`` or ``wanted_only`` query value returns the same result
    as no filter (API and HTML page), and that invalid ``status``,
    ``wanted_only`` and ``page_size`` values are answered with 422.
    """
    client, db_path, _ = self._build_client()

    seeds = (
        {
            "platform": "qq",
            "pool_kind": "manual_file",
            "remote_id": "page-1",
            "name": "Page 1",
            "play_count": 123456,
        },
        {
            "platform": "qq",
            "pool_kind": "playlist_square",
            "remote_id": "page-2",
            "name": "Page 2",
        },
        {
            "platform": "netease",
            "pool_kind": "manual_file",
            "remote_id": "page-3",
            "name": "Page 3",
        },
    )
    for seed in seeds:
        self._seed_playlist(db_path, **seed)

    # Unfiltered baseline page.
    baseline_response = client.get("/api/playlists?page=1&page_size=20")
    self.assertEqual(200, baseline_response.status_code)
    baseline = baseline_response.json()
    for required_key in ("items", "total_count", "page_size"):
        self.assertIn(required_key, baseline)
    self.assertEqual(20, baseline["page_size"])
    self.assertEqual(3, baseline["total_count"])
    self.assertEqual(3, len(baseline["items"]))
    items_by_name = {}
    for item in baseline["items"]:
        items_by_name[item["name"]] = item
    self.assertEqual(123456, items_by_name["Page 1"]["play_count"])

    # An empty filter value must behave like an absent filter on the API.
    for filter_name in ("status", "wanted_only"):
        filtered_response = client.get(f"/api/playlists?{filter_name}=&page=1&page_size=20")
        self.assertEqual(200, filtered_response.status_code)
        filtered = filtered_response.json()
        self.assertEqual(baseline["total_count"], filtered["total_count"])
        self.assertEqual(baseline["page_size"], filtered["page_size"])
        self.assertEqual(
            [item["id"] for item in baseline["items"]],
            [item["id"] for item in filtered["items"]],
        )

    # The HTML page must accept the same empty filter values.
    for filter_name in ("status", "wanted_only"):
        page_response = client.get(f"/playlists?{filter_name}=")
        self.assertEqual(200, page_response.status_code)
        self.assertIn("Page 1", page_response.text)

    # Invalid values are rejected by request validation.
    for bad_query in ("status=bad-status", "wanted_only=maybe", "page_size=30"):
        rejected = client.get(f"/api/playlists?{bad_query}")
        self.assertEqual(422, rejected.status_code)
def test_api_playlists_supports_sorting_and_exposes_collected_song_count(self):
    """Listing honours ``sort_by``/``sort_dir`` and reports collected song counts.

    Checks ascending name order, descending play-count order (the playlist
    seeded without a play count comes last), and that a playlist seeded with
    ``collected_song_count=42`` exposes that number as an estimated display
    count.
    """
    client, db_path, _ = self._build_client()

    seeds = (
        {
            "platform": "qq",
            "pool_kind": "playlist_square",
            "remote_id": "sort-api-001",
            "name": "Zulu API Playlist",
            "play_count": 100,
        },
        {
            "platform": "netease",
            "pool_kind": "playlist_square",
            "remote_id": "sort-api-002",
            "name": "Alpha API Playlist",
            "play_count": 300,
        },
        {
            "platform": "kuwo",
            "pool_kind": "playlist_square",
            "remote_id": "sort-api-003",
            "name": "Collected API Playlist",
            "collected_song_count": 42,
        },
    )
    for seed in seeds:
        self._seed_playlist(db_path, **seed)

    # Name ascending.
    by_name_response = client.get("/api/playlists?page=1&page_size=20&sort_by=name&sort_dir=asc")
    self.assertEqual(200, by_name_response.status_code)
    by_name_items = by_name_response.json()["items"]
    names_in_order = [item["name"] for item in by_name_items]
    self.assertEqual(
        ["Alpha API Playlist", "Collected API Playlist", "Zulu API Playlist"],
        names_in_order,
    )

    collected_candidates = (item for item in by_name_items if item["name"] == "Collected API Playlist")
    collected_item = next(collected_candidates)
    self.assertEqual(42, collected_item["collected_song_count"])
    self.assertEqual(42, collected_item["display_song_count"])
    self.assertTrue(collected_item["is_song_count_estimated"])

    # Play count descending.
    by_plays_response = client.get("/api/playlists?page=1&page_size=20&sort_by=play_count&sort_dir=desc")
    self.assertEqual(200, by_plays_response.status_code)
    by_plays_items = by_plays_response.json()["items"]
    self.assertEqual(
        ["Alpha API Playlist", "Zulu API Playlist", "Collected API Playlist"],
        [item["name"] for item in by_plays_items],
    )
def test_playlists_page_renders_management_controls_and_filters(self):
    """The playlists page renders its filter form, bulk actions and progress.

    One playlist with two linked songs is seeded and only the first song is
    marked as downloaded, so the page is expected to show ``1 / 2`` and
    ``50%``. The legacy ``sync-download`` action and the text ``Idle`` must
    not appear.
    """
    client, db_path, _ = self._build_client()
    playlist_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="page-render-1",
        name="Playlist Render Seed",
        play_count=7654321,
    )
    first_song = self._seed_song(
        db_path,
        platform="qq",
        remote_id="page-render-song-1",
        name="Playlist Render Song 1",
    )
    second_song = self._seed_song(
        db_path,
        platform="qq",
        remote_id="page-render-song-2",
        name="Playlist Render Song 2",
    )
    for position, linked_song in enumerate((first_song, second_song), start=1):
        self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=linked_song, position=position)
    # Only the first song is downloaded.
    self._mark_local_downloaded(
        db_path,
        song_id=first_song,
        relative_path="qq/Singer A/page-render-song-1.mp3",
    )

    response = client.get("/playlists")
    self.assertEqual(200, response.status_code)
    html = response.text

    markers_before_legacy_check = (
        'name="status"',
        'name="page_size"',
        'name="wanted_only"',
        "data-playlist-select-all",
        "data-playlist-clear-selection",
        'data-playlist-action="download"',
    )
    for marker in markers_before_legacy_check:
        self.assertIn(marker, html)

    self.assertNotIn('data-playlist-action="sync-download"', html)

    markers_after_legacy_check = (
        'data-playlist-action="export-selected"',
        "Export Selected",
        'data-playlist-action="mark-wanted"',
        'data-playlist-action="unmark-wanted"',
        'value="collect_only"',
        "Collect Playlist Sources",
        "data-playlist-pagination",
        "data-playlists-page",
        "data-playlist-checkbox",
        "sort_by=platform",
        "sort_by=play_count",
        "Progress | ",
        "Status | ",
        "Downloaded | ",
        "1 / 2",
        "50%",
        "7654321",
        "Playlist Render Seed",
    )
    for marker in markers_after_legacy_check:
        self.assertIn(marker, html)

    self.assertNotIn("Idle", html)
def test_playlists_page_renders_sortable_headers_and_collected_song_count_hint(self):
    """Sorted playlists page renders sort links, an indicator and the collected hint.

    With ``sort_by=name&sort_dir=asc`` the page must contain sort links for
    id/platform/name/play_count, a ``^`` indicator on the name column, and
    the text ``Collected 42`` for the seeded playlist.
    """
    client, db_path, _ = self._build_client()
    self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="playlist_square",
        remote_id="playlist-render-collected",
        name="Collected Render Playlist",
        collected_song_count=42,
    )

    response = client.get("/playlists?sort_by=name&sort_dir=asc")
    self.assertEqual(200, response.status_code)
    html = response.text

    sortable_columns = ("id", "platform", "name", "play_count")
    for column in sortable_columns:
        self.assertIn(f"sort_by={column}", html)
    for column in sortable_columns:
        self.assertIn(f'data-playlist-sort-link="{column}"', html)
    self.assertIn('data-playlist-sort-indicator="name">^', html)
    self.assertIn("Collected 42", html)
def test_playlists_page_applies_name_sort_order_to_rendered_rows(self):
    """Rows on the playlists page appear in ascending name order when requested.

    Playlists are seeded as Zulu, Alpha, Middle; the first occurrence of each
    name in the HTML must come in Alpha, Middle, Zulu order.
    """
    client, db_path, _ = self._build_client()

    seeds = (
        ("qq", "playlist-sort-page-1", "Zulu Page Playlist", 100),
        ("netease", "playlist-sort-page-2", "Alpha Page Playlist", 300),
        ("kuwo", "playlist-sort-page-3", "Middle Page Playlist", 200),
    )
    for platform, remote_id, name, play_count in seeds:
        self._seed_playlist(
            db_path,
            platform=platform,
            pool_kind="playlist_square",
            remote_id=remote_id,
            name=name,
            play_count=play_count,
        )

    response = client.get("/playlists?sort_by=name&sort_dir=asc")
    self.assertEqual(200, response.status_code)
    html = response.text

    expected_order = ("Alpha Page Playlist", "Middle Page Playlist", "Zulu Page Playlist")
    # str.index raises ValueError when a name is missing, failing the test.
    offsets = [html.index(name) for name in expected_order]
    self.assertLess(offsets[0], offsets[1])
    self.assertLess(offsets[1], offsets[2])
def test_playlists_page_renders_sync_selected_playlists_action(self):
    """The playlists page offers a ``sync`` bulk action and a select-all control."""
    client, db_path, _ = self._build_client()
    self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="playlist-render-sync",
        name="Playlist Render Sync",
    )

    response = client.get("/playlists")
    self.assertEqual(200, response.status_code)
    html = response.text

    expected_markers = (
        "data-playlists-page",
        'data-playlist-action="sync"',
        "data-playlist-select-all",
    )
    for marker in expected_markers:
        self.assertIn(marker, html)
def test_playlists_page_renders_clickable_synced_playlist_name_and_song_modal_shell(self):
    """Only playlists with linked songs get an open-songs trigger on the page.

    One playlist has a linked song and one has none. The page must render the
    ``data-playlist-open-songs`` attribute for the former only, plus the song
    modal shell and the export control.
    """
    client, db_path, _ = self._build_client()
    with_songs_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="playlist-preview-ready",
        name="Playlist Preview Ready",
    )
    without_songs_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="playlist-preview-empty",
        name="Playlist Preview Empty",
    )
    song_id = self._seed_song(
        db_path,
        platform="qq",
        remote_id="playlist-preview-song-1",
        name="Playlist Preview Song 1",
    )
    self._link_playlist_song(db_path, playlist_id=with_songs_id, song_id=song_id, position=1)

    response = client.get("/playlists")
    self.assertEqual(200, response.status_code)
    html = response.text

    clickable_marker = f'data-playlist-open-songs="{with_songs_id}"'
    absent_marker = f'data-playlist-open-songs="{without_songs_id}"'
    self.assertIn(clickable_marker, html)
    self.assertNotIn(absent_marker, html)
    for shell_marker in ("data-playlist-songs-modal", "data-playlist-export"):
        self.assertIn(shell_marker, html)
def test_api_playlist_songs_exposes_export_ready_song_metadata_and_locations(self):
    """Playlist songs API returns playlist info, song metadata and upload locations.

    A playlist with one linked song is seeded and the song is marked as
    uploaded to remote storage. The endpoint must return the playlist header,
    one song row with its metadata and local path, and one uploaded location
    whose ``url`` equals the public URL passed to the helper.
    """
    client, db_path, _ = self._build_client()
    playlist_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="playlist-export-ready",
        name="Playlist Export Ready",
        play_count=987654,
    )
    song_id = self._seed_song(
        db_path,
        platform="qq",
        remote_id="playlist-export-song-1",
        name="Playlist Export Song 1",
        singers="Singer Export",
    )
    self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1)
    self._mark_remote_uploaded(
        db_path,
        song_id=song_id,
        relative_path="qq/Singer Export/playlist-export-song-1.mp3",
        public_url="https://cdn.example.invalid/qq/Singer%20Export/playlist-export-song-1.mp3",
        download_url="https://download.example.invalid/qq/Singer%20Export/playlist-export-song-1.mp3",
    )
    response = client.get(f"/api/playlists/{playlist_id}/songs")
    self.assertEqual(200, response.status_code)
    payload = response.json()
    self.assertIn("playlist", payload)
    self.assertIn("items", payload)
    self.assertEqual(playlist_id, payload["playlist"]["id"])
    self.assertEqual("Playlist Export Ready", payload["playlist"]["name"])
    self.assertEqual("qq", payload["playlist"]["platform"])
    self.assertEqual(987654, payload["playlist"]["play_count"])
    self.assertEqual(1, len(payload["items"]))
    row = payload["items"][0]
    self.assertEqual(song_id, row["song_id"])
    self.assertEqual("Playlist Export Song 1", row["name"])
    self.assertEqual("Singer Export", row["singers"])
    self.assertEqual("mp3", row["ext"])
    self.assertEqual(128, row["file_size_bytes"])
    # Compare with "/" separators so the check holds whether the server
    # reports the local path with Windows backslashes or POSIX slashes.
    local_file_path = str(row["local_file_path"]).replace("\\", "/")
    self.assertTrue(local_file_path.endswith("qq/Singer Export/playlist-export-song-1.mp3"))
    self.assertEqual(1, len(row["uploaded_locations"]))
    uploaded_row = row["uploaded_locations"][0]
    self.assertEqual("catalog-cloud", uploaded_row["backend_name"])
    self.assertEqual("object_storage", uploaded_row["backend_type"])
    self.assertEqual(
        "https://cdn.example.invalid/qq/Singer%20Export/playlist-export-song-1.mp3",
        uploaded_row["url"],
    )
def test_api_playlist_export_folder_returns_existing_playlist_directory(self):
    """Export-folder API reports an already existing playlist directory and its files.

    A directory ``playlists/Playlist Export Folder_<id>`` is created under the
    configured root with a ``playlist.yaml``, a ``.playlist_meta.json`` and a
    cover image. The endpoint must report the folder as existing and list the
    yaml file and the cover by relative path.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        base_dir = Path(workdir)
        db_path = base_dir / "catalogsync.db"
        env_path = base_dir / "catalogsync.env"
        env_text = f"ROOT_DIR={base_dir.as_posix()}\nDOWNLOAD_SOURCES=qq\n"
        env_path.write_text(env_text, encoding="utf-8")
        connection = initialize_database(db_path, default_library_root=base_dir / "library")
        connection.close()

        playlist_id = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="playlist-export-folder",
            name="Playlist Export Folder",
            play_count=456789,
        )

        # Pre-create the export folder and its contents on disk.
        folder_name = f"Playlist Export Folder_{playlist_id}"
        playlist_dir = base_dir / "playlists" / folder_name
        covers_dir = playlist_dir / "covers"
        covers_dir.mkdir(parents=True, exist_ok=True)

        yaml_text = (
            f"playlist_id: {playlist_id}\n"
            "playlist_name: Playlist Export Folder\n"
            "platform: qq\n"
            "play_count: 456789\n"
        )
        (playlist_dir / "playlist.yaml").write_text(yaml_text, encoding="utf-8")

        meta = {
            "playlist_id": playlist_id,
            "platform": "qq",
            "remote_playlist_id": "playlist-export-folder",
            "name": "Playlist Export Folder",
        }
        meta_text = json.dumps(meta, ensure_ascii=False)
        (playlist_dir / ".playlist_meta.json").write_text(meta_text, encoding="utf-8")
        (covers_dir / "playlist-cover.jpg").write_bytes(b"fake-cover")

        client = TestClient(create_app(db_path=db_path, env_path=env_path))
        self.addCleanup(client.close)

        response = client.get(f"/api/playlists/{playlist_id}/export-folder")
        self.assertEqual(200, response.status_code)
        body = response.json()
        self.assertEqual(playlist_id, body["playlist"]["id"])

        folder = body["folder"]
        self.assertTrue(folder["exists"])
        self.assertTrue(str(folder["path"]).endswith(folder_name))

        listed_paths = set()
        for file_row in folder["files"]:
            listed_paths.add(str(file_row.get("relative_path") or ""))
        self.assertIn("playlist.yaml", listed_paths)
        self.assertIn("covers/playlist-cover.jpg", listed_paths)
def test_api_playlist_export_folder_refreshes_yaml_from_latest_db_state(self):
    """Querying the export-folder endpoint rewrites a stale ``playlist.yaml``.

    The song linked to the playlist is marked as locally downloaded while the
    YAML on disk still carries ``local_file_path: null``. After the request the
    YAML must mention the downloaded file name and must no longer contain the
    null entry.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        workspace = Path(tmpdir)
        db_path = workspace / "catalogsync.db"
        env_path = workspace / "catalogsync.env"
        env_path.write_text(
            f"ROOT_DIR={workspace.as_posix()}\nDOWNLOAD_SOURCES=qq\n",
            encoding="utf-8",
        )
        initialize_database(db_path, default_library_root=workspace / "library").close()

        playlist_id = self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id="playlist-export-refresh",
            name="Playlist Export Refresh",
            play_count=456789,
        )
        song_id = self._seed_song(
            db_path,
            platform="qq",
            remote_id="playlist-export-refresh-song",
            name="Playlist Export Refresh Song",
            singers="Singer Refresh",
        )
        self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1)
        self._mark_local_downloaded(
            db_path,
            song_id=song_id,
            relative_path="qq/Singer Refresh/playlist-export-refresh-song.mp3",
        )

        playlist_dir = workspace / "playlists" / f"Playlist Export Refresh_{playlist_id}"
        (playlist_dir / "covers").mkdir(parents=True, exist_ok=True)

        # Outdated YAML: it still claims the song has no local file.
        stale_lines = [
            f"playlist_id: {playlist_id}",
            "playlist_name: Playlist Export Refresh",
            "platform: qq",
            "songs:",
            " - local_song_id: 1",
            " local_file_path: null",
            "",
        ]
        stale_yaml_path = playlist_dir / "playlist.yaml"
        stale_yaml_path.write_text("\n".join(stale_lines), encoding="utf-8")

        meta = {
            "playlist_id": playlist_id,
            "platform": "qq",
            "remote_playlist_id": "playlist-export-refresh",
            "name": "Playlist Export Refresh",
        }
        (playlist_dir / ".playlist_meta.json").write_text(
            json.dumps(meta, ensure_ascii=False),
            encoding="utf-8",
        )

        client = TestClient(create_app(db_path=db_path, env_path=env_path))
        self.addCleanup(client.close)

        response = client.get(f"/api/playlists/{playlist_id}/export-folder")
        # Read the file right after the request, before any assertion can abort the test.
        refreshed_yaml = stale_yaml_path.read_text(encoding="utf-8")

        self.assertEqual(200, response.status_code)
        self.assertIn("playlist-export-refresh-song.mp3", refreshed_yaml)
        self.assertNotIn("local_file_path: null", refreshed_yaml)
def test_job_detail_page_and_api_include_playlist_progress(self):
    """Job detail (JSON API and HTML page) reports per-playlist download progress.

    Two songs are linked to one playlist; one is marked as downloaded and the
    other has a pending download item in the job. Both views are expected to
    show one of two songs done, i.e. 50%.
    """
    from musicdl.catalogsync.ops.models import ItemStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    playlist_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="job-detail-progress-1",
        name="Job Detail Playlist",
    )

    # Seed both songs first, then link them in playlist order.
    song_a, song_b = (
        self._seed_song(
            db_path,
            platform="qq",
            remote_id=f"job-detail-song-{number}",
            name=f"Job Detail Song {number}",
        )
        for number in (1, 2)
    )
    for position, linked_song in enumerate((song_a, song_b), start=1):
        self._link_playlist_song(
            db_path,
            playlist_id=playlist_id,
            song_id=linked_song,
            position=position,
        )
    self._mark_local_downloaded(
        db_path,
        song_id=song_a,
        relative_path="qq/Singer A/job-detail-song-1.mp3",
    )

    repo = OpsRepository(db_path)
    job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        playlist_scope={"playlist_ids": [playlist_id]},
    )
    stage_id = repo.create_stage(job_run_id=job_id, stage_type="download", seq_no=1)
    repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key=f"song:{song_b}",
        song_id=song_b,
        status=ItemStatus.PENDING,
    )

    # JSON API view.
    api_response = client.get(f"/api/jobs/{job_id}")
    self.assertEqual(200, api_response.status_code)
    api_payload = api_response.json()
    self.assertIn("playlist_progress", api_payload)
    self.assertEqual(1, len(api_payload["playlist_progress"]))
    progress = api_payload["playlist_progress"][0]
    expected_progress = {
        "playlist_id": playlist_id,
        "playlist_name": "Job Detail Playlist",
        "total_songs": 2,
        "downloaded_songs": 1,
        "pending_songs": 1,
        "progress_percent": 50,
    }
    for field, expected in expected_progress.items():
        self.assertEqual(expected, progress[field])

    # Rendered HTML view.
    page_response = client.get(f"/jobs/{job_id}")
    self.assertEqual(200, page_response.status_code)
    html = page_response.text
    for fragment in ("Playlist Progress", "Job Detail Playlist", "1 / 2", "50%"):
        self.assertIn(fragment, html)
def test_api_job_playlist_songs_exposes_non_music_resource_note(self):
    """The job playlist-songs API flags a skipped non-music song and explains why.

    The seeded song carries ``qq_toplist_fallback`` in its snapshot metadata and
    its download item is stored as skipped; the response row must report the
    skipped status, set ``is_non_music_resource`` and include the non-music
    note text in ``status_note``.
    """
    from musicdl.catalogsync.ops.models import ItemStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    client, db_path, _ = self._build_client()
    playlist_id = self._seed_playlist(
        db_path,
        platform="qq",
        pool_kind="manual_file",
        remote_id="job-detail-non-music-1",
        name="Job Detail Non Music Playlist",
    )

    song_metadata = {
        "snapshot": {"raw_data": {"search": {"qq_toplist_fallback": True}}},
    }
    song_id = self._seed_song(
        db_path,
        platform="qq",
        remote_id="qqtop_75_non_music_seed",
        name="Audio Program",
        singers="Narrator",
        metadata=song_metadata,
    )
    self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1)

    repo = OpsRepository(db_path)
    job_id = repo.create_job(
        job_type="download_only",
        config_snapshot={},
        playlist_scope={"playlist_ids": [playlist_id]},
    )
    stage_id = repo.create_stage(job_run_id=job_id, stage_type="download", seq_no=1)
    repo.create_item(
        job_stage_id=stage_id,
        item_type="song_download",
        item_key=f"song:{song_id}",
        song_id=song_id,
        status=ItemStatus.SKIPPED,
        payload={"row": {"id": song_id}},
    )

    response = client.get(f"/api/jobs/{job_id}/playlists/{playlist_id}/songs")
    self.assertEqual(200, response.status_code)

    payload = response.json()
    self.assertIn("items", payload)
    items = payload["items"]
    self.assertEqual(1, len(items))

    song_row = items[0]
    self.assertEqual(song_id, song_row["song_id"])
    self.assertEqual("skipped", song_row["status"])
    self.assertTrue(song_row["is_non_music_resource"])
    self.assertIn("非音乐资源", song_row["status_note"])
def test_playlists_pagination_links_encode_keyword_query(self):
    """Pagination links on the playlists page URL-encode the keyword filter.

    41 playlists matching a keyword with reserved characters are seeded and
    page 2 (20 per page) is requested, so the page is expected to show both
    "Previous" and "Next" links. Either ``+`` or ``%20`` is accepted as the
    space encoding.
    """
    client, db_path, _ = self._build_client()
    keyword = "A & B ="
    for index in range(1, 42):
        self._seed_playlist(
            db_path,
            platform="qq",
            pool_kind="manual_file",
            remote_id=f"page-encoded-{index}",
            name=f"{keyword} Playlist {index}",
        )

    query = {"page": 2, "page_size": 20, "keyword": keyword}
    response = client.get("/playlists", params=query)
    self.assertEqual(200, response.status_code)

    html = response.text
    for link_label in ("Previous", "Next"):
        self.assertIn(link_label, html)

    accepted_encodings = ("keyword=A+%26+B+%3D", "keyword=A%20%26%20B%20%3D")
    self.assertTrue(any(encoded in html for encoded in accepted_encodings))
def test_api_playlist_bulk_endpoints_reject_empty_playlist_ids(self):
    """Every bulk playlist endpoint answers 422 when ``playlist_ids`` is empty.

    Each endpoint runs inside its own ``subTest`` so that a failure on one path
    is reported on its own instead of aborting the loop and hiding the results
    for the remaining paths.
    """
    client, _, _ = self._build_client()
    bulk_paths = (
        "/api/playlists/mark-wanted",
        "/api/playlists/unmark-wanted",
        "/api/playlists/sync",
        "/api/playlists/download",
        "/api/playlists/sync-download",
    )
    for path in bulk_paths:
        with self.subTest(path=path):
            response = client.post(path, json={"playlist_ids": []})
            self.assertEqual(422, response.status_code, msg=path)
def test_embedded_runner_recovers_from_transient_loop_failure(self):
    """The embedded runner keeps looping after one ``loop_once`` call raises.

    ``OpsRunner.loop_once`` is replaced by a function that raises on its first
    call and, on later calls, claims the next runnable job and marks it
    completed. The test then polls the job for up to 1.5 seconds and expects
    it to end up completed after at least two loop attempts.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.web import create_app

    tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
    self.addCleanup(tmpdir.cleanup)
    root = Path(tmpdir.name)
    db_path = root / "catalogsync.db"
    env_path = root / "catalogsync.env"
    env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8")
    initialize_database(db_path).close()

    repo = OpsRepository(db_path)
    job_id = repo.create_job(job_type="collect_only", config_snapshot={})
    # Mutable counter shared with the replacement loop function below.
    attempts = {"count": 0}

    # The parameter is named ``runner`` (not ``self``) so it does not shadow
    # the test case instance inside this closure.
    def flaky_loop_once(runner):
        attempts["count"] += 1
        if attempts["count"] == 1:
            raise RuntimeError("transient embedded runner failure")
        job = runner.repository.claim_and_mark_next_runnable_job()
        if job is None:
            return False
        runner.repository.mark_job_finished(job.id, status=JobStatus.COMPLETED)
        return True

    app = create_app(
        db_path=db_path,
        env_path=env_path,
        start_runner=True,
        runner_sleep_seconds=0.01,
    )
    with patch(
        "musicdl.catalogsync.ops.runner.OpsRunner.loop_once",
        new=flaky_loop_once,
    ), TestClient(app):
        # A monotonic clock keeps the timeout immune to wall-clock adjustments.
        deadline = time.monotonic() + 1.5
        while time.monotonic() < deadline:
            if repo.get_job(job_id).status == JobStatus.COMPLETED:
                break
            time.sleep(0.05)

    job = repo.get_job(job_id)
    self.assertGreaterEqual(attempts["count"], 2)
    self.assertEqual(JobStatus.COMPLETED, job.status)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()