import json import tempfile import io import time import unittest import warnings import zipfile from pathlib import Path from unittest.mock import patch from fastapi.testclient import TestClient class OperationsApiTests(unittest.TestCase): def _build_client(self) -> tuple[TestClient, Path, Path]: from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True) self.addCleanup(tmpdir.cleanup) root = Path(tmpdir.name) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8") initialize_database(db_path).close() app = create_app(db_path=db_path, env_path=env_path) client = TestClient(app) self.addCleanup(client.close) return client, db_path, env_path def test_create_app_start_runner_emits_no_deprecation_warning(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8") initialize_database(db_path).close() with warnings.catch_warnings(record=True) as caught: warnings.simplefilter("always", DeprecationWarning) create_app( db_path=db_path, env_path=env_path, start_runner=True, runner_sleep_seconds=0.01, ) deprecations = [warning for warning in caught if issubclass(warning.category, DeprecationWarning)] self.assertEqual([], deprecations) def test_create_app_initializes_resolver_stats_side_db(self): from musicdl.catalogsync.ops.web import create_app from musicdl.catalogsync.resolver_stats import default_resolver_stats_db_path with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8") create_app(db_path=db_path, env_path=env_path) resolver_stats_db_path = default_resolver_stats_db_path(db_path) self.assertTrue(resolver_stats_db_path.exists()) def test_events_stream_interval_is_tuned_for_more_realtime_snapshots(self): from musicdl.catalogsync.ops import web self.assertLessEqual(web.LIVE_DASHBOARD_SNAPSHOT_INTERVAL_SECONDS, 0.5) def _seed_playlist( self, db_path: Path, *, platform: str, pool_kind: str, remote_id: str, name: str, play_count: int | None = None, collected_song_count: int | None = None, ) -> int: from musicdl.catalogsync.models import PlaylistCandidate from musicdl.catalogsync.repository import CatalogRepository repo = CatalogRepository(db_path) playlist_id = repo.upsert_playlist( PlaylistCandidate( platform=platform, pool_kind=pool_kind, remote_id=remote_id, name=name, url=f"https://example.invalid/{platform}/{remote_id}", play_count=play_count, collected_song_count=collected_song_count, ) ) pool_id = repo.upsert_playlist_pool( platform=platform, pool_kind=pool_kind, external_id=f"{pool_kind}:{remote_id}", name=f"{pool_kind}-{platform}", url=f"https://example.invalid/pool/{pool_kind}/{remote_id}", ) repo.link_pool_playlist(pool_id, playlist_id) return playlist_id def _seed_song( self, db_path: Path, *, platform: str, remote_id: str, name: str, singers: str = "Singer A", metadata: dict | None = None, ) -> int: from musicdl.catalogsync.models import CatalogSong from musicdl.catalogsync.repository import CatalogRepository repo = CatalogRepository(db_path) return repo.upsert_song( CatalogSong( platform=platform, remote_song_id=remote_id, name=name, singers=singers, ext="mp3", file_size_bytes=128, quality_label="standard", metadata=metadata or {}, ) ) def _link_playlist_song(self, db_path: Path, *, playlist_id: int, song_id: int, position: int) -> None: from musicdl.catalogsync.repository import CatalogRepository repo = CatalogRepository(db_path) repo.link_playlist_song(playlist_id, song_id, position) def _mark_local_downloaded(self, db_path: Path, *, song_id: int, relative_path: str) -> None: from musicdl.catalogsync.repository import CatalogRepository repo = CatalogRepository(db_path) backend_id = repo.ensure_local_backend( Path(db_path).parent / "library", name="default-local", is_default=True, ) repo.record_local_file( song_id=song_id, backend_id=backend_id, relative_path=relative_path, file_size_bytes=128, ext="mp3", quality_label="standard", ) def _mark_remote_uploaded( self, db_path: Path, *, song_id: int, relative_path: str, public_url: str, download_url: str | None = None, ) -> None: from musicdl.catalogsync.repository import CatalogRepository repo = CatalogRepository(db_path) local_backend_id = repo.ensure_local_backend( Path(db_path).parent / "library", name="default-local", is_default=True, ) asset_id = repo.record_local_file( song_id=song_id, backend_id=local_backend_id, relative_path=relative_path, file_size_bytes=128, ext="mp3", quality_label="standard", ) remote_backend_id = repo.upsert_object_storage_backend( name="catalog-cloud", container_name="music-bucket", endpoint="https://s3.example.invalid", region=None, base_prefix="catalogsync", credential_env_prefix="CATALOGSYNC_TEST", public_base_url="https://cdn.example.invalid", ) repo.record_remote_file( file_asset_id=asset_id, backend_id=remote_backend_id, container_name="music-bucket", locator=relative_path, public_url=public_url, download_url=download_url, ) def test_pages_and_jobs_endpoint_return_200(self): client, _, _ = self._build_client() dashboard_response = client.get("/dashboard") self.assertEqual(200, dashboard_response.status_code) self.assertIn("data-sse-url", dashboard_response.text) jobs_page_response = client.get("/jobs") self.assertEqual(200, jobs_page_response.status_code) self.assertNotIn("data-sse-url", jobs_page_response.text) for path in ( "/playlists", "/songs", "/logs", "/config", ): response = client.get(path) self.assertEqual(200, response.status_code) jobs = client.get("/api/jobs") self.assertEqual(200, jobs.status_code) payload = jobs.json() self.assertIn("items", payload) self.assertIsInstance(payload["items"], list) invalid_limit = client.get("/api/jobs?limit=0") self.assertEqual(422, invalid_limit.status_code) def test_create_job_get_job_and_create_command(self): client, _, _ = self._build_client() create_response = client.post( "/api/jobs", json={ "job_type": "catalog_sync", "requested_by": "unittest", "sources": ["qq", "netease"], }, ) self.assertEqual(201, create_response.status_code) created = create_response.json() self.assertIn("job", created) job_id = int(created["job"]["id"]) get_response = client.get(f"/api/jobs/{job_id}") self.assertEqual(200, get_response.status_code) fetched = get_response.json() self.assertEqual(job_id, int(fetched["job"]["id"])) self.assertEqual("catalog_sync", fetched["job"]["job_type"]) self.assertEqual(["qq", "netease"], fetched["job"]["sources"]) command_response = client.post( f"/api/jobs/{job_id}/commands", json={"command_type": "pause", "payload": {"reason": "unit-test"}}, ) self.assertEqual(201, command_response.status_code) command_payload = command_response.json() self.assertEqual(job_id, int(command_payload["job_id"])) self.assertIn("command_id", command_payload) invalid_command_response = client.post( f"/api/jobs/{job_id}/commands", json={"command_type": "invalid-command"}, ) self.assertEqual(422, invalid_command_response.status_code) retry_without_target_response = client.post( f"/api/jobs/{job_id}/commands", json={"command_type": "retry_item"}, ) self.assertEqual(422, retry_without_target_response.status_code) dedupe_response = client.post( "/api/jobs", json={ "job_type": "collect_only", "requested_by": "unittest", "sources": ["qq", "qq", "netease"], "download_sources": ["qq", "qq", "netease"], }, ) self.assertEqual(201, dedupe_response.status_code) deduped_job = dedupe_response.json()["job"] self.assertEqual(["qq", "netease"], deduped_job["sources"]) self.assertEqual(["qq", "netease"], deduped_job["download_sources"]) def test_job_detail_page_exposes_job_command_entry(self): from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) job_id = repo.create_job(job_type="catalog_sync", config_snapshot={}) response = client.get(f"/jobs/{job_id}") self.assertEqual(200, response.status_code) self.assertIn(f"/api/jobs/{job_id}/commands", response.text) self.assertIn('name="command_type"', response.text) self.assertIn('value="cancel"', response.text) self.assertIn('value="pause"', response.text) def test_job_detail_and_dashboard_snapshot_expose_worker_and_download_stats(self): from musicdl.catalogsync.ops.models import StageStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) job_id = repo.create_job( job_type="catalog_sync", config_snapshot={}, ) stage_id = repo.create_stage( job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING, ) item_id = repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:101", song_id=101, payload={"row": {"id": 101, "platform": "qq", "name": "Song 101"}}, ) repo.claim_item(item_id=item_id, worker_name="download-1") repo.update_worker_state( "download-1", current_song_id=101, current_display_text="Song 101", last_progress_text="downloading", ) detail_response = client.get(f"/api/jobs/{job_id}") self.assertEqual(200, detail_response.status_code) detail_payload = detail_response.json() self.assertIn("workers", detail_payload) self.assertIn("running_items", detail_payload) self.assertIn("download_stats", detail_payload) stream_response = client.get("/api/events/stream?once=true") self.assertEqual(200, stream_response.status_code) data_line = next( line for line in stream_response.text.splitlines() if line.startswith("data: ") ) snapshot_payload = json.loads(data_line.removeprefix("data: ")) self.assertIn("workers", snapshot_payload) self.assertIn("running_items", snapshot_payload) self.assertIn("download_stats", snapshot_payload) self.assertIn("playlist_sources", snapshot_payload) self.assertIn("active_job", snapshot_payload) def test_dashboard_exposes_resolver_and_downloader_workers_during_download_stage(self): from musicdl.catalogsync.ops.models import JobStatus, StageStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) job_id = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) stage_id = repo.create_stage( job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING, ) item_a = repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:201", song_id=201, payload={"row": {"id": 201, "platform": "qq", "name": "Song A / Singer A"}}, ) item_b = repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:202", song_id=202, payload={"row": {"id": 202, "platform": "qq", "name": "Song B / Singer B"}}, ) repo.claim_item(item_id=item_a, worker_name="resolve-1") repo.update_worker_state( worker_name="resolve-1", current_job_item_id=item_a, status="running", current_song_id=201, current_display_text="Song A / Singer A", last_progress_text="resolving source qq (1/6)", ) repo.claim_item(item_id=item_b, worker_name="resolve-2") repo.update_worker_state( worker_name="download-1", current_job_item_id=item_b, status="running", current_song_id=202, current_display_text="Song B / Singer B", last_progress_text="12.00MB/48.00MB", downloaded_bytes=12 * 1024 * 1024, total_bytes=48 * 1024 * 1024, speed_bytes_per_sec=3 * 1024 * 1024, progress_percent=25, ) response = client.get("/api/dashboard?include_task_rows=false") self.assertEqual(200, response.status_code) payload = response.json() worker_names = [worker["worker_name"] for worker in payload["workers"]] self.assertIn("resolve-1", worker_names) self.assertIn("download-1", worker_names) self.assertEqual(3 * 1024 * 1024, payload["transfer_stats"]["download_speed_bytes_per_sec"]) def test_dashboard_page_renders_task_tree_shell_without_detail_tables(self): from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) doing_job_id = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) done_job_id = repo.create_job( job_type="sync_only", config_snapshot={}, status=JobStatus.COMPLETED, ) response = client.get("/dashboard") self.assertEqual(200, response.status_code) html = response.text self.assertIn('data-task-tree-root="doing"', html) self.assertIn('data-task-tree-root="done"', html) self.assertIn(f'data-task-node="{doing_job_id}"', html) self.assertIn(f'data-task-toggle="{doing_job_id}"', html) self.assertIn(f'data-task-node="{done_job_id}"', html) self.assertIn(f'data-task-toggle="{done_job_id}"', html) self.assertIn("data-task-command-toggle", html) self.assertIn("data-task-command-cancel", html) self.assertIn("data-task-meta-inline", html) self.assertNotIn("data-task-subtitle", html) self.assertNotIn("data-playlist-toggle", html) self.assertNotIn("

Summary

", html) self.assertNotIn("

Stages

", html) self.assertNotIn("

Workers

", html) self.assertNotIn("

Running Items

", html) def test_dashboard_page_orders_done_tree_by_latest_finished_at(self): from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) older_job_id = repo.create_job( job_type="sync_only", config_snapshot={}, status=JobStatus.COMPLETED, ) newer_job_id = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.COMPLETED, ) with repo._connection() as conn: conn.execute( "UPDATE job_runs SET ended_at = ? WHERE id = ?", ("2026-04-18 09:00:00", older_job_id), ) conn.execute( "UPDATE job_runs SET ended_at = ? WHERE id = ?", ("2026-04-18 12:00:00", newer_job_id), ) response = client.get("/dashboard") self.assertEqual(200, response.status_code) html = response.text newer_index = html.index(f'data-task-node="{newer_job_id}"') older_index = html.index(f'data-task-node="{older_job_id}"') self.assertLess(newer_index, older_index) def test_dashboard_page_limits_done_tree_to_recent_ten(self): from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) created_ids: list[int] = [] for index in range(1, 13): job_id = repo.create_job( job_type="sync_only", config_snapshot={}, status=JobStatus.COMPLETED, ) created_ids.append(job_id) with repo._connection() as conn: conn.execute( "UPDATE job_runs SET ended_at = ? WHERE id = ?", (f"2026-04-18 {index:02d}:00:00", job_id), ) response = client.get("/dashboard") self.assertEqual(200, response.status_code) html = response.text self.assertEqual(10, html.count('data-task-node="')) self.assertNotIn(f'data-task-node="{created_ids[0]}"', html) self.assertNotIn(f'data-task-node="{created_ids[1]}"', html) self.assertIn(f'data-task-node="{created_ids[-1]}"', html) def test_dashboard_page_renders_local_duplicate_maintenance_controls(self): client, _, _ = self._build_client() response = client.get("/dashboard") self.assertEqual(200, response.status_code) html = response.text self.assertIn('data-maintenance-panel="local-duplicates"', html) self.assertIn('data-maintenance-action="scan"', html) self.assertIn('data-maintenance-action="dedupe"', html) self.assertIn("data-maintenance-status", html) self.assertIn("data-maintenance-result", html) def test_config_env_revision_list_and_apply_flow(self): client, _, _ = self._build_client() first_content = "ROOT_DIR=/music-a\nDOWNLOAD_SOURCES=qq,netease\n" second_content = "ROOT_DIR=/music-b\nDOWNLOAD_SOURCES=kuwo\n" put_first = client.put( "/api/config/env", json={"content": first_content, "note": "first"}, ) self.assertEqual(200, put_first.status_code) first_revision_id = int(put_first.json()["revision"]["id"]) put_second = client.put( "/api/config/env", json={"content": second_content, "note": "second"}, ) self.assertEqual(200, put_second.status_code) second_revision_id = int(put_second.json()["revision"]["id"]) self.assertNotEqual(first_revision_id, second_revision_id) revisions_response = client.get("/api/config/revisions") self.assertEqual(200, revisions_response.status_code) revisions = revisions_response.json()["items"] revision_ids = {int(item["id"]) for item in revisions} self.assertIn(first_revision_id, revision_ids) self.assertIn(second_revision_id, revision_ids) apply_response = client.post(f"/api/config/revisions/{first_revision_id}/apply") self.assertEqual(200, apply_response.status_code) env_response = client.get("/api/config/env") self.assertEqual(200, env_response.status_code) env_payload = env_response.json() self.assertEqual(first_content, env_payload["content"]) self.assertEqual("/music-a", env_payload["parsed"]["ROOT_DIR"]) apply_unknown = client.post("/api/config/revisions/999999/apply") self.assertEqual(404, apply_unknown.status_code) def test_events_stream_returns_sse_content_type(self): client, _, _ = self._build_client() response = client.get("/api/events/stream?once=true") self.assertEqual(200, response.status_code) self.assertIn("text/event-stream", response.headers.get("content-type", "")) self.assertIn("event: snapshot", response.text) self.assertIn("data:", response.text) def test_events_stream_snapshot_omits_task_rows(self): client, _, _ = self._build_client() response = client.get("/api/events/stream?once=true") self.assertEqual(200, response.status_code) data_line = next( line for line in response.text.splitlines() if line.startswith("data: ") ) payload = json.loads(data_line.removeprefix("data: ")) self.assertNotIn("task_rows", payload) def test_api_playlists_mark_wanted_and_filter_wanted_only(self): client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="wanted-api-1", name="Wanted API Playlist", ) mark_response = client.post( "/api/playlists/mark-wanted", json={"playlist_ids": [playlist_id], "marked_by": "api-test"}, ) self.assertEqual(200, mark_response.status_code) wanted_response = client.get("/api/playlists?wanted_only=1") self.assertEqual(200, wanted_response.status_code) wanted_payload = wanted_response.json() self.assertEqual(1, wanted_payload["total_count"]) self.assertEqual(playlist_id, wanted_payload["items"][0]["id"]) self.assertEqual(1, wanted_payload["items"][0]["is_wanted"]) self.assertEqual("api-test", wanted_payload["items"][0]["marked_by"]) def test_api_playlists_unmark_wanted(self): client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="wanted-api-2", name="Wanted API Playlist 2", ) mark_response = client.post( "/api/playlists/mark-wanted", json={"playlist_ids": [playlist_id], "marked_by": "api-test"}, ) self.assertEqual(200, mark_response.status_code) unmark_response = client.post( "/api/playlists/unmark-wanted", json={"playlist_ids": [playlist_id]}, ) self.assertEqual(200, unmark_response.status_code) wanted_response = client.get("/api/playlists?wanted_only=1") self.assertEqual(200, wanted_response.status_code) wanted_payload = wanted_response.json() self.assertEqual(0, wanted_payload["total_count"]) self.assertEqual([], wanted_payload["items"]) def test_api_playlists_download_creates_download_only_job_with_playlist_scope(self): client, db_path, _ = self._build_client() playlist_a = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="download-api-a", name="Download API A", ) playlist_b = self._seed_playlist( db_path, platform="netease", pool_kind="playlist_square", remote_id="download-api-b", name="Download API B", ) song_a = self._seed_song( db_path, platform="qq", remote_id="download-api-song-a", name="Download API Song A", ) song_b = self._seed_song( db_path, platform="netease", remote_id="download-api-song-b", name="Download API Song B", ) self._link_playlist_song(db_path, playlist_id=playlist_a, song_id=song_a, position=1) self._link_playlist_song(db_path, playlist_id=playlist_b, song_id=song_b, position=1) response = client.post( "/api/playlists/download", json={ "playlist_ids": [playlist_a, str(playlist_b), playlist_a, -1, 0], "requested_by": "api-test", }, ) self.assertEqual(201, response.status_code) payload = response.json()["job"] self.assertEqual("download_only", payload["job_type"]) self.assertEqual([playlist_a, playlist_b], payload["playlist_scope"]["playlist_ids"]) def test_api_playlists_download_adaptive_routes_mixed_states_to_two_jobs(self): client, db_path, _ = self._build_client() ready_for_download_playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="download-adaptive-ready", name="Download Adaptive Ready", ) unsynced_playlist_id = self._seed_playlist( db_path, platform="netease", pool_kind="playlist_square", remote_id="download-adaptive-unsynced", name="Download Adaptive Unsynced", ) ready_song_id = self._seed_song( db_path, platform="qq", remote_id="download-adaptive-song-ready", name="Download Adaptive Song Ready", ) self._link_playlist_song( db_path, playlist_id=ready_for_download_playlist_id, song_id=ready_song_id, position=1, ) response = client.post( "/api/playlists/download", json={ "playlist_ids": [ready_for_download_playlist_id, unsynced_playlist_id], "requested_by": "api-test", }, ) self.assertEqual(201, response.status_code) payload = response.json() self.assertEqual( [ready_for_download_playlist_id], payload["download_job"]["playlist_scope"]["playlist_ids"], ) self.assertEqual( [unsynced_playlist_id], payload["sync_download_job"]["playlist_scope"]["playlist_ids"], ) self.assertIsNone(payload.get("job")) def test_api_playlists_download_returns_404_when_any_playlist_id_is_missing(self): client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="download-missing-known", name="Download Missing Known", ) song_id = self._seed_song( db_path, platform="qq", remote_id="download-missing-song", name="Download Missing Song", ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1) response = client.post( "/api/playlists/download", json={"playlist_ids": [playlist_id, 999999]}, ) self.assertEqual(404, response.status_code) payload = response.json() self.assertEqual([999999], payload["detail"]["missing_playlist_ids"]) def test_api_playlists_sync_download_creates_job_with_playlist_scope(self): client, db_path, _ = self._build_client() playlist_a = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="sync-download-api-a", name="Sync Download API A", ) playlist_b = self._seed_playlist( db_path, platform="qq", pool_kind="playlist_square", remote_id="sync-download-api-b", name="Sync Download API B", ) response = client.post( "/api/playlists/sync-download", json={"playlist_ids": [playlist_b, playlist_a]}, ) self.assertEqual(201, response.status_code) payload = response.json()["job"] self.assertEqual("sync_download", payload["job_type"]) self.assertEqual([playlist_b, playlist_a], payload["playlist_scope"]["playlist_ids"]) def test_api_playlists_sync_creates_sync_only_job_with_scope(self): client, db_path, _ = self._build_client() playlist_a = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="sync-api-a", name="Sync API A", ) playlist_b = self._seed_playlist( db_path, platform="netease", pool_kind="playlist_square", remote_id="sync-api-b", name="Sync API B", ) response = client.post( "/api/playlists/sync", json={"playlist_ids": [playlist_b, playlist_a]}, ) self.assertEqual(201, response.status_code) payload = response.json()["job"] self.assertEqual("sync_only", payload["job_type"]) self.assertEqual([playlist_b, playlist_a], payload["playlist_scope"]["playlist_ids"]) def test_api_playlists_export_routes_selected_playlists_by_state(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text( f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq,kuwo\n", encoding="utf-8", ) initialize_database(db_path, default_library_root=root / "library").close() downloaded_playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="export-downloaded", name="Export Downloaded Playlist", ) downloaded_song_id = self._seed_song( db_path, platform="qq", remote_id="export-downloaded-song-1", name="Export Downloaded Song 1", ) self._link_playlist_song( db_path, playlist_id=downloaded_playlist_id, song_id=downloaded_song_id, position=1, ) self._mark_local_downloaded( db_path, song_id=downloaded_song_id, relative_path="qq/Singer A/export-downloaded-song-1.mp3", ) download_playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="export-not-downloaded", name="Export Not Downloaded Playlist", ) download_song_id = self._seed_song( db_path, platform="qq", remote_id="export-not-downloaded-song-1", name="Export Not Downloaded Song 1", ) self._link_playlist_song( db_path, playlist_id=download_playlist_id, song_id=download_song_id, position=1, ) sync_download_playlist_id = self._seed_playlist( db_path, platform="netease", pool_kind="playlist_square", remote_id="export-unsynced", name="Export Unsynced Playlist", ) app = create_app(db_path=db_path, env_path=env_path) client = TestClient(app) self.addCleanup(client.close) response = client.post( "/api/playlists/export", json={ "playlist_ids": [ sync_download_playlist_id, downloaded_playlist_id, download_playlist_id, ], "requested_by": "api-test", }, ) playlist_dir = root / "playlists" / f"Export Downloaded Playlist_{downloaded_playlist_id}" playlist_dir_exists = playlist_dir.exists() playlist_yaml_exists = (playlist_dir / "playlist.yaml").exists() self.assertEqual(200, response.status_code) payload = response.json() self.assertEqual([downloaded_playlist_id], payload["exported_playlist_ids"]) self.assertEqual(1, payload["exported_count"]) self.assertEqual( [download_playlist_id], payload["download_job"]["playlist_scope"]["playlist_ids"], ) self.assertEqual( [sync_download_playlist_id], payload["sync_download_job"]["playlist_scope"]["playlist_ids"], ) self.assertTrue(playlist_dir_exists) self.assertTrue(playlist_yaml_exists) def test_api_playlist_export_zip_returns_zip_for_downloaded_playlist(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text( f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8", ) initialize_database(db_path, default_library_root=root / "library").close() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-export-zip-ready", name="Playlist Export Zip Ready", ) song_id = self._seed_song( db_path, platform="qq", remote_id="playlist-export-zip-song-1", name="Playlist Export Zip Song 1", ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1) self._mark_local_downloaded( db_path, song_id=song_id, relative_path="qq/Singer A/playlist-export-zip-song-1.mp3", ) playlist_dir = root / "playlists" / f"Playlist Export Zip Ready_{playlist_id}" covers_dir = playlist_dir / "covers" covers_dir.mkdir(parents=True, exist_ok=True) (covers_dir / "playlist-cover.jpg").write_bytes(b"fake-cover") app = create_app(db_path=db_path, env_path=env_path) client = TestClient(app) self.addCleanup(client.close) response = client.get(f"/api/playlists/{playlist_id}/export.zip") self.assertEqual(200, response.status_code) self.assertIn("application/zip", response.headers.get("content-type", "")) self.assertIn(".zip", response.headers.get("content-disposition", "")) with zipfile.ZipFile(io.BytesIO(response.content), "r") as archive: members = set(archive.namelist()) self.assertTrue(any(name.endswith("/playlist.yaml") for name in members)) self.assertTrue(any(name.endswith("/.playlist_meta.json") for name in members)) self.assertTrue(any(name.endswith("/covers/playlist-cover.jpg") for name in members)) def test_api_playlist_export_zip_returns_409_payload_for_unsynced_playlist(self): client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-export-zip-unsynced", name="Playlist Export Zip Unsynced", ) response = client.get(f"/api/playlists/{playlist_id}/export.zip") self.assertEqual(409, response.status_code) payload = response.json() self.assertEqual("unsynced", payload["state_code"]) self.assertEqual(playlist_id, payload["playlist_id"]) self.assertIn("message", payload) def test_api_export_zip_returns_download_url_and_bundle_downloads_when_all_ready(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text( f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8", ) initialize_database(db_path, default_library_root=root / "library").close() playlist_a = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="export-zip-all-ready-a", name="Export Zip All Ready A", ) playlist_b = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="export-zip-all-ready-b", name="Export Zip All Ready B", ) song_a = self._seed_song( db_path, platform="qq", remote_id="export-zip-all-ready-song-a", name="Export Zip All Ready Song A", ) song_b = self._seed_song( db_path, platform="qq", remote_id="export-zip-all-ready-song-b", name="Export Zip All Ready Song B", ) self._link_playlist_song(db_path, playlist_id=playlist_a, song_id=song_a, position=1) self._link_playlist_song(db_path, playlist_id=playlist_b, song_id=song_b, position=1) self._mark_local_downloaded( db_path, song_id=song_a, relative_path="qq/Singer A/export-zip-all-ready-song-a.mp3", ) self._mark_local_downloaded( db_path, song_id=song_b, relative_path="qq/Singer A/export-zip-all-ready-song-b.mp3", ) app = create_app(db_path=db_path, env_path=env_path) client = TestClient(app) self.addCleanup(client.close) prepare_response = client.post( "/api/playlists/export-zip", json={"playlist_ids": [playlist_a, playlist_b]}, ) prepare_payload = prepare_response.json() download_url = prepare_payload["download_url"] download_response = client.get(download_url) self.assertEqual(200, prepare_response.status_code) self.assertEqual("ready", prepare_payload["status"]) self.assertEqual([playlist_a, playlist_b], prepare_payload["playlist_ids"]) self.assertTrue(str(download_url).startswith("/api/exports/bundles/")) self.assertEqual(200, download_response.status_code) self.assertIn("application/zip", download_response.headers.get("content-type", "")) with zipfile.ZipFile(io.BytesIO(download_response.content), "r") as archive: members = set(archive.namelist()) self.assertTrue(any(name.startswith("playlists/") for name in members)) self.assertTrue(any(name.endswith("/playlist.yaml") for name in members)) def test_api_export_zip_returns_queued_when_selection_contains_unsynced_playlist(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text( f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8", ) initialize_database(db_path, default_library_root=root / "library").close() ready_playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="export-zip-queued-ready", name="Export Zip Queued Ready", ) ready_song_id = self._seed_song( db_path, platform="qq", remote_id="export-zip-queued-song-ready", name="Export Zip Queued Song Ready", ) self._link_playlist_song(db_path, playlist_id=ready_playlist_id, song_id=ready_song_id, position=1) self._mark_local_downloaded( db_path, song_id=ready_song_id, relative_path="qq/Singer A/export-zip-queued-song-ready.mp3", ) unsynced_playlist_id = self._seed_playlist( db_path, platform="netease", pool_kind="playlist_square", remote_id="export-zip-queued-unsynced", name="Export Zip Queued Unsynced", ) app = create_app(db_path=db_path, env_path=env_path) client = TestClient(app) self.addCleanup(client.close) response = client.post( "/api/playlists/export-zip", json={"playlist_ids": [ready_playlist_id, unsynced_playlist_id]}, ) self.assertEqual(200, response.status_code) payload = response.json() self.assertEqual("queued", payload["status"]) self.assertEqual([ready_playlist_id], payload["ready_playlist_ids"]) self.assertEqual([unsynced_playlist_id], payload["blocked_playlist_ids"]) self.assertIsNotNone(payload["sync_download_job"]) self.assertEqual( [unsynced_playlist_id], payload["sync_download_job"]["playlist_scope"]["playlist_ids"], ) def test_api_export_zip_returns_404_when_any_playlist_id_is_missing(self): client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="export-zip-missing-known", name="Export Zip Missing Known", ) song_id = self._seed_song( db_path, platform="qq", remote_id="export-zip-missing-song", name="Export Zip Missing Song", ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1) self._mark_local_downloaded( db_path, song_id=song_id, relative_path="qq/Singer A/export-zip-missing-song.mp3", ) response = client.post( "/api/playlists/export-zip", json={"playlist_ids": [playlist_id, 999999]}, ) self.assertEqual(404, response.status_code) payload = response.json() self.assertEqual([999999], payload["detail"]["missing_playlist_ids"]) def test_api_export_bundle_download_returns_404_for_missing_bundle(self): client, _, _ = self._build_client() response = client.get("/api/exports/bundles/non-existent-bundle.zip") self.assertEqual(404, response.status_code) def test_api_dashboard_returns_task_center_rows_with_lane_fields(self): from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) running_job = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) queued_job = repo.create_job( job_type="catalog_sync", config_snapshot={}, status=JobStatus.QUEUED, ) stage_id = repo.create_stage( job_run_id=running_job, stage_type="download", seq_no=1, status=StageStatus.RUNNING, ) repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:done", song_id=1, status=ItemStatus.SUCCEEDED, ) running_item = repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:running", song_id=2, status=ItemStatus.PENDING, payload={"row": {"id": 2, "name": "Song 2"}}, ) repo.claim_item(item_id=running_item, worker_name="download-1") repo.update_worker_state( "download-1", status="running", current_job_item_id=running_item, current_song_id=2, current_display_text="Song 2", downloaded_bytes=5 * 1024 * 1024, total_bytes=10 * 1024 * 1024, speed_bytes_per_sec=2 * 1024 * 1024, progress_percent=50.0, ) response = client.get("/api/dashboard") self.assertEqual(200, response.status_code) payload = response.json() self.assertIn("task_rows", payload) by_id = {int(row["id"]): row for row in payload["task_rows"]} self.assertEqual("download", by_id[running_job]["lane_type"]) self.assertEqual("queued #1", by_id[queued_job]["queue_label"]) self.assertEqual("2.0 MB/s", by_id[running_job]["speed_text"]) self.assertIn("queued_download_jobs", payload["summary"]) self.assertEqual(1, payload["summary"]["queued_download_jobs"]) def test_dashboard_transfer_stats_exposes_download_speed_and_upload_placeholder(self): from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) job_id = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) stage_id = repo.create_stage( job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING, ) item_id = repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:bandwidth", song_id=9, status=ItemStatus.PENDING, payload={"row": {"id": 9, "name": "Bandwidth Song"}}, ) repo.claim_item(item_id=item_id, worker_name="download-bandwidth-1") repo.update_worker_state( "download-bandwidth-1", status="running", current_job_item_id=item_id, current_song_id=9, current_display_text="Bandwidth Song", downloaded_bytes=4 * 1024 * 1024, total_bytes=8 * 1024 * 1024, speed_bytes_per_sec=2 * 1024 * 1024, progress_percent=50.0, ) api_response = client.get("/api/dashboard") self.assertEqual(200, api_response.status_code) payload = api_response.json() self.assertEqual("2.0 MB/s", payload["transfer_stats"]["download_speed_text"]) self.assertEqual("-", payload["transfer_stats"]["upload_speed_text"]) page_response = client.get("/dashboard") self.assertEqual(200, page_response.status_code) html = page_response.text self.assertIn("Task Center", html) self.assertIn("Down 2.0 MB/s", html) self.assertIn("Up -", html) def test_dashboard_transfer_stats_reset_when_worker_claims_new_item(self): from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) job_id = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) stage_id = repo.create_stage( job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING, ) first_item_id = repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:first-speed", song_id=19, status=ItemStatus.PENDING, payload={"row": {"id": 19, "name": "First Speed Song"}}, ) repo.claim_item(item_id=first_item_id, worker_name="download-1") repo.update_worker_state( "download-1", status="running", current_job_item_id=first_item_id, current_song_id=19, current_display_text="First Speed Song", downloaded_bytes=4 * 1024 * 1024, total_bytes=8 * 1024 * 1024, speed_bytes_per_sec=2 * 1024 * 1024, progress_percent=50.0, ) self.assertTrue(repo.mark_item_succeeded(first_item_id)) second_item_id = repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key="song:second-speed", song_id=20, status=ItemStatus.PENDING, payload={"row": {"id": 20, "name": "Second Speed Song"}}, ) repo.claim_item(item_id=second_item_id, worker_name="download-1") api_response = client.get("/api/dashboard") self.assertEqual(200, api_response.status_code) payload = api_response.json() self.assertEqual("0 B/s", payload["transfer_stats"]["download_speed_text"]) worker = next(worker for worker in payload["workers"] if worker["worker_name"] == "download-1") self.assertEqual(second_item_id, int(worker["current_job_item_id"])) self.assertEqual("0 B/s", worker["speed_text"]) def test_dashboard_transfer_stats_ignore_stale_historical_workers(self): from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) live_job_id = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) live_stage_id = repo.create_stage( job_run_id=live_job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING, ) live_item_id = repo.create_item( job_stage_id=live_stage_id, item_type="song_download", item_key="song:live-bandwidth", song_id=91, status=ItemStatus.PENDING, payload={"row": {"id": 91, "name": "Live Speed Song"}}, ) repo.claim_item(item_id=live_item_id, worker_name="download-live-1") repo.update_worker_state( "download-live-1", status="running", current_job_item_id=live_item_id, current_song_id=91, current_display_text="Live Speed Song", downloaded_bytes=4 * 1024 * 1024, total_bytes=8 * 1024 * 1024, speed_bytes_per_sec=2 * 1024 * 1024, progress_percent=50.0, ) stale_job_id = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.COMPLETED, ) stale_stage_id = repo.create_stage( job_run_id=stale_job_id, stage_type="download", seq_no=1, status=StageStatus.COMPLETED, ) stale_item_id = repo.create_item( job_stage_id=stale_stage_id, item_type="song_download", item_key="song:stale-bandwidth", song_id=92, status=ItemStatus.PENDING, payload={"row": {"id": 92, "name": "Stale Speed Song"}}, ) repo.claim_item(item_id=stale_item_id, worker_name="download-stale-1") repo.update_worker_state( "download-stale-1", status="running", current_job_item_id=stale_item_id, current_song_id=92, current_display_text="Stale Speed Song", downloaded_bytes=80 * 1024 * 1024, total_bytes=80 * 1024 * 1024, speed_bytes_per_sec=96 * 1024 * 1024, progress_percent=100.0, ) self.assertTrue(repo.mark_item_succeeded(stale_item_id)) with repo._connection() as conn: conn.execute( """ UPDATE job_workers SET status = 'running', current_job_item_id = ?, current_song_id = ?, current_display_text = ?, downloaded_bytes = ?, total_bytes = ?, speed_bytes_per_sec = ?, progress_percent = ? WHERE worker_name = ? """, ( stale_item_id, 92, "Stale Speed Song", 80 * 1024 * 1024, 80 * 1024 * 1024, 96 * 1024 * 1024, 100.0, "download-stale-1", ), ) api_response = client.get("/api/dashboard") self.assertEqual(200, api_response.status_code) payload = api_response.json() self.assertEqual("2.0 MB/s", payload["transfer_stats"]["download_speed_text"]) self.assertEqual( ["download-live-1"], [worker["worker_name"] for worker in payload["workers"]], ) def test_api_dashboard_includes_paused_jobs_in_task_center_rows(self): from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) paused_job = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.PAUSED, playlist_scope={"playlist_ids": [77]}, ) response = client.get("/api/dashboard") self.assertEqual(200, response.status_code) payload = response.json() self.assertIn("task_rows", payload) by_id = {int(row["id"]): row for row in payload["task_rows"]} self.assertIn(paused_job, by_id) self.assertEqual("paused", by_id[paused_job]["status"]) def test_api_dashboard_includes_completed_jobs_in_task_center_rows(self): from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) completed_job = repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.COMPLETED, playlist_scope={"playlist_ids": [88]}, ) response = client.get("/api/dashboard") self.assertEqual(200, response.status_code) payload = response.json() self.assertIn("task_rows", payload) by_id = {int(row["id"]): row for row in payload["task_rows"]} self.assertIn(completed_job, by_id) self.assertEqual("completed", by_id[completed_job]["status"]) def test_api_dashboard_can_omit_task_rows_for_lightweight_refresh(self): from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() repo = OpsRepository(db_path) repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) response = client.get("/api/dashboard?include_task_rows=false") self.assertEqual(200, response.status_code) payload = response.json() self.assertNotIn("task_rows", payload) self.assertIn("summary", payload) self.assertIn("workers", payload) def test_api_local_duplicates_scan_returns_summary_and_groups(self): from musicdl.catalogsync.models import CatalogSong from musicdl.catalogsync.repository import CatalogRepository client, db_path, _ = self._build_client() repo = CatalogRepository(db_path) song_id = repo.upsert_song( CatalogSong( platform="qq", remote_song_id="api-dup-song", name="API Duplicate Song", singers="Singer API", ext="flac", file_size_bytes=5, quality_label="lossless", metadata={}, ) ) library_root = Path(db_path).parent / "library" backend_id = repo.ensure_local_backend(library_root, name="default-local", is_default=True) repo.record_local_file( song_id=song_id, backend_id=backend_id, relative_path="Singer API/API Duplicate Song.flac", file_size_bytes=5, ext="flac", quality_label="lossless", ) repo.record_local_file( song_id=song_id, backend_id=backend_id, relative_path="Singer API/API Duplicate Song (1).flac", file_size_bytes=5, ext="flac", quality_label="lossless", ) (library_root / "Singer API").mkdir(parents=True, exist_ok=True) (library_root / "Singer API" / "API Duplicate Song.flac").write_bytes(b"12345") (library_root / "Singer API" / "API Duplicate Song (1).flac").write_bytes(b"12345") response = client.get("/api/maintenance/local-duplicates") self.assertEqual(200, response.status_code) payload = response.json() self.assertEqual(1, payload["summary"]["duplicate_group_count"]) self.assertEqual(1, payload["summary"]["duplicate_location_count"]) self.assertEqual(1, len(payload["groups"])) group = payload["groups"][0] self.assertEqual(song_id, group["song_id"]) self.assertEqual("Singer API/API Duplicate Song.flac", group["keep"]["locator"]) self.assertEqual( "Singer API/API Duplicate Song (1).flac", group["duplicates"][0]["locator"], ) def test_api_local_duplicates_dedupe_inactivates_duplicates_and_redirects_references(self): from musicdl.catalogsync.models import CatalogSong from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository from musicdl.catalogsync.repository import CatalogRepository client, db_path, _ = self._build_client() repo = CatalogRepository(db_path) song_id = repo.upsert_song( CatalogSong( platform="qq", remote_song_id="api-dup-song-dedupe", name="API Dedupe Song", singers="Singer API", ext="flac", file_size_bytes=6, quality_label="lossless", metadata={}, ) ) library_root = Path(db_path).parent / "library" backend_id = repo.ensure_local_backend(library_root, name="default-local", is_default=True) asset_id = repo.record_local_file( song_id=song_id, backend_id=backend_id, relative_path="Singer API/API Dedupe Song.flac", file_size_bytes=6, ext="flac", quality_label="lossless", ) repo.record_local_file( song_id=song_id, backend_id=backend_id, relative_path="Singer API/API Dedupe Song (1).flac", file_size_bytes=6, ext="flac", quality_label="lossless", ) (library_root / "Singer API").mkdir(parents=True, exist_ok=True) (library_root / "Singer API" / "API Dedupe Song.flac").write_bytes(b"123456") duplicate_path = library_root / "Singer API" / "API Dedupe Song (1).flac" duplicate_path.write_bytes(b"123456") canonical_location = repo._fetchone( "SELECT id FROM file_locations WHERE locator = ?", ("Singer API/API Dedupe Song.flac",), ) duplicate_location = repo._fetchone( "SELECT id FROM file_locations WHERE locator = ?", ("Singer API/API Dedupe Song (1).flac",), ) remote_backend_id = repo.upsert_object_storage_backend( name="test-bucket", container_name="music", endpoint="https://s3.example.invalid", region=None, base_prefix="catalogsync", credential_env_prefix="CATALOGSYNC_TEST", public_base_url="https://cdn.example.invalid", ) upload_task_id = repo.enqueue_upload_task( file_asset_id=asset_id, source_location_id=int(duplicate_location["id"]), target_backend_id=remote_backend_id, target_container_name="music", target_locator="Singer API/API Dedupe Song.flac", ) ops_repo = OpsRepository(db_path) job_id = ops_repo.create_job( job_type="upload_only", config_snapshot={}, status=JobStatus.QUEUED, ) stage_id = ops_repo.create_stage(job_run_id=job_id, stage_type="upload", seq_no=1) item_id = ops_repo.create_item( job_stage_id=stage_id, item_type="song_upload", item_key="upload:api-dedupe-song", song_id=song_id, file_location_id=int(duplicate_location["id"]), ) response = client.post("/api/maintenance/local-duplicates/dedupe") self.assertEqual(200, response.status_code) payload = response.json() self.assertEqual(0, payload["summary"]["duplicate_group_count"]) self.assertEqual(1, payload["execution"]["inactive_location_count"]) self.assertEqual(1, payload["execution"]["deleted_file_count"]) self.assertEqual(1, payload["execution"]["repointed_upload_task_count"]) self.assertEqual(1, payload["execution"]["repointed_job_item_count"]) duplicate_row = repo._fetchone( "SELECT status FROM file_locations WHERE id = ?", (int(duplicate_location["id"]),), ) self.assertEqual("inactive", duplicate_row["status"]) upload_row = repo._fetchone( "SELECT source_location_id FROM upload_tasks WHERE id = ?", (upload_task_id,), ) self.assertEqual(int(canonical_location["id"]), int(upload_row["source_location_id"])) job_item = ops_repo._fetchone( "SELECT file_location_id FROM job_items WHERE id = ?", (item_id,), ) self.assertEqual(int(canonical_location["id"]), int(job_item["file_location_id"])) self.assertFalse(duplicate_path.exists()) def test_api_local_duplicates_dedupe_rejects_while_jobs_are_running(self): from musicdl.catalogsync.models import CatalogSong from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository from musicdl.catalogsync.repository import CatalogRepository client, db_path, _ = self._build_client() repo = CatalogRepository(db_path) song_id = repo.upsert_song( CatalogSong( platform="qq", remote_song_id="api-dup-song-blocked", name="API Blocked Song", singers="Singer API", ext="flac", file_size_bytes=4, quality_label="lossless", metadata={}, ) ) library_root = Path(db_path).parent / "library" backend_id = repo.ensure_local_backend(library_root, name="default-local", is_default=True) repo.record_local_file( song_id=song_id, backend_id=backend_id, relative_path="Singer API/API Blocked Song.flac", file_size_bytes=4, ext="flac", quality_label="lossless", ) repo.record_local_file( song_id=song_id, backend_id=backend_id, relative_path="Singer API/API Blocked Song (1).flac", file_size_bytes=4, ext="flac", quality_label="lossless", ) (library_root / "Singer API").mkdir(parents=True, exist_ok=True) (library_root / "Singer API" / "API Blocked Song.flac").write_bytes(b"1234") (library_root / "Singer API" / "API Blocked Song (1).flac").write_bytes(b"1234") ops_repo = OpsRepository(db_path) ops_repo.create_job( job_type="download_only", config_snapshot={}, status=JobStatus.RUNNING, ) response = client.post("/api/maintenance/local-duplicates/dedupe") self.assertEqual(409, response.status_code) self.assertIn("running", response.json()["detail"]) def test_api_playlists_supports_pagination_and_filter_validation(self): client, db_path, _ = self._build_client() self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="page-1", name="Page 1", play_count=123456, ) self._seed_playlist( db_path, platform="qq", pool_kind="playlist_square", remote_id="page-2", name="Page 2", ) self._seed_playlist( db_path, platform="netease", pool_kind="manual_file", remote_id="page-3", name="Page 3", ) page_response = client.get("/api/playlists?page=1&page_size=20") self.assertEqual(200, page_response.status_code) page_payload = page_response.json() self.assertIn("items", page_payload) self.assertIn("total_count", page_payload) self.assertIn("page_size", page_payload) self.assertEqual(20, page_payload["page_size"]) self.assertEqual(3, page_payload["total_count"]) self.assertEqual(3, len(page_payload["items"])) by_name = {item["name"]: item for item in page_payload["items"]} self.assertEqual(123456, by_name["Page 1"]["play_count"]) empty_status_response = client.get("/api/playlists?status=&page=1&page_size=20") self.assertEqual(200, empty_status_response.status_code) empty_status_payload = empty_status_response.json() self.assertEqual(page_payload["total_count"], empty_status_payload["total_count"]) self.assertEqual(page_payload["page_size"], empty_status_payload["page_size"]) self.assertEqual( [item["id"] for item in page_payload["items"]], [item["id"] for item in empty_status_payload["items"]], ) empty_wanted_response = client.get("/api/playlists?wanted_only=&page=1&page_size=20") self.assertEqual(200, empty_wanted_response.status_code) empty_wanted_payload = empty_wanted_response.json() self.assertEqual(page_payload["total_count"], empty_wanted_payload["total_count"]) self.assertEqual(page_payload["page_size"], empty_wanted_payload["page_size"]) self.assertEqual( [item["id"] for item in page_payload["items"]], [item["id"] for item in empty_wanted_payload["items"]], ) playlists_page_with_empty_status = client.get("/playlists?status=") self.assertEqual(200, playlists_page_with_empty_status.status_code) self.assertIn("Page 1", playlists_page_with_empty_status.text) playlists_page_with_empty_wanted = client.get("/playlists?wanted_only=") self.assertEqual(200, playlists_page_with_empty_wanted.status_code) self.assertIn("Page 1", playlists_page_with_empty_wanted.text) invalid_status = client.get("/api/playlists?status=bad-status") self.assertEqual(422, invalid_status.status_code) invalid_wanted_only = client.get("/api/playlists?wanted_only=maybe") self.assertEqual(422, invalid_wanted_only.status_code) invalid_page_size = client.get("/api/playlists?page_size=30") self.assertEqual(422, invalid_page_size.status_code) def test_api_playlists_supports_sorting_and_exposes_collected_song_count(self): client, db_path, _ = self._build_client() self._seed_playlist( db_path, platform="qq", pool_kind="playlist_square", remote_id="sort-api-001", name="Zulu API Playlist", play_count=100, ) self._seed_playlist( db_path, platform="netease", pool_kind="playlist_square", remote_id="sort-api-002", name="Alpha API Playlist", play_count=300, ) self._seed_playlist( db_path, platform="kuwo", pool_kind="playlist_square", remote_id="sort-api-003", name="Collected API Playlist", collected_song_count=42, ) response = client.get("/api/playlists?page=1&page_size=20&sort_by=name&sort_dir=asc") self.assertEqual(200, response.status_code) payload = response.json() self.assertEqual( ["Alpha API Playlist", "Collected API Playlist", "Zulu API Playlist"], [item["name"] for item in payload["items"]], ) collected_row = next(item for item in payload["items"] if item["name"] == "Collected API Playlist") self.assertEqual(42, collected_row["collected_song_count"]) self.assertEqual(42, collected_row["display_song_count"]) self.assertTrue(collected_row["is_song_count_estimated"]) play_count_response = client.get("/api/playlists?page=1&page_size=20&sort_by=play_count&sort_dir=desc") self.assertEqual(200, play_count_response.status_code) play_count_payload = play_count_response.json() self.assertEqual( ["Alpha API Playlist", "Zulu API Playlist", "Collected API Playlist"], [item["name"] for item in play_count_payload["items"]], ) def test_playlists_page_renders_management_controls_and_filters(self): client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="page-render-1", name="Playlist Render Seed", play_count=7654321, ) song_a = self._seed_song( db_path, platform="qq", remote_id="page-render-song-1", name="Playlist Render Song 1", ) song_b = self._seed_song( db_path, platform="qq", remote_id="page-render-song-2", name="Playlist Render Song 2", ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_a, position=1) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_b, position=2) self._mark_local_downloaded( db_path, song_id=song_a, relative_path="qq/Singer A/page-render-song-1.mp3", ) response = client.get("/playlists") self.assertEqual(200, response.status_code) html = response.text self.assertIn('name="status"', html) self.assertIn('name="page_size"', html) self.assertIn('name="wanted_only"', html) self.assertIn("data-playlist-select-all", html) self.assertIn("data-playlist-clear-selection", html) self.assertIn('data-playlist-action="download"', html) self.assertNotIn('data-playlist-action="sync-download"', html) self.assertIn('data-playlist-action="export-selected"', html) self.assertIn("Export Selected", html) self.assertIn('data-playlist-action="mark-wanted"', html) self.assertIn('data-playlist-action="unmark-wanted"', html) self.assertIn('value="collect_only"', html) self.assertIn("Collect Playlist Sources", html) self.assertIn("data-playlist-pagination", html) self.assertIn("data-playlists-page", html) self.assertIn("data-playlist-checkbox", html) self.assertIn("sort_by=platform", html) self.assertIn("sort_by=play_count", html) self.assertIn("Progress", html) self.assertIn("Status", html) self.assertIn("Downloaded", html) self.assertIn("1 / 2", html) self.assertIn("50%", html) self.assertIn("7654321", html) self.assertIn("Playlist Render Seed", html) self.assertNotIn("Idle", html) def test_playlists_page_renders_sortable_headers_and_collected_song_count_hint(self): client, db_path, _ = self._build_client() self._seed_playlist( db_path, platform="qq", pool_kind="playlist_square", remote_id="playlist-render-collected", name="Collected Render Playlist", collected_song_count=42, ) response = client.get("/playlists?sort_by=name&sort_dir=asc") self.assertEqual(200, response.status_code) html = response.text self.assertIn("sort_by=id", html) self.assertIn("sort_by=platform", html) self.assertIn("sort_by=name", html) self.assertIn("sort_by=play_count", html) self.assertIn('data-playlist-sort-link="id"', html) self.assertIn('data-playlist-sort-link="platform"', html) self.assertIn('data-playlist-sort-link="name"', html) self.assertIn('data-playlist-sort-link="play_count"', html) self.assertIn('data-playlist-sort-indicator="name">^', html) self.assertIn("Collected 42", html) def test_playlists_page_applies_name_sort_order_to_rendered_rows(self): client, db_path, _ = self._build_client() self._seed_playlist( db_path, platform="qq", pool_kind="playlist_square", remote_id="playlist-sort-page-1", name="Zulu Page Playlist", play_count=100, ) self._seed_playlist( db_path, platform="netease", pool_kind="playlist_square", remote_id="playlist-sort-page-2", name="Alpha Page Playlist", play_count=300, ) self._seed_playlist( db_path, platform="kuwo", pool_kind="playlist_square", remote_id="playlist-sort-page-3", name="Middle Page Playlist", play_count=200, ) response = client.get("/playlists?sort_by=name&sort_dir=asc") self.assertEqual(200, response.status_code) html = response.text alpha_index = html.index("Alpha Page Playlist") middle_index = html.index("Middle Page Playlist") zulu_index = html.index("Zulu Page Playlist") self.assertLess(alpha_index, middle_index) self.assertLess(middle_index, zulu_index) def test_playlists_page_renders_sync_selected_playlists_action(self): client, db_path, _ = self._build_client() self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-render-sync", name="Playlist Render Sync", ) response = client.get("/playlists") self.assertEqual(200, response.status_code) html = response.text self.assertIn("data-playlists-page", html) self.assertIn('data-playlist-action="sync"', html) self.assertIn("data-playlist-select-all", html) def test_playlists_page_renders_clickable_synced_playlist_name_and_song_modal_shell(self): client, db_path, _ = self._build_client() synced_playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-preview-ready", name="Playlist Preview Ready", ) empty_playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-preview-empty", name="Playlist Preview Empty", ) synced_song_id = self._seed_song( db_path, platform="qq", remote_id="playlist-preview-song-1", name="Playlist Preview Song 1", ) self._link_playlist_song( db_path, playlist_id=synced_playlist_id, song_id=synced_song_id, position=1, ) response = client.get("/playlists") self.assertEqual(200, response.status_code) html = response.text self.assertIn(f'data-playlist-open-songs="{synced_playlist_id}"', html) self.assertNotIn(f'data-playlist-open-songs="{empty_playlist_id}"', html) self.assertIn("data-playlist-songs-modal", html) self.assertIn("data-playlist-export", html) def test_api_playlist_songs_exposes_export_ready_song_metadata_and_locations(self): client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-export-ready", name="Playlist Export Ready", play_count=987654, ) song_id = self._seed_song( db_path, platform="qq", remote_id="playlist-export-song-1", name="Playlist Export Song 1", singers="Singer Export", ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1) self._mark_remote_uploaded( db_path, song_id=song_id, relative_path="qq/Singer Export/playlist-export-song-1.mp3", public_url="https://cdn.example.invalid/qq/Singer%20Export/playlist-export-song-1.mp3", download_url="https://download.example.invalid/qq/Singer%20Export/playlist-export-song-1.mp3", ) response = client.get(f"/api/playlists/{playlist_id}/songs") self.assertEqual(200, response.status_code) payload = response.json() self.assertIn("playlist", payload) self.assertIn("items", payload) self.assertEqual(playlist_id, payload["playlist"]["id"]) self.assertEqual("Playlist Export Ready", payload["playlist"]["name"]) self.assertEqual("qq", payload["playlist"]["platform"]) self.assertEqual(987654, payload["playlist"]["play_count"]) self.assertEqual(1, len(payload["items"])) row = payload["items"][0] self.assertEqual(song_id, row["song_id"]) self.assertEqual("Playlist Export Song 1", row["name"]) self.assertEqual("Singer Export", row["singers"]) self.assertEqual("mp3", row["ext"]) self.assertEqual(128, row["file_size_bytes"]) self.assertTrue(str(row["local_file_path"]).endswith("qq\\Singer Export\\playlist-export-song-1.mp3")) self.assertEqual(1, len(row["uploaded_locations"])) uploaded_row = row["uploaded_locations"][0] self.assertEqual("catalog-cloud", uploaded_row["backend_name"]) self.assertEqual("object_storage", uploaded_row["backend_type"]) self.assertEqual( "https://cdn.example.invalid/qq/Singer%20Export/playlist-export-song-1.mp3", uploaded_row["url"], ) def test_api_playlist_export_folder_returns_existing_playlist_directory(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text( f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8", ) initialize_database(db_path, default_library_root=root / "library").close() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-export-folder", name="Playlist Export Folder", play_count=456789, ) playlist_dir = root / "playlists" / f"Playlist Export Folder_{playlist_id}" covers_dir = playlist_dir / "covers" covers_dir.mkdir(parents=True, exist_ok=True) (playlist_dir / "playlist.yaml").write_text( "\n".join( [ f"playlist_id: {playlist_id}", "playlist_name: Playlist Export Folder", "platform: qq", "play_count: 456789", "", ] ), encoding="utf-8", ) (playlist_dir / ".playlist_meta.json").write_text( json.dumps( { "playlist_id": playlist_id, "platform": "qq", "remote_playlist_id": "playlist-export-folder", "name": "Playlist Export Folder", }, ensure_ascii=False, ), encoding="utf-8", ) (covers_dir / "playlist-cover.jpg").write_bytes(b"fake-cover") app = create_app(db_path=db_path, env_path=env_path) client = TestClient(app) self.addCleanup(client.close) response = client.get(f"/api/playlists/{playlist_id}/export-folder") self.assertEqual(200, response.status_code) payload = response.json() self.assertEqual(playlist_id, payload["playlist"]["id"]) self.assertTrue(payload["folder"]["exists"]) self.assertTrue(str(payload["folder"]["path"]).endswith(f"Playlist Export Folder_{playlist_id}")) relative_paths = {str(row.get("relative_path") or "") for row in payload["folder"]["files"]} self.assertIn("playlist.yaml", relative_paths) self.assertIn("covers/playlist-cover.jpg", relative_paths) def test_api_playlist_export_folder_refreshes_yaml_from_latest_db_state(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.web import create_app with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: root = Path(tmpdir) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text( f"ROOT_DIR={root.as_posix()}\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8", ) initialize_database(db_path, default_library_root=root / "library").close() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="playlist-export-refresh", name="Playlist Export Refresh", play_count=456789, ) song_id = self._seed_song( db_path, platform="qq", remote_id="playlist-export-refresh-song", name="Playlist Export Refresh Song", singers="Singer Refresh", ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1) self._mark_local_downloaded( db_path, song_id=song_id, relative_path="qq/Singer Refresh/playlist-export-refresh-song.mp3", ) playlist_dir = root / "playlists" / f"Playlist Export Refresh_{playlist_id}" covers_dir = playlist_dir / "covers" covers_dir.mkdir(parents=True, exist_ok=True) stale_yaml_path = playlist_dir / "playlist.yaml" stale_yaml_path.write_text( "\n".join( [ f"playlist_id: {playlist_id}", "playlist_name: Playlist Export Refresh", "platform: qq", "songs:", " - local_song_id: 1", " local_file_path: null", "", ] ), encoding="utf-8", ) (playlist_dir / ".playlist_meta.json").write_text( json.dumps( { "playlist_id": playlist_id, "platform": "qq", "remote_playlist_id": "playlist-export-refresh", "name": "Playlist Export Refresh", }, ensure_ascii=False, ), encoding="utf-8", ) app = create_app(db_path=db_path, env_path=env_path) client = TestClient(app) self.addCleanup(client.close) response = client.get(f"/api/playlists/{playlist_id}/export-folder") refreshed_yaml = stale_yaml_path.read_text(encoding="utf-8") self.assertEqual(200, response.status_code) self.assertIn("playlist-export-refresh-song.mp3", refreshed_yaml) self.assertNotIn("local_file_path: null", refreshed_yaml) def test_job_detail_page_and_api_include_playlist_progress(self): from musicdl.catalogsync.ops.models import ItemStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="job-detail-progress-1", name="Job Detail Playlist", ) song_a = self._seed_song( db_path, platform="qq", remote_id="job-detail-song-1", name="Job Detail Song 1", ) song_b = self._seed_song( db_path, platform="qq", remote_id="job-detail-song-2", name="Job Detail Song 2", ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_a, position=1) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_b, position=2) self._mark_local_downloaded( db_path, song_id=song_a, relative_path="qq/Singer A/job-detail-song-1.mp3", ) repo = OpsRepository(db_path) job_id = repo.create_job( job_type="download_only", config_snapshot={}, playlist_scope={"playlist_ids": [playlist_id]}, ) stage_id = repo.create_stage(job_run_id=job_id, stage_type="download", seq_no=1) repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key=f"song:{song_b}", song_id=song_b, status=ItemStatus.PENDING, ) api_response = client.get(f"/api/jobs/{job_id}") self.assertEqual(200, api_response.status_code) api_payload = api_response.json() self.assertIn("playlist_progress", api_payload) self.assertEqual(1, len(api_payload["playlist_progress"])) row = api_payload["playlist_progress"][0] self.assertEqual(playlist_id, row["playlist_id"]) self.assertEqual("Job Detail Playlist", row["playlist_name"]) self.assertEqual(2, row["total_songs"]) self.assertEqual(1, row["downloaded_songs"]) self.assertEqual(1, row["pending_songs"]) self.assertEqual(50, row["progress_percent"]) page_response = client.get(f"/jobs/{job_id}") self.assertEqual(200, page_response.status_code) html = page_response.text self.assertIn("Playlist Progress", html) self.assertIn("Job Detail Playlist", html) self.assertIn("1 / 2", html) self.assertIn("50%", html) def test_api_job_playlist_songs_exposes_non_music_resource_note(self): from musicdl.catalogsync.ops.models import ItemStatus from musicdl.catalogsync.ops.repository import OpsRepository client, db_path, _ = self._build_client() playlist_id = self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id="job-detail-non-music-1", name="Job Detail Non Music Playlist", ) song_id = self._seed_song( db_path, platform="qq", remote_id="qqtop_75_non_music_seed", name="Audio Program", singers="Narrator", metadata={ "snapshot": { "raw_data": { "search": { "qq_toplist_fallback": True, } } } }, ) self._link_playlist_song(db_path, playlist_id=playlist_id, song_id=song_id, position=1) repo = OpsRepository(db_path) job_id = repo.create_job( job_type="download_only", config_snapshot={}, playlist_scope={"playlist_ids": [playlist_id]}, ) stage_id = repo.create_stage(job_run_id=job_id, stage_type="download", seq_no=1) repo.create_item( job_stage_id=stage_id, item_type="song_download", item_key=f"song:{song_id}", song_id=song_id, status=ItemStatus.SKIPPED, payload={"row": {"id": song_id}}, ) response = client.get(f"/api/jobs/{job_id}/playlists/{playlist_id}/songs") self.assertEqual(200, response.status_code) payload = response.json() self.assertIn("items", payload) self.assertEqual(1, len(payload["items"])) row = payload["items"][0] self.assertEqual(song_id, row["song_id"]) self.assertEqual("skipped", row["status"]) self.assertTrue(row["is_non_music_resource"]) self.assertIn("非音乐资源", row["status_note"]) def test_playlists_pagination_links_encode_keyword_query(self): client, db_path, _ = self._build_client() keyword = "A & B =" for index in range(1, 42): self._seed_playlist( db_path, platform="qq", pool_kind="manual_file", remote_id=f"page-encoded-{index}", name=f"{keyword} Playlist {index}", ) response = client.get( "/playlists", params={"page": 2, "page_size": 20, "keyword": keyword}, ) self.assertEqual(200, response.status_code) html = response.text self.assertIn("Previous", html) self.assertIn("Next", html) self.assertTrue( "keyword=A+%26+B+%3D" in html or "keyword=A%20%26%20B%20%3D" in html ) def test_api_playlist_bulk_endpoints_reject_empty_playlist_ids(self): client, _, _ = self._build_client() for path in ( "/api/playlists/mark-wanted", "/api/playlists/unmark-wanted", "/api/playlists/sync", "/api/playlists/download", "/api/playlists/sync-download", ): response = client.post(path, json={"playlist_ids": []}) self.assertEqual(422, response.status_code, msg=path) def test_embedded_runner_recovers_from_transient_loop_failure(self): from musicdl.catalogsync.db import initialize_database from musicdl.catalogsync.ops.models import JobStatus from musicdl.catalogsync.ops.repository import OpsRepository from musicdl.catalogsync.ops.web import create_app tmpdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True) self.addCleanup(tmpdir.cleanup) root = Path(tmpdir.name) db_path = root / "catalogsync.db" env_path = root / "catalogsync.env" env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8") initialize_database(db_path).close() repo = OpsRepository(db_path) job_id = repo.create_job(job_type="collect_only", config_snapshot={}) attempts = {"count": 0} def flaky_loop_once(self): attempts["count"] += 1 if attempts["count"] == 1: raise RuntimeError("transient embedded runner failure") job = self.repository.claim_and_mark_next_runnable_job() if job is None: return False self.repository.mark_job_finished(job.id, status=JobStatus.COMPLETED) return True app = create_app( db_path=db_path, env_path=env_path, start_runner=True, runner_sleep_seconds=0.01, ) with patch("musicdl.catalogsync.ops.runner.OpsRunner.loop_once", new=flaky_loop_once): with TestClient(app): deadline = time.time() + 1.5 while time.time() < deadline: if repo.get_job(job_id).status == JobStatus.COMPLETED: break time.sleep(0.05) job = repo.get_job(job_id) self.assertGreaterEqual(attempts["count"], 2) self.assertEqual(JobStatus.COMPLETED, job.status) if __name__ == "__main__": unittest.main()