# Download Dual-Pool Pipeline Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Split the download stage into resolver workers and downloader workers so that the configured download concurrency is spent on real transfers instead of long-running source resolution.

**Architecture:** Keep deferred snapshots and the current database schema unchanged. Refactor `CatalogDownloader` into explicit resolve-only and download-only phases, and add a runner-level in-memory `ready_queue` that connects a small resolver pool to a larger downloader pool during `download` stages.

**Tech Stack:** Python, sqlite3, unittest (run via pytest), ThreadPoolExecutor, queue.Queue, FastAPI ops dashboard

---

## File Map

### Existing Files To Modify

- `musicdl/catalogsync/downloader.py` - Split the current mixed `resolve + download` flow into reusable resolve-only and download-only methods.
- `musicdl/catalogsync/ops/executors.py` - Add download-stage helpers for resolve-only and download-only item handling without breaking existing item status semantics.
- `musicdl/catalogsync/ops/runner.py` - Add the runner-level dual-pool execution path for `download` stages.
- `musicdl/catalogsync/ops/web.py` and `musicdl/catalogsync/ops/repository.py` - Change only if the Task 4 dashboard test shows that resolver workers are hidden or that transfer stats include them.
- `tests/catalogsync/test_services.py` - Lock the downloader API refactor with unit tests.
- `tests/catalogsync/test_ops_executors.py` - Lock download-stage executor behavior for resolved tasks and failure handling.
- `tests/catalogsync/test_ops_runner.py` - Lock the dual-pool queueing model, worker split, and stage lifecycle.
- `tests/catalogsync/test_ops_api.py` - Lock dashboard visibility of the resolver and downloader worker families, adjusting payload expectations only if needed.

### New Files To Create

- None required for the first implementation. Keep the change focused and schema-free.
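The architecture above is a bounded producer/consumer pipeline. The following is a standalone model of it, not the planned `runner.py` code: `split_workers`, `run_pipeline`, `fake_resolve`, and `fake_download` are hypothetical names, and plain callables stand in for `CatalogDownloader` and the ops repository. It uses the same split rule proposed in Task 3 (roughly one third resolvers, capped at three) so that slow resolution happens off the download threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from queue import Empty, Queue


def split_workers(total_workers: int) -> tuple[int, int]:
    """Same rule as Task 3: about a third resolvers (max 3), the rest downloaders."""
    total = max(int(total_workers or 0), 1)
    if total == 1:
        return 1, 0  # nothing to split: caller falls back to a single pool
    resolvers = max(1, min(3, total // 3))
    return resolvers, max(1, total - resolvers)


def run_pipeline(items, total_workers, resolve, download):
    """Resolve with a small pool, transfer with a larger one, joined by a bounded queue."""
    resolvers, downloaders = split_workers(total_workers)
    if downloaders == 0:
        # Single worker: resolve and download each item back to back.
        return [download(p) for p in map(resolve, items) if p is not None]

    pending: Queue = Queue()  # stand-in for claim_next_stage_item(...)
    for item in items:
        pending.put(item)
    ready: Queue = Queue(maxsize=downloaders * 2)  # bounded: backpressure on resolvers
    sentinel = object()
    results, lock = [], threading.Lock()

    def resolver_loop() -> None:
        while True:
            try:
                item = pending.get_nowait()
            except Empty:
                return  # no more work to claim
            payload = resolve(item)
            if payload is not None:
                ready.put(payload)  # blocks once 2 * downloaders tasks are waiting

    def download_loop() -> None:
        while True:
            payload = ready.get()
            if payload is sentinel:
                return
            result = download(payload)
            with lock:
                results.append(result)

    with ThreadPoolExecutor(max_workers=resolvers + downloaders) as pool:
        resolver_futures = [pool.submit(resolver_loop) for _ in range(resolvers)]
        download_futures = [pool.submit(download_loop) for _ in range(downloaders)]
        wait(resolver_futures)  # every producer is done before the queue is closed
        for _ in range(downloaders):
            ready.put(sentinel)  # one stop signal per downloader
        for future in download_futures + resolver_futures:
            future.result()  # re-raise any worker exception
    return results


def fake_resolve(song_id: int):
    time.sleep(0.01)  # stand-in for slow source resolution
    return None if song_id % 5 == 0 else f"payload-{song_id}"


def fake_download(payload: str) -> str:
    time.sleep(0.02)  # stand-in for the byte transfer
    return payload.upper()


results = run_pipeline(range(1, 11), total_workers=10, resolve=fake_resolve, download=fake_download)
print(split_workers(10), sorted(results))
```

In the real runner, the `pending` queue corresponds to `claim_next_stage_item(...)`, and the resolver loop additionally stops when the job is paused or canceled.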
---

### Task 1: Lock `CatalogDownloader` Into Resolve And Download Phases

**Files:**

- Modify: `tests/catalogsync/test_services.py`
- Modify: `musicdl/catalogsync/downloader.py`

- [ ] **Step 1: Write the failing tests for resolve-only and download-only behavior**

Add tests near the existing downloader tests in `tests/catalogsync/test_services.py`. Because `resolve_song_row` returns the `ResolvedDownloadPayload` dataclass defined in Step 3, the assertions read its attributes:

```python
def test_catalog_downloader_resolve_song_row_returns_resolved_payload():
    # downloader, song_id, library_root, and worker_updates come from the surrounding test setup
    payload = downloader.resolve_song_row(
        row={
            "id": song_id,
            "playlist_id": 123,
            "platform": "netease",
            "name": "Song Resolve",
            "singers": "Singer Resolve",
            "ext": "mp3",
            "file_size_bytes": 24,
            "metadata_json": '{"snapshot":{"identifier":"song-resolve"}}',
        },
        library_root=library_root,
        download_sources=["qq", "kuwo"],
        worker_callback=lambda **state: worker_updates.append(dict(state)),
    )

    assert payload is not None
    assert payload.row["id"] == song_id
    assert payload.display_text == "Song Resolve / Singer Resolve"
    assert payload.resolved_song_info.source == "QQMusicClient"
```

```python
def test_catalog_downloader_download_resolved_song_reports_progress_and_records_file():
    # resolved_payload, repo, song_id, and worker_updates come from the surrounding test setup
    ok = downloader.download_resolved_song(
        resolved_payload=resolved_payload,
        worker_callback=lambda **state: worker_updates.append(dict(state)),
    )

    assert ok is True
    assert any(int(state.get("downloaded_bytes") or 0) > 0 for state in worker_updates)
    assert repo.song_has_active_local_file(song_id) is True
```

```python
def test_catalog_downloader_download_song_row_remains_a_compatibility_wrapper():
    ok = downloader.download_song_row(
        row=row,
        library_root=library_root,
        download_sources=["qq"],
        worker_callback=lambda **state: worker_updates.append(dict(state)),
    )

    assert ok is True
    assert any("resolving source" in str(state.get("last_progress_text") or "") for state in worker_updates)
    assert any("starting download via" in str(state.get("last_progress_text") or "") for state in worker_updates)
```

- [ ] **Step 2: Run the focused tests and verify they fail for missing APIs**

Run:

```bash
python -m pytest tests/catalogsync/test_services.py -k "resolve_song_row or download_resolved_song or compatibility_wrapper" -q
```

Expected:

- `FAILED`
- the failure mentions missing `resolve_song_row` or `download_resolved_song`, or existing wrapper behavior does not match the new tests

- [ ] **Step 3: Implement minimal resolve-only and download-only APIs in `CatalogDownloader`**

Refactor `musicdl/catalogsync/downloader.py` toward this shape (the `def`s taking `self` are methods on `CatalogDownloader`):

```python
import json
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ResolvedDownloadPayload:
    row: dict[str, object]
    display_text: str
    default_root: Path
    target_root: Path
    backend_id: int
    expected_bytes: int | None
    resolved_song_info: object


def resolve_song_row(
    self,
    row,
    library_root: str | Path,
    download_sources: list[str] | None = None,
    worker_callback=None,
) -> ResolvedDownloadPayload | None:
    row_dict = dict(row)
    default_root = Path(library_root).resolve()
    # Shared instance state: resolver threads overwrite it, which is harmless
    # only while every worker uses the same library_root.
    self._current_library_root = default_root
    self.repository.ensure_local_backend(default_root, name="default-local", is_default=True)

    display_name = str(row_dict.get("name") or row_dict.get("id") or "")
    singers = str(row_dict.get("singers") or "").strip()
    display_text = f"{display_name} / {singers}".strip(" /")
    self._emit_worker_progress(row_dict, worker_callback, display_text=display_text)

    metadata = json.loads(row_dict["metadata_json"]) if row_dict.get("metadata_json") else {}
    song_info = deserialize_song_info(metadata.get("snapshot"))
    if song_info is None:
        return None

    resolve_progress_callback = None
    if worker_callback is not None:
        resolve_progress_callback = lambda message: self._emit_worker_progress(
            row_dict,
            worker_callback,
            display_text=display_text,
            last_progress_text=message,
        )

    # The slow step this plan moves off the download workers.
    song_info = self.resolve_song_info_for_download(
        row=row_dict,
        song_info=song_info,
        download_sources=download_sources,
        progress_callback=resolve_progress_callback,
    )

    # Choose the storage root now so the download phase only transfers bytes.
    target_root = self.ensure_space(
        default_root,
        getattr(song_info, "file_size_bytes", None) or row_dict.get("file_size_bytes"),
    )
    is_default_root = target_root.resolve() == default_root
    backend_id = self.repository.ensure_local_backend(
        target_root,
        name="default-local" if is_default_root else None,
        is_default=is_default_root,
    )
    expected_bytes = int(getattr(song_info, "file_size_bytes", None) or row_dict.get("file_size_bytes") or 0) or None

    return ResolvedDownloadPayload(
        row=row_dict,
        display_text=display_text,
        default_root=default_root,
        target_root=target_root,
        backend_id=backend_id,
        expected_bytes=expected_bytes,
        resolved_song_info=song_info,
    )
```

```python
def download_resolved_song(
    self,
    resolved_payload: ResolvedDownloadPayload,
    worker_callback=None,
) -> bool:
    row = resolved_payload.row
    song_info = resolved_payload.resolved_song_info
    download_platform = self._detect_download_platform(song_info, row["platform"])
    client = self.get_client(download_platform)

    singers = self._normalize_singers(getattr(song_info, "singers", None)) or self._normalize_singers(row.get("singers"))
    relative_dir = build_download_relative_dir(platform=download_platform, singers=singers)
    target_dir = resolved_payload.target_root / relative_dir
    target_dir.mkdir(parents=True, exist_ok=True)
    song_info.work_dir = str(target_dir)
    if hasattr(song_info, "_save_path"):
        song_info._save_path = None

    self._emit_worker_progress(
        row,
        worker_callback,
        display_text=resolved_payload.display_text,
        last_progress_text=f"starting download via {download_platform}",
    )
    # keep the existing monitor, client.download, and record_local_file logic here
```

```python
def download_song_row(self, row, library_root, download_sources=None, worker_callback=None) -> bool:
    # Compatibility wrapper: existing callers still get resolve + download in one call.
    resolved_payload = self.resolve_song_row(
        row=row,
        library_root=library_root,
        download_sources=download_sources,
        worker_callback=worker_callback,
    )
    if resolved_payload is None:
        return False
    return self.download_resolved_song(
        resolved_payload=resolved_payload,
        worker_callback=worker_callback,
    )
```

- [ ] **Step 4: Run the focused tests and verify they pass**

Run:

```bash
python -m pytest tests/catalogsync/test_services.py -k "resolve_song_row or download_resolved_song or compatibility_wrapper" -q
```

Expected:

- `PASS`
- no regressions in the existing progress tests around `resolving source ...` and `starting download via ...`

- [ ] **Step 5: Commit**

```bash
git add tests/catalogsync/test_services.py musicdl/catalogsync/downloader.py
git commit -m "refactor: split downloader resolve and transfer phases"
```

### Task 2: Lock Download Executor Helpers For Resolved Tasks

**Files:**

- Modify: `tests/catalogsync/test_ops_executors.py`
- Modify: `musicdl/catalogsync/ops/executors.py`

- [ ] **Step 1: Write the failing executor tests for resolve-only and download-only item handling**

Add focused tests in `tests/catalogsync/test_ops_executors.py` (`executor`, `item_id`, `ops_repo`, `row`, `resolved_task`, and a `queue.Queue` named `ready_queue` come from the test setup):

```python
def test_download_executor_resolve_item_marks_failed_when_resolution_returns_none():
    with patch.object(CatalogDownloader, "resolve_song_row", return_value=None):
        executor.process_resolve_item(
            item_id=item_id,
            worker_name="resolve-1",
            ready_queue=ready_queue,
        )

    item = ops_repo.get_item(item_id)
    assert item.status == ItemStatus.FAILED
    assert ready_queue.empty()
```

```python
def test_download_executor_resolve_item_enqueues_resolved_payload():
    resolved_payload = SimpleNamespace(row=row, display_text="Song A / Singer A")
    with patch.object(CatalogDownloader, "resolve_song_row", return_value=resolved_payload):
        executor.process_resolve_item(
            item_id=item_id,
            worker_name="resolve-1",
            ready_queue=ready_queue,
        )

    queued = ready_queue.get_nowait()
    assert queued.item_id == item_id
    assert queued.resolved_payload is resolved_payload
```

```python
def test_download_executor_download_resolved_item_marks_item_succeeded():
    with patch.object(CatalogDownloader, "download_resolved_song", return_value=True):
        executor.process_download_task(
            task=resolved_task,
            worker_name="download-1",
        )

    item = ops_repo.get_item(item_id)
    assert item.status == ItemStatus.SUCCEEDED
```

- [ ] **Step 2: Run the focused executor tests and verify they fail**

Run:

```bash
python -m pytest tests/catalogsync/test_ops_executors.py -k "process_resolve_item or process_download_task" -q
```

Expected:

- `FAILED`
- the failure mentions missing `process_resolve_item` or `process_download_task`

- [ ] **Step 3: Add minimal executor helpers without removing the current compatibility path**

Extend `musicdl/catalogsync/ops/executors.py` around `DownloadStageExecutor` with helpers shaped like the following. Both helpers turn per-item exceptions into failed items, because the runner loops in Task 3 have no per-task error handling and a raised exception would kill a pool thread.

```python
from dataclasses import dataclass


@dataclass
class ResolvedStageDownloadTask:
    item_id: int
    playlist_id: int | None
    row: dict[str, object]
    resolved_payload: object  # ResolvedDownloadPayload from Task 1


def process_resolve_item(self, item_id: int, worker_name: str, *, ready_queue) -> None:
    row = self.ops_repo.build_download_row(item_id=item_id)
    song_id = int(row.get("id") or row.get("song_id") or 0)
    if song_id > 0 and self.catalog_repo.song_has_active_local_file(song_id):
        # Nothing to transfer: finish here instead of occupying a downloader slot.
        self.ops_repo.update_worker_state(
            worker_name=worker_name,
            current_job_item_id=item_id,
            status="running",
            current_song_id=song_id,
            current_playlist_id=row.get("playlist_id"),
            current_display_text=str(row.get("name") or row.get("id") or song_id),
            last_progress_text="already downloaded",
        )
        _ensure_transition_applied(
            self.ops_repo.mark_item_succeeded(item_id=item_id, result_payload={"already_downloaded": True}),
            item_id=item_id,
            action="mark_item_succeeded",
        )
        return

    try:
        resolved_payload = self.downloader.resolve_song_row(
            row=row,
            library_root=self.library_root,
            download_sources=self.download_sources,
            worker_callback=lambda **state: self.ops_repo.update_worker_state(
                worker_name=worker_name,
                current_job_item_id=item_id,
                status="running",
                **state,
            ),
        )
        error_message = "resolve returned no downloadable song"
    except Exception as exc:
        resolved_payload, error_message = None, f"resolve failed: {exc}"
    if resolved_payload is None:
        _ensure_transition_applied(
            self.ops_repo.mark_item_failed(item_id=item_id, error_message=error_message),
            item_id=item_id,
            action="mark_item_failed",
        )
        return

    # Blocks while the bounded ready queue is full, throttling resolvers to transfer speed.
    ready_queue.put(
        ResolvedStageDownloadTask(
            item_id=item_id,
            playlist_id=row.get("playlist_id"),
            row=row,
            resolved_payload=resolved_payload,
        )
    )
```

```python
def process_download_task(self, task: ResolvedStageDownloadTask, worker_name: str) -> None:
    try:
        succeeded = self.downloader.download_resolved_song(
            resolved_payload=task.resolved_payload,
            worker_callback=lambda **state: self.ops_repo.update_worker_state(
                worker_name=worker_name,
                current_job_item_id=task.item_id,
                status="running",
                **state,
            ),
        )
        error_message = "download returned no file"
    except Exception as exc:
        succeeded, error_message = False, f"download failed: {exc}"
    if succeeded:
        _ensure_transition_applied(
            self.ops_repo.mark_item_succeeded(item_id=task.item_id),
            item_id=task.item_id,
            action="mark_item_succeeded",
        )
        return
    _ensure_transition_applied(
        self.ops_repo.mark_item_failed(item_id=task.item_id, error_message=error_message),
        item_id=task.item_id,
        action="mark_item_failed",
    )
```

Keep `process_item(...)` as the compatibility path by delegating to `download_song_row(...)`.

- [ ] **Step 4: Run the focused executor tests and verify they pass**

Run:

```bash
python -m pytest tests/catalogsync/test_ops_executors.py -k "process_resolve_item or process_download_task" -q
```

Expected:

- `PASS`
- existing `DownloadStageExecutor.process_item(...)` tests remain green

- [ ] **Step 5: Commit**

```bash
git add tests/catalogsync/test_ops_executors.py musicdl/catalogsync/ops/executors.py
git commit -m "refactor: add staged download executor helpers"
```

### Task 3: Lock The Runner-Level Dual-Pool Pipeline

**Files:**

- Modify: `tests/catalogsync/test_ops_runner.py`
- Modify: `musicdl/catalogsync/ops/runner.py`

- [ ] **Step 1: Write the failing runner tests for worker splitting and queue-driven execution**

Add tests in `tests/catalogsync/test_ops_runner.py` for:

```python
def test_download_stage_splits_workers_into_resolve_and_download_pools():
    resolver_workers, download_workers = runner._download_stage_worker_split(total_workers=10)

    assert resolver_workers == 3
    assert download_workers == 7
```

```python
def test_download_stage_pipeline_processes_items_through_ready_queue():
    processed = []

    class FakeDownloadExecutor:
        def process_resolve_item(self, item_id, worker_name, *, ready_queue):
            ready_queue.put(SimpleNamespace(item_id=item_id, playlist_id=None, resolved_payload=f"payload-{item_id}"))

        def process_download_task(self, task, worker_name):
            processed.append((task.item_id, worker_name, task.resolved_payload))
            repo.mark_item_succeeded(task.item_id)

    with patch.object(runner, "_build_executor", return_value=FakeDownloadExecutor()):
        runner._run_stage(job, stage)  # stage configured with more than one worker

    assert processed
    assert all(worker_name.startswith("download-") for _, worker_name, _ in processed)
```

```python
def test_download_stage_pipeline_uses_single_thread_compatibility_when_worker_count_is_one():
    calls = []

    class FakeDownloadExecutor:
        def process_item(self, item_id, worker_name, *, already_claimed=False):
            calls.append((item_id, worker_name, already_claimed))
            repo.mark_item_succeeded(item_id)

    with patch.object(runner, "_build_executor", return_value=FakeDownloadExecutor()):
        runner._run_stage(job, stage)  # stage configured with worker_count=1

    assert calls
    assert calls[0][1] == "download-1"
```

- [ ] **Step 2: Run the focused runner tests and verify they fail**

Run:

```bash
python -m pytest tests/catalogsync/test_ops_runner.py -k "worker_split or ready_queue or single_thread_compatibility" -q
```

Expected:

- `FAILED`
- the failure mentions missing `_download_stage_worker_split`, or the existing `_run_stage(...)` shape does not match the new behavior

- [ ] **Step 3: Implement the runner-level dual-pool path for download stages**

Refactor `musicdl/catalogsync/ops/runner.py` so `download` stages use a specialized path:

```python
def _download_stage_worker_split(self, total_workers: int) -> tuple[int, int]:
    normalized_total = max(int(total_workers or 0), 1)
    if normalized_total == 1:
        return 1, 0  # nothing to split; the caller falls back to the single pool
    if normalized_total == 2:
        return 1, 1
    # Roughly one third resolvers, capped at 3: 10 workers -> 3 resolvers + 7 downloaders.
    resolver_workers = max(1, min(3, normalized_total // 3))
    download_workers = max(1, normalized_total - resolver_workers)
    return resolver_workers, download_workers
```

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from queue import Queue


def _run_download_stage_pipeline(self, job, stage, executor, worker_count: int) -> None:
    resolver_workers, download_workers = self._download_stage_worker_split(worker_count)
    if download_workers == 0:
        # A single worker cannot be split; keep the existing process_item(...) path.
        self._run_stage_with_single_pool(job, stage, executor, worker_count)
        return

    # Bounded queue: resolvers block once each downloader has ~2 tasks waiting.
    ready_queue: Queue = Queue(maxsize=max(1, download_workers * 2))
    stop_event = threading.Event()
    sentinel = object()

    def resolver_loop(worker_index: int) -> None:
        worker_name = f"resolve-{worker_index + 1}"
        while not stop_event.is_set():
            active_job = self.repository.get_job(job.id)
            if active_job is None or active_job.status in {JobStatus.PAUSE_REQUESTED, JobStatus.CANCELED}:
                stop_event.set()
                return
            item = self.repository.claim_next_stage_item(stage.id, worker_name)
            if item is None:
                return
            executor.process_resolve_item(item.id, worker_name, ready_queue=ready_queue)

    def download_loop(worker_index: int) -> None:
        worker_name = f"download-{worker_index + 1}"
        while True:
            task = ready_queue.get()
            if task is sentinel:
                return
            # Assumes process_download_task records per-item failures instead of raising.
            executor.process_download_task(task, worker_name)

    with ThreadPoolExecutor(max_workers=resolver_workers + download_workers) as pool:
        resolver_futures = [pool.submit(resolver_loop, index) for index in range(resolver_workers)]
        download_futures = [pool.submit(download_loop, index) for index in range(download_workers)]
        # Wait for every resolver, including any that raised, before closing the queue.
        # Calling result() first would let one resolver error skip the sentinels and
        # leave downloaders blocked in ready_queue.get(), hanging the pool shutdown.
        wait(resolver_futures)
        for _ in range(download_workers):
            ready_queue.put(sentinel)
        for future in download_futures:
            future.result()
        # Surface resolver errors only after the downloaders have drained the queue.
        for future in resolver_futures:
            future.result()
```

Then update `_run_stage(...)` to branch, where `_run_stage_with_single_pool(...)` is the current single-pool loop:

```python
# inside _run_stage(...)
if refreshed_stage.stage_type == StageType.DOWNLOAD.value:
    self._run_download_stage_pipeline(job, refreshed_stage, executor, worker_count)
else:
    self._run_stage_with_single_pool(job, refreshed_stage, executor, worker_count)
```

- [ ] **Step 4: Run the focused runner tests and verify they pass**

Run:

```bash
python -m pytest tests/catalogsync/test_ops_runner.py -k "worker_split or ready_queue or single_thread_compatibility" -q
```

Expected:

- `PASS`
- the current worker-count tests still pass

- [ ] **Step 5: Commit**

```bash
git add tests/catalogsync/test_ops_runner.py musicdl/catalogsync/ops/runner.py
git commit -m "feat: run download stage through dual-pool pipeline"
```

### Task 4: Lock Dashboard And Worker-State Expectations

**Files:**

- Modify: `tests/catalogsync/test_ops_api.py`
- Modify: `musicdl/catalogsync/ops/web.py`
- Modify: `musicdl/catalogsync/ops/repository.py`

- [ ] **Step 1: Write the failing dashboard regression for resolver and downloader worker families**

Add a test in `tests/catalogsync/test_ops_api.py` shaped like:

```python
def test_dashboard_exposes_resolver_and_downloader_workers_during_download_stage():
    repo.update_worker_state(
        worker_name="resolve-1",
        current_job_item_id=item_id_a,
        status="running",
        current_song_id=song_a_id,
        current_display_text="Song A / Singer A",
        last_progress_text="resolving source qq (1/6)",
    )
    repo.update_worker_state(
        worker_name="download-1",
        current_job_item_id=item_id_b,
        status="running",
        current_song_id=song_b_id,
        current_display_text="Song B / Singer B",
        last_progress_text="12.00MB/48.00MB",
        downloaded_bytes=12 * 1024 * 1024,
        total_bytes=48 * 1024 * 1024,
        speed_bytes_per_sec=3 * 1024 * 1024,
        progress_percent=25,
    )

    payload = client.get("/api/dashboard?include_task_rows=false").json()
    worker_names = [worker["worker_name"] for worker in payload["workers"]]

    assert "resolve-1" in worker_names
    assert "download-1" in worker_names
    assert payload["transfer_stats"]["download_speed_bytes_per_sec"] == 3 * 1024 * 1024
```

- [ ] **Step 2: Run the focused dashboard test to find out whether payload logic needs adjustment**

Run:

```bash
python -m pytest tests/catalogsync/test_ops_api.py -k "resolver_and_downloader_workers" -q
```

Expected:

- either `FAILED`, because payload assumptions need adjustment
- or `PASS`, which means no production code change is needed here

- [ ] **Step 3: Apply the minimal payload or repository changes only if the test requires them**

If worker lookup or naming assumptions need tightening, keep the code minimal, for example:

```python
# no-op if existing worker queries already behave correctly
# only adjust helper logic if it filters by a fixed "download-" prefix anywhere
```

The goal is not to redesign dashboard data, only to ensure that resolver workers remain visible and that transfer stats still reflect downloader workers only.

- [ ] **Step 4: Re-run the focused dashboard test and verify it passes**

Run:

```bash
python -m pytest tests/catalogsync/test_ops_api.py -k "resolver_and_downloader_workers" -q
```

Expected:

- `PASS`

- [ ] **Step 5: Commit**

```bash
git add tests/catalogsync/test_ops_api.py musicdl/catalogsync/ops/web.py musicdl/catalogsync/ops/repository.py
git commit -m "test: cover resolver and downloader worker visibility"
```

### Task 5: Final Verification And NAS Reality Check

**Files:**

- Modify: `tests/catalogsync/test_services.py`
- Modify: `tests/catalogsync/test_ops_executors.py`
- Modify: `tests/catalogsync/test_ops_runner.py`
- Modify: `tests/catalogsync/test_ops_api.py`
- Modify: `musicdl/catalogsync/downloader.py`
- Modify: `musicdl/catalogsync/ops/executors.py`
- Modify: `musicdl/catalogsync/ops/runner.py`
- Modify (if changed in Task 4): `musicdl/catalogsync/ops/web.py`, `musicdl/catalogsync/ops/repository.py`

- [ ] **Step 1: Run the full targeted regression slice**

Run:

```bash
python -m pytest tests/catalogsync/test_services.py tests/catalogsync/test_ops_executors.py tests/catalogsync/test_ops_runner.py tests/catalogsync/test_ops_api.py -k "download or transfer_stats or resolver" -q
```

Expected:

- `PASS`
- existing download progress tests still green
- new dual-pool runner tests green

- [ ] **Step 2: Run a slightly wider ops regression slice**

Run:

```bash
python -m pytest tests/catalogsync/test_ops_repository.py tests/catalogsync/test_ops_frontend.py -k "worker or transfer or dashboard" -q
```

Expected:

- `PASS`
- no unexpected worker-state regressions

- [ ] **Step 3: Deploy to NAS**

Run:

```powershell
powershell -ExecutionPolicy Bypass -File .\deploy-catalogsync.ps1
```

Expected:

- deploy completes successfully
- health check passes for `http://127.0.0.1:18080/dashboard`

- [ ] **Step 4: Verify production behavior on NAS**

Run:

```powershell
# supply the real password from a secret store; never commit it to this plan
$env:NAS_192168543_PASSWORD = '<NAS password>'
powershell -ExecutionPolicy Bypass -File 'C:\Users\Administrator\.codex\skills\nas-ssh-192168543\scripts\run.ps1' "curl -fsS http://127.0.0.1:18080/api/dashboard?include_task_rows=false | python3 -m json.tool | head -n 160"
```

Expected:

- the worker list includes both `resolve-*` and `download-*` workers
- under active load, several `download-*` workers show non-zero transfer stats at the same time
- `resolve-*` workers show `resolving source ...` text instead of pretending to be downloading

- [ ] **Step 5: Commit final integration changes**

```bash
git add tests/catalogsync/test_services.py tests/catalogsync/test_ops_executors.py tests/catalogsync/test_ops_runner.py tests/catalogsync/test_ops_api.py musicdl/catalogsync/downloader.py musicdl/catalogsync/ops/executors.py musicdl/catalogsync/ops/runner.py musicdl/catalogsync/ops/web.py musicdl/catalogsync/ops/repository.py
git commit -m "feat: split download stage into resolver and transfer pools"
```

---

## Self-Review

### Spec Coverage

- dual-pool architecture: covered by Task 3
- downloader API split: covered by Task 1
- executor support for resolved tasks: covered by Task 2
- worker-family visibility: covered by Task 4
- NAS verification of real concurrency: covered by Task 5

No spec gaps remain for this iteration.

### Placeholder Scan

- no `TODO`, `TBD`, or "implement later" placeholders remain
- each code-changing task includes concrete method names, commands, and code shapes

### Type Consistency

- `ResolvedDownloadPayload` is used consistently between Task 1 and Task 2
- `ResolvedStageDownloadTask` is used consistently between Task 2 and Task 3
- the runner's dual-pool entry point is consistently named `_run_download_stage_pipeline(...)`
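Task 5 Step 4 checks the NAS expectations by reading the `json.tool` output. A minimal sketch that encodes the same checks as a function follows. It assumes each entry in the dashboard's `workers` list exposes the fields the Task 4 test writes through `update_worker_state` (`worker_name`, `speed_bytes_per_sec`, `downloaded_bytes`), which the plan does not confirm for the API payload. `check_dual_pool_dashboard` and the default threshold of two concurrently active downloaders are hypothetical choices. The resolver check looks for absent byte-level progress rather than matching `resolving source ...`, since `process_resolve_item` can also report `already downloaded`.

```python
def check_dual_pool_dashboard(payload: dict, min_active_downloads: int = 2) -> list[str]:
    """Return the Task 5 Step 4 expectations that a dashboard payload violates."""
    workers = payload.get("workers") or []

    def family(prefix: str) -> list[dict]:
        return [w for w in workers if str(w.get("worker_name") or "").startswith(prefix)]

    resolvers, downloaders = family("resolve-"), family("download-")
    problems = []
    if not resolvers:
        problems.append("no resolve-* workers listed")
    if not downloaders:
        problems.append("no download-* workers listed")

    # Real concurrency: several downloaders should report a live transfer speed at once.
    active = [w for w in downloaders if int(w.get("speed_bytes_per_sec") or 0) > 0]
    if len(active) < min_active_downloads:
        problems.append(f"only {len(active)} download-* workers transferring")

    # Resolvers never transfer bytes, so any byte-level progress on them is suspicious.
    for w in resolvers:
        if int(w.get("downloaded_bytes") or 0) > 0 or int(w.get("speed_bytes_per_sec") or 0) > 0:
            problems.append(f"{w['worker_name']} reports transfer progress")
    return problems


sample = {
    "workers": [
        {"worker_name": "resolve-1", "last_progress_text": "resolving source qq (1/6)"},
        {"worker_name": "download-1", "speed_bytes_per_sec": 3 * 1024 * 1024, "downloaded_bytes": 12 * 1024 * 1024},
        {"worker_name": "download-2", "speed_bytes_per_sec": 0},
    ]
}
print(check_dual_pool_dashboard(sample))
```

Feeding the NAS `curl` output to `check_dual_pool_dashboard(json.load(sys.stdin))` in a small script would turn Step 4 into a pass/fail check instead of a visual one.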