Resolver Source Ranking Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Add a persistent resolver side database that learns fallback success rates by original source and reorders fallback sources after warmup without touching the main catalog business tables.
Architecture: Create a dedicated resolver_stats.db side database and repository, then wire MultiSourceSongResolver to ask that repository for ranked fallback order. Keep preferred-source resolution first, record only fallback attempts and successes, and continue trying later sources if the learned top two fail.
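The traversal order described above can be sketched as a small pure function. This is a minimal sketch, not the resolver itself: resolution_order and learned_rank are hypothetical names, and learned_rank stands in for ResolverStatsRepository.rank_fallback_sources. The sketch shows that only fallbacks are ranked, that the preferred source always goes first, and that sources beyond the learned top two stay reachable.

```python
from typing import Callable


def resolution_order(
    preferred_source: str,
    download_sources: list[str],
    rank: Callable[[str, list[str]], list[str]],
) -> list[str]:
    """Return the order in which sources would be tried (hypothetical helper)."""
    # Drop duplicates while keeping the configured order.
    ordered = list(dict.fromkeys(download_sources))
    # Only fallbacks are ranked; the preferred source keeps position 1.
    fallbacks = [source for source in ordered if source != preferred_source]
    head = [preferred_source] if preferred_source in ordered else []
    return head + rank(preferred_source, fallbacks)


def learned_rank(origin: str, fallbacks: list[str]) -> list[str]:
    # Hypothetical stand-in for learned stats: "migu" has the best record.
    # sorted() is stable, so the remaining sources keep their configured order.
    return sorted(fallbacks, key=lambda source: source != "migu")


print(resolution_order("qq", ["qq", "kuwo", "migu", "qianqian"], learned_rank))
# ['qq', 'migu', 'kuwo', 'qianqian']
```

If migu and kuwo (the learned top two) both miss, qianqian is still tried, which is the continuation behavior the plan requires.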
Tech Stack: Python, sqlite3, unittest, Click CLI, FastAPI ops web
File Map
- Create: musicdl/catalogsync/resolver_stats.py - dedicated resolver side-database bootstrap, default path helper, and ranking repository.
- Modify: musicdl/catalogsync/resolver.py - preferred-source-first resolver flow plus ranked fallback traversal and resilient stats recording.
- Modify: musicdl/catalogsync/downloader.py - construct MultiSourceSongResolver with a ResolverStatsRepository derived from the main database path.
- Modify: musicdl/catalogsync/cli.py - initialize the resolver side database during CLI app startup and init-db.
- Modify: musicdl/catalogsync/ops/web.py - initialize the resolver side database during web app startup.
- Create: tests/catalogsync/test_resolver_stats.py - unit tests for side-database schema, warmup, ranking, and grouping.
- Modify: tests/catalogsync/test_resolver.py - integration-style resolver tests for warmup behavior, ranked top-two traversal, continuation, and graceful fallback.
- Modify: tests/catalogsync/test_cli.py - CLI startup tests for side-database creation.
- Modify: tests/catalogsync/test_ops_api.py - web startup test for side-database creation.
Task 1: Add The Resolver Stats Side Database
Files:
- Create: musicdl/catalogsync/resolver_stats.py
- Create: tests/catalogsync/test_resolver_stats.py
- Step 1: Write the failing side-database tests
```python
import tempfile
import unittest
from pathlib import Path


class ResolverStatsRepositoryTests(unittest.TestCase):
    def test_initialize_resolver_stats_database_creates_stats_table(self):
        from musicdl.catalogsync.resolver_stats import initialize_resolver_stats_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "resolver_stats.db"
            conn = initialize_resolver_stats_database(db_path)
            try:
                table_names = {
                    row["name"]
                    for row in conn.execute(
                        "SELECT name FROM sqlite_master WHERE type = 'table'"
                    ).fetchall()
                }
            finally:
                conn.close()

        self.assertIn("resolver_source_stats", table_names)

    def test_rank_fallback_sources_keeps_config_order_before_warmup(self):
        from musicdl.catalogsync.resolver_stats import ResolverStatsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = ResolverStatsRepository(Path(tmpdir) / "resolver_stats.db")
            # A success for a non-first source would move it up if warmup were ignored.
            repo.record_fallback_result("qq", "migu", succeeded=True)
            ranked = repo.rank_fallback_sources(
                "qq",
                ["kuwo", "migu", "qianqian"],
                warmup_attempts=1000,
            )

        self.assertEqual(["kuwo", "migu", "qianqian"], ranked)

    def test_rank_fallback_sources_reorders_after_warmup_per_origin_source(self):
        from musicdl.catalogsync.resolver_stats import ResolverStatsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = ResolverStatsRepository(Path(tmpdir) / "resolver_stats.db")
            for _ in range(800):
                repo.record_fallback_result("qq", "migu", succeeded=True)
            for _ in range(200):
                repo.record_fallback_result("qq", "kuwo", succeeded=False)
            ranked = repo.rank_fallback_sources(
                "qq",
                ["kuwo", "migu", "qianqian"],
                warmup_attempts=1000,
            )
            # Stats are grouped by origin: "netease" has no attempts, so it is still warming up.
            other_origin = repo.rank_fallback_sources(
                "netease",
                ["kuwo", "migu", "qianqian"],
                warmup_attempts=1000,
            )

        # qianqian was never tried and keeps the neutral 0.5 prior, so it outranks
        # kuwo, whose smoothed rate after 0 successes in 200 attempts is 1/202.
        self.assertEqual(["migu", "qianqian", "kuwo"], ranked)
        self.assertEqual(["kuwo", "migu", "qianqian"], other_origin)
```
- Step 2: Run the focused side-database tests and verify they fail
Run: python -m pytest tests/catalogsync/test_resolver_stats.py -q
Expected: ModuleNotFoundError or missing symbol failures for musicdl.catalogsync.resolver_stats.
- Step 3: Write the minimal side-database implementation
```python
from __future__ import annotations

import sqlite3
from contextlib import closing, suppress
from pathlib import Path

SQLITE_BUSY_TIMEOUT_MS = 30000
RESOLVER_FALLBACK_WARMUP_ATTEMPTS = 1000

SCHEMA_STATEMENTS = [
    """
    CREATE TABLE IF NOT EXISTS resolver_source_stats (
        origin_source TEXT NOT NULL,
        candidate_source TEXT NOT NULL,
        attempt_count INTEGER NOT NULL DEFAULT 0,
        resolve_success_count INTEGER NOT NULL DEFAULT 0,
        last_attempt_at TEXT,
        last_success_at TEXT,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY(origin_source, candidate_source)
    )
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_resolver_source_stats_origin
    ON resolver_source_stats (origin_source)
    """,
]


def default_resolver_stats_db_path(db_path: str | Path) -> Path:
    # The side database lives next to the main catalog database.
    return Path(db_path).resolve().with_name("resolver_stats.db")


def connect_resolver_stats_database(db_path: str | Path) -> sqlite3.Connection:
    path = Path(db_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path, timeout=SQLITE_BUSY_TIMEOUT_MS / 1000)
    conn.row_factory = sqlite3.Row
    conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}")
    # WAL and NORMAL sync are optimizations; ignore filesystems that reject them.
    with suppress(sqlite3.OperationalError):
        conn.execute("PRAGMA journal_mode = WAL")
    with suppress(sqlite3.OperationalError):
        conn.execute("PRAGMA synchronous = NORMAL")
    return conn


def initialize_resolver_stats_database(db_path: str | Path) -> sqlite3.Connection:
    conn = connect_resolver_stats_database(db_path)
    for statement in SCHEMA_STATEMENTS:
        conn.execute(statement)
    conn.commit()
    return conn


class ResolverStatsRepository:
    def __init__(self, db_path: str | Path):
        self.db_path = Path(db_path)
        initialize_resolver_stats_database(self.db_path).close()

    def record_fallback_result(self, origin_source: str, candidate_source: str, *, succeeded: bool) -> None:
        success = 1 if succeeded else 0
        # `with conn` only commits or rolls back; closing() actually closes the connection.
        with closing(connect_resolver_stats_database(self.db_path)) as conn:
            with conn:
                conn.execute(
                    """
                    INSERT INTO resolver_source_stats (
                        origin_source, candidate_source, attempt_count, resolve_success_count,
                        last_attempt_at, last_success_at
                    )
                    VALUES (?, ?, 1, ?, CURRENT_TIMESTAMP, CASE WHEN ? THEN CURRENT_TIMESTAMP ELSE NULL END)
                    ON CONFLICT(origin_source, candidate_source) DO UPDATE SET
                        attempt_count = attempt_count + 1,
                        resolve_success_count = resolve_success_count + excluded.resolve_success_count,
                        last_attempt_at = CURRENT_TIMESTAMP,
                        last_success_at = CASE
                            WHEN excluded.resolve_success_count > 0 THEN CURRENT_TIMESTAMP
                            ELSE resolver_source_stats.last_success_at
                        END,
                        updated_at = CURRENT_TIMESTAMP
                    """,
                    (origin_source, candidate_source, success, success),
                )

    def rank_fallback_sources(
        self,
        origin_source: str,
        fallback_sources: list[str],
        *,
        warmup_attempts: int = RESOLVER_FALLBACK_WARMUP_ATTEMPTS,
    ) -> list[str]:
        ordered = list(fallback_sources)
        if not ordered:
            return []
        with closing(connect_resolver_stats_database(self.db_path)) as conn:
            rows = conn.execute(
                """
                SELECT candidate_source, attempt_count, resolve_success_count
                FROM resolver_source_stats
                WHERE origin_source = ?
                """,
                (origin_source,),
            ).fetchall()
        # Warmup is counted per origin source, across all of its candidates.
        total_attempts = sum(int(row["attempt_count"]) for row in rows)
        if total_attempts < warmup_attempts:
            return ordered
        # Laplace-smoothed success rate: (successes + 1) / (attempts + 2).
        stats = {
            str(row["candidate_source"]): (
                (int(row["resolve_success_count"]) + 1) / (int(row["attempt_count"]) + 2)
            )
            for row in rows
        }
        # Unseen sources get the neutral 0.5 prior; sorted() is stable, so ties keep config order.
        return sorted(ordered, key=lambda source: -stats.get(source, 0.5))
```
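The ranking rule can be checked in isolation. This is a minimal sketch, assuming the same smoothing and the 0.5 prior as rank_fallback_sources but reading stats from an in-memory dict instead of SQLite; smoothed_success_rate and rank_fallbacks are hypothetical names. It reproduces the Task 1 expectation: migu scores 801/802 and kuwo 1/202, so the untried qianqian (0.5) lands between them.

```python
def smoothed_success_rate(successes: int, attempts: int) -> float:
    # Laplace (add-one) smoothing: a source with no data scores exactly 0.5.
    return (successes + 1) / (attempts + 2)


def rank_fallbacks(
    fallbacks: list[str],
    stats: dict[str, tuple[int, int]],
    warmup_attempts: int = 1000,
    unseen_score: float = 0.5,
) -> list[str]:
    # stats maps candidate -> (attempts, successes) for a single origin source.
    total_attempts = sum(attempts for attempts, _ in stats.values())
    if total_attempts < warmup_attempts:
        return list(fallbacks)  # still warming up: keep the configured order
    scores = {
        candidate: smoothed_success_rate(successes, attempts)
        for candidate, (attempts, successes) in stats.items()
    }
    # Stable sort: equal scores keep their configured order.
    return sorted(fallbacks, key=lambda candidate: -scores.get(candidate, unseen_score))


qq_stats = {"migu": (800, 800), "kuwo": (200, 0)}
print(rank_fallbacks(["kuwo", "migu", "qianqian"], qq_stats))
# ['migu', 'qianqian', 'kuwo']
```

The prior is a design choice: a source that was never tried ranks above one with a long record of failures, so it still gets explored after warmup.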
- Step 4: Run the side-database tests and verify they pass
Run: python -m pytest tests/catalogsync/test_resolver_stats.py -q
Expected: 3 passed
- Step 5: Commit the side-database foundation
git add musicdl/catalogsync/resolver_stats.py tests/catalogsync/test_resolver_stats.py
git commit -m "feat: add resolver stats side database"
Task 2: Teach The Resolver To Use Ranked Fallback Sources
Files:
- Modify: musicdl/catalogsync/resolver.py
- Modify: musicdl/catalogsync/downloader.py
- Modify: tests/catalogsync/test_resolver.py
- Step 1: Write the failing resolver behavior tests
```python
# Add these methods to the existing resolver TestCase in tests/catalogsync/test_resolver.py.
def test_resolver_uses_ranked_top_two_fallback_sources_after_warmup(self):
    from musicdl.catalogsync.resolver import MultiSourceSongResolver
    from musicdl.modules.utils.data import SongInfo

    class FakeStatsRepo:
        def __init__(self):
            self.records = []
            self.rank_call = None

        def rank_fallback_sources(self, origin_source, fallback_sources, warmup_attempts=1000):
            self.rank_call = (origin_source, list(fallback_sources), warmup_attempts)
            return ["migu", "kuwo", "qianqian"]

        def record_fallback_result(self, origin_source, candidate_source, *, succeeded):
            self.records.append((origin_source, candidate_source, succeeded))

    class FakeClient:
        def __init__(self, source, result, calls):
            self.source = source
            self.result = list(result)
            self.calls = calls

        def search(self, keyword, num_threadings=1, request_overrides=None, rule=None, main_process_context=None):
            self.calls.append(self.source)
            return list(self.result)

    snapshot = SongInfo(
        source="QQMusicClient",
        identifier="song-1",
        song_name="Song 1",
        singers="Singer 1",
        raw_data={"search": {"id": "song-1"}},
        download_url=None,
        download_url_status={},
    )
    migu_hit = SongInfo(
        source="MiguMusicClient",
        identifier="migu-song-1",
        song_name="Song 1",
        singers="Singer 1",
        ext="mp3",
        download_url="https://example.com/song-1.mp3",
        download_url_status={"ok": True},
    )
    search_calls = []
    stats_repo = FakeStatsRepo()
    resolver = MultiSourceSongResolver(
        client_factory=lambda platform: {
            "qq": FakeClient("qq", [], search_calls),
            "kuwo": FakeClient("kuwo", [], search_calls),
            "migu": FakeClient("migu", [migu_hit], search_calls),
            "qianqian": FakeClient("qianqian", [], search_calls),
        }[platform],
        resolver_stats_repo=stats_repo,
    )

    resolved = resolver.resolve_song_info(
        row={"platform": "qq", "name": "Song 1", "singers": "Singer 1", "remote_song_id": "song-1"},
        snapshot_song_info=snapshot,
        download_sources=["qq", "kuwo", "migu", "qianqian"],
    )

    self.assertEqual(["qq", "migu"], search_calls)
    # The preferred source is excluded before the fallbacks are ranked.
    self.assertEqual(("qq", ["kuwo", "migu", "qianqian"], 1000), stats_repo.rank_call)
    self.assertEqual([("qq", "migu", True)], stats_repo.records)
    self.assertEqual("MiguMusicClient", resolved.source)


def test_resolver_continues_after_ranked_top_two_fail(self):
    from musicdl.catalogsync.resolver import MultiSourceSongResolver
    from musicdl.modules.utils.data import SongInfo

    class FakeStatsRepo:
        def __init__(self):
            self.records = []

        def rank_fallback_sources(self, origin_source, fallback_sources, warmup_attempts=1000):
            return ["migu", "kuwo", "qianqian"]

        def record_fallback_result(self, origin_source, candidate_source, *, succeeded):
            self.records.append((candidate_source, succeeded))

    class FakeClient:
        def __init__(self, source, result, calls):
            self.source = source
            self.result = list(result)
            self.calls = calls

        def search(self, keyword, num_threadings=1, request_overrides=None, rule=None, main_process_context=None):
            self.calls.append(self.source)
            return list(self.result)

    snapshot = SongInfo(
        source="QQMusicClient",
        identifier="song-2",
        song_name="Song 2",
        singers="Singer 2",
        raw_data={"search": {"id": "song-2"}},
        download_url=None,
        download_url_status={},
    )
    qianqian_hit = SongInfo(
        source="QianqianMusicClient",
        identifier="qianqian-song-2",
        song_name="Song 2",
        singers="Singer 2",
        ext="mp3",
        download_url="https://example.com/song-2.mp3",
        download_url_status={"ok": True},
    )
    calls = []
    stats_repo = FakeStatsRepo()
    resolver = MultiSourceSongResolver(
        client_factory=lambda platform: {
            "qq": FakeClient("qq", [], calls),
            "migu": FakeClient("migu", [], calls),
            "kuwo": FakeClient("kuwo", [], calls),
            "qianqian": FakeClient("qianqian", [qianqian_hit], calls),
        }[platform],
        resolver_stats_repo=stats_repo,
    )

    resolved = resolver.resolve_song_info(
        row={"platform": "qq", "name": "Song 2", "singers": "Singer 2", "remote_song_id": "song-2"},
        snapshot_song_info=snapshot,
        download_sources=["qq", "kuwo", "migu", "qianqian"],
    )

    self.assertEqual(["qq", "migu", "kuwo", "qianqian"], calls)
    # Misses are recorded too, so failing sources lose rank over time.
    self.assertEqual([("migu", False), ("kuwo", False), ("qianqian", True)], stats_repo.records)
    self.assertEqual("QianqianMusicClient", resolved.source)
```
- Step 2: Run the focused resolver tests and verify they fail
Run: python -m pytest tests/catalogsync/test_resolver.py -q
Expected: failures for unexpected source order and missing resolver_stats_repo support.
- Step 3: Write the minimal resolver and downloader integration
```python
# musicdl/catalogsync/resolver.py (excerpt; existing helpers such as normalize_source_name,
# dedupe_preserve_order and merge_resolved_song_info are unchanged)
from .resolver_stats import RESOLVER_FALLBACK_WARMUP_ATTEMPTS


class MultiSourceSongResolver:
    def __init__(
        self,
        client_factory,
        request_overrides_factory=None,
        resolver_stats_repo=None,
        warmup_attempts: int = RESOLVER_FALLBACK_WARMUP_ATTEMPTS,
    ):
        self.client_factory = client_factory
        self.request_overrides_factory = request_overrides_factory or (lambda timeout: {"timeout": timeout})
        self.resolver_stats_repo = resolver_stats_repo
        self.warmup_attempts = int(warmup_attempts)

    def _rank_fallback_sources(self, origin_source: str, fallback_sources: list[str]) -> list[str]:
        if self.resolver_stats_repo is None:
            return list(fallback_sources)
        try:
            return self.resolver_stats_repo.rank_fallback_sources(
                origin_source,
                list(fallback_sources),
                warmup_attempts=self.warmup_attempts,
            )
        except Exception:
            # Ranking is best-effort: any stats failure falls back to the configured order.
            return list(fallback_sources)

    def _record_fallback_result(self, origin_source: str, candidate_source: str, *, succeeded: bool) -> None:
        if self.resolver_stats_repo is None:
            return
        try:
            self.resolver_stats_repo.record_fallback_result(
                origin_source,
                candidate_source,
                succeeded=succeeded,
            )
        except Exception:
            # A failed stats write (for example a locked database) must not break resolution.
            return

    def resolve_song_info(self, row, snapshot_song_info, download_sources=None, progress_callback=None):
        target_song_info = self._build_target_song_info(row=row, snapshot_song_info=snapshot_song_info)
        preferred_source = normalize_source_name(getattr(target_song_info, "source", None) or row.get("platform"))
        ordered_sources = dedupe_preserve_order(list(download_sources or DEFAULT_DOWNLOAD_SOURCES))
        total_sources = len(ordered_sources)
        # Only fallbacks are ranked; the preferred source is always tried first.
        fallback_sources = [source for source in ordered_sources if source != preferred_source]
        ranked_fallback_sources = self._rank_fallback_sources(preferred_source, fallback_sources)
        candidate_rows = []
        if preferred_source in ordered_sources:
            self._emit_progress(progress_callback, f"resolving source {preferred_source} (1/{total_sources})")
            client = self.client_factory(preferred_source)
            refreshed_song = self._refresh_song_info(client, target_song_info)
            if self._has_valid_download_url(refreshed_song):
                merged_refreshed = merge_resolved_song_info(target_song_info, refreshed_song)
                refreshed_match_priority = song_info_match_priority(merged_refreshed, target_song_info)
                candidate_rows.append((merged_refreshed, refreshed_match_priority, 0))
                if is_high_confidence_match(refreshed_match_priority):
                    return merged_refreshed
            search_candidates = self._search_source_candidates(preferred_source, build_resolve_keyword(target_song_info, row))
            best_candidate = self._pick_best_candidate(search_candidates, target_song_info, 0)
            if best_candidate is not None:
                merged_candidate = merge_resolved_song_info(target_song_info, best_candidate)
                match_priority = song_info_match_priority(merged_candidate, target_song_info)
                candidate_rows.append((merged_candidate, match_priority, 0))
                if is_high_confidence_match(match_priority):
                    return merged_candidate
        # Fallback positions start at 2 only when the preferred source took position 1.
        first_position = 2 if preferred_source in ordered_sources else 1
        for position, source in enumerate(ranked_fallback_sources, start=first_position):
            self._emit_progress(progress_callback, f"resolving source {source} ({position}/{total_sources})")
            search_candidates = self._search_source_candidates(source, build_resolve_keyword(target_song_info, row))
            best_candidate = self._pick_best_candidate(search_candidates, target_song_info, position)
            succeeded = best_candidate is not None
            # Record every fallback attempt, hit or miss, against the origin source.
            self._record_fallback_result(preferred_source, source, succeeded=succeeded)
            if succeeded:
                return merge_resolved_song_info(target_song_info, best_candidate)
        if candidate_rows:
            candidate_rows.sort(
                key=lambda item: (
                    match_priority_group(item[1]),
                    search_result_quality_group(item[0]),
                    -candidate_file_size_bytes(item[0]),
                    item[2],
                    item[1],
                )
            )
            return candidate_rows[0][0]
        return target_song_info
```
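```python
# musicdl/catalogsync/downloader.py (excerpt)
import sqlite3

from .resolver_stats import ResolverStatsRepository, default_resolver_stats_db_path


class CatalogDownloader:
    def __init__(self, repository, work_dir="musicdl_outputs/catalogsync", worker_count=DEFAULT_DOWNLOAD_WORKERS):
        self.repository = repository
        # ... existing __init__ body unchanged ...
        try:
            resolver_stats_repo = ResolverStatsRepository(default_resolver_stats_db_path(self.repository.db_path))
        except (OSError, sqlite3.Error):
            # Without a usable side database, resolve in the configured source order.
            resolver_stats_repo = None
        self._resolver = MultiSourceSongResolver(
            client_factory=lambda platform: self.get_client(platform),
            request_overrides_factory=lambda timeout: self._request_overrides(timeout),
            resolver_stats_repo=resolver_stats_repo,
        )
```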
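The fallback loop's contract (try ranked sources in order, record every attempt, stop at the first hit, never let a stats write abort resolution) can be exercised without any music clients. This is a minimal sketch under those assumptions: try_fallbacks and broken_record are hypothetical names, search returns a hit or None in place of _search_source_candidates plus _pick_best_candidate, and record plays the role of _record_fallback_result.

```python
from contextlib import suppress
from typing import Callable, Optional


def try_fallbacks(
    origin: str,
    ranked_sources: list[str],
    search: Callable[[str], Optional[str]],
    record: Callable[[str, str, bool], None],
) -> tuple[Optional[str], list[str]]:
    """Try ranked fallbacks in order; return (hit, sources_tried). Hypothetical helper."""
    tried = []
    for source in ranked_sources:
        tried.append(source)
        hit = search(source)
        # Stats writes are best-effort, like _record_fallback_result.
        with suppress(Exception):
            record(origin, source, hit is not None)
        if hit is not None:
            return hit, tried
    # Every fallback missed; the caller falls back to its preferred-source candidates.
    return None, tried


def broken_record(origin: str, source: str, succeeded: bool) -> None:
    raise RuntimeError("database is locked")  # simulated stats failure


hits = {"qianqian": "qianqian-song-2"}
print(try_fallbacks("qq", ["migu", "kuwo", "qianqian"], hits.get, broken_record))
# ('qianqian-song-2', ['migu', 'kuwo', 'qianqian'])
```

Even with a recorder that always raises, the third-ranked source is still reached, which mirrors test_resolver_continues_after_ranked_top_two_fail.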
- Step 4: Run the resolver-focused tests and verify they pass
Run: python -m pytest tests/catalogsync/test_resolver.py tests/catalogsync/test_resolver_stats.py -q
Expected: all resolver and resolver-stats tests pass.
- Step 5: Commit the ranked resolver behavior
git add musicdl/catalogsync/resolver.py musicdl/catalogsync/downloader.py tests/catalogsync/test_resolver.py tests/catalogsync/test_resolver_stats.py
git commit -m "feat: rank resolver fallback sources by origin"
Task 3: Initialize The Side Database At Startup Boundaries
Files:
- Modify: musicdl/catalogsync/cli.py
- Modify: musicdl/catalogsync/ops/web.py
- Modify: tests/catalogsync/test_cli.py
- Modify: tests/catalogsync/test_ops_api.py
- Step 1: Write the failing startup initialization tests
```python
# tests/catalogsync/test_cli.py: add to the existing CLI TestCase
# (assumes CliRunner, tempfile and Path are already imported there).
def test_init_db_command_creates_resolver_stats_side_db(self):
    from musicdl.catalogsync.cli import cli

    runner = CliRunner()
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        db_path = Path(tmpdir) / "catalogsync.db"
        side_db_path = Path(tmpdir) / "resolver_stats.db"
        library_root = Path(tmpdir) / "library"
        result = runner.invoke(
            cli,
            ["init-db", "--db", str(db_path), "--library-root", str(library_root)],
        )
        # Assert inside the block, before the temporary directory is removed.
        self.assertEqual(0, result.exit_code, msg=result.output)
        self.assertTrue(side_db_path.exists())
```

```python
# tests/catalogsync/test_ops_api.py: add to the existing ops API TestCase.
def test_create_app_initializes_resolver_stats_side_db(self):
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        root = Path(tmpdir)
        db_path = root / "catalogsync.db"
        env_path = root / "catalogsync.env"
        side_db_path = root / "resolver_stats.db"
        env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8")
        initialize_database(db_path).close()

        app = create_app(db_path=db_path, env_path=env_path)

        self.assertIsNotNone(app)
        self.assertTrue(side_db_path.exists())
```
- Step 2: Run the startup tests and verify they fail
Run: python -m pytest tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py -k resolver_stats_side_db -q
Expected: assertions fail because resolver_stats.db is not created yet. A test may already pass if its startup path constructs CatalogDownloader, which creates the side database since Task 2.
- Step 3: Wire startup initialization to the side database
```python
# musicdl/catalogsync/cli.py
from .resolver_stats import default_resolver_stats_db_path, initialize_resolver_stats_database


class CatalogSyncApplication:
    def __init__(self, db_path: str, library_root: str | None = None):
        self.db_path = db_path
        self.library_root = library_root
        initialize_database(db_path, default_library_root=library_root).close()
        initialize_resolver_stats_database(default_resolver_stats_db_path(db_path)).close()
        self.repository = CatalogRepository(db_path)
        self.service = CatalogSyncService(self.repository)
        self.downloader = CatalogDownloader(self.repository)

    def init_db(self):
        initialize_database(self.db_path, default_library_root=self.library_root).close()
        initialize_resolver_stats_database(default_resolver_stats_db_path(self.db_path)).close()
```

```python
# musicdl/catalogsync/ops/web.py (web.py sits in the ops subpackage, hence the ".." import)
from ..resolver_stats import default_resolver_stats_db_path, initialize_resolver_stats_database


def create_app(db_path: str | Path, env_path: str | Path, *, start_runner: bool = False, runner_sleep_seconds: float = 1.0) -> FastAPI:
    db_file = Path(db_path)
    initialize_database(db_file).close()
    initialize_resolver_stats_database(default_resolver_stats_db_path(db_file)).close()
    ...  # remaining app setup unchanged
```
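Both CatalogSyncApplication.__init__ and init_db run the side-database bootstrap, so a CLI invocation may initialize it twice. This is a minimal sketch, assuming a trimmed copy of the schema and a hypothetical init_side_db helper, showing why that is safe: CREATE TABLE IF NOT EXISTS is idempotent and the path rule always yields the same sibling file.

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path

# Trimmed copy of the resolver_source_stats schema (illustration only).
SCHEMA = """
CREATE TABLE IF NOT EXISTS resolver_source_stats (
    origin_source TEXT NOT NULL,
    candidate_source TEXT NOT NULL,
    attempt_count INTEGER NOT NULL DEFAULT 0,
    resolve_success_count INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY(origin_source, candidate_source)
)
"""


def init_side_db(main_db_path: Path) -> Path:
    # Same placement rule as default_resolver_stats_db_path: a sibling of the main DB.
    side_path = main_db_path.resolve().with_name("resolver_stats.db")
    with closing(sqlite3.connect(side_path)) as conn:
        with conn:  # commit the DDL
            conn.execute(SCHEMA)
    return side_path


with tempfile.TemporaryDirectory() as tmpdir:
    main_db = Path(tmpdir) / "catalogsync.db"
    first = init_side_db(main_db)   # e.g. application startup
    second = init_side_db(main_db)  # e.g. the init-db command
    with closing(sqlite3.connect(second)) as conn:
        table_count = conn.execute(
            "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'resolver_source_stats'"
        ).fetchone()[0]

print(first == second, table_count)  # True 1
```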
- Step 4: Run the startup tests and verify they pass
Run: python -m pytest tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py -k resolver_stats_side_db -q
Expected: 2 passed
- Step 5: Commit the startup wiring
git add musicdl/catalogsync/cli.py musicdl/catalogsync/ops/web.py tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py
git commit -m "feat: initialize resolver stats database on startup"
Task 4: Run Full Verification And NAS Validation
Files:
- Modify: musicdl/catalogsync/resolver.py
- Modify: musicdl/catalogsync/downloader.py
- Modify: musicdl/catalogsync/cli.py
- Modify: musicdl/catalogsync/ops/web.py
- Modify: musicdl/catalogsync/resolver_stats.py
- Modify: tests/catalogsync/test_resolver.py
- Modify: tests/catalogsync/test_resolver_stats.py
- Modify: tests/catalogsync/test_cli.py
- Modify: tests/catalogsync/test_ops_api.py
- Step 1: Run the full local regression slice
Run: python -m pytest tests/catalogsync/test_resolver_stats.py tests/catalogsync/test_resolver.py tests/catalogsync/test_cli.py tests/catalogsync/test_services.py tests/catalogsync/test_ops_executors.py tests/catalogsync/test_ops_runner.py tests/catalogsync/test_ops_api.py tests/catalogsync/test_runtime.py -q
Expected: all tests pass, with only the existing known warning if it still appears.
- Step 2: Deploy to NAS
Run: powershell -ExecutionPolicy Bypass -File .\deploy-catalogsync.ps1
Expected: deploy completes successfully, health check passes, and single-instance check passes.
- Step 3: Sample the NAS dashboard and confirm dual-download bursts still appear
Run:
```powershell
powershell -ExecutionPolicy Bypass -File 'C:\Users\Administrator\.codex\skills\nas-ssh-192168543\scripts\run.ps1' "python3 - <<'PY'
import json, urllib.request
with urllib.request.urlopen('http://127.0.0.1:18080/api/dashboard?include_task_rows=false', timeout=10) as resp:
    data = json.load(resp)
print(json.dumps({
    'downloaded_songs': data['download_stats']['downloaded_songs'],
    'speed_bps': data['transfer_stats']['download_speed_bytes_per_sec'],
    'workers': [w['worker_name'] for w in data['workers'] if w.get('status') == 'running'],
}, ensure_ascii=False))
PY"
```
Expected: running workers still include download-1 and download-2 during active bursts, and downloaded_songs keeps increasing.
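A single sample can miss a burst, so it may help to take several samples and check them together. This is a minimal sketch: summarize_dashboard and dual_download_ok are hypothetical names, the field names are copied from the snippet above, the pass rule (both workers seen running in at least one sample and the downloaded count rising from first to last sample) is an assumption about what "still appear" means, and the sample values are invented.

```python
def summarize_dashboard(data: dict) -> dict:
    # Extracts the same three fields the NAS snippet prints.
    return {
        "downloaded_songs": data["download_stats"]["downloaded_songs"],
        "speed_bps": data["transfer_stats"]["download_speed_bytes_per_sec"],
        "workers": [w["worker_name"] for w in data["workers"] if w.get("status") == "running"],
    }


def dual_download_ok(samples: list[dict]) -> bool:
    # Assumed rule: both download workers ran together at least once,
    # and the downloaded count grew between the first and last sample.
    saw_both = any({"download-1", "download-2"} <= set(s["workers"]) for s in samples)
    progressed = samples[-1]["downloaded_songs"] > samples[0]["downloaded_songs"]
    return saw_both and progressed


# Invented summaries, shaped like the snippet's JSON output.
samples = [
    {"downloaded_songs": 120, "speed_bps": 0, "workers": ["download-1"]},
    {"downloaded_songs": 124, "speed_bps": 2_500_000, "workers": ["download-1", "download-2"]},
]
print(dual_download_ok(samples))  # True
```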
- Step 4: Commit the verified end-to-end implementation
git add musicdl/catalogsync/resolver_stats.py musicdl/catalogsync/resolver.py musicdl/catalogsync/downloader.py musicdl/catalogsync/cli.py musicdl/catalogsync/ops/web.py tests/catalogsync/test_resolver_stats.py tests/catalogsync/test_resolver.py tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py
git commit -m "feat: persist resolver fallback source rankings"