Resolver Source Ranking Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add a persistent resolver side database that learns fallback success rates by original source and reorders fallback sources after warmup without touching the main catalog business tables.

Architecture: Create a dedicated resolver_stats.db side database and repository, then wire MultiSourceSongResolver to ask that repository for ranked fallback order. Keep preferred-source resolution first, record only fallback attempts and successes, and continue trying later sources if the learned top two fail.

Tech Stack: Python, sqlite3, unittest, Click CLI, FastAPI ops web
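
The flow described under Architecture can be sketched in a few lines. This is a simplified stand-in, not the real MultiSourceSongResolver: the function name resolve_with_ranked_fallbacks and its callback parameters (try_source, rank, record) are hypothetical and exist only to show the ordering and recording rules.

```python
from typing import Callable, Optional


def resolve_with_ranked_fallbacks(
    preferred: str,
    configured_sources: list[str],
    try_source: Callable[[str], Optional[str]],
    rank: Callable[[str, list[str]], list[str]],
    record: Callable[[str, str, bool], None],
) -> Optional[str]:
    """Hypothetical stand-in for the resolver flow described in the plan."""
    # The preferred source goes first and is never written to the stats.
    if preferred in configured_sources:
        hit = try_source(preferred)
        if hit is not None:
            return hit
    fallbacks = [s for s in configured_sources if s != preferred]
    # Every ranked fallback is tried in order, not only the learned top two.
    for source in rank(preferred, fallbacks):
        hit = try_source(source)
        record(preferred, source, hit is not None)
        if hit is not None:
            return hit
    return None


records = []
result = resolve_with_ranked_fallbacks(
    preferred="qq",
    configured_sources=["qq", "kuwo", "migu", "qianqian"],
    try_source=lambda s: "url-from-qianqian" if s == "qianqian" else None,
    rank=lambda origin, fallbacks: ["migu", "kuwo", "qianqian"],
    record=lambda origin, candidate, ok: records.append((origin, candidate, ok)),
)
print(result)
print(records)
```

In this run the two top-ranked fallbacks miss, the third one hits, and only the three fallback attempts are recorded against the origin source qq.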


File Map

  • Create: musicdl/catalogsync/resolver_stats.py Dedicated resolver side-database bootstrap, default path helper, and ranking repository.
  • Modify: musicdl/catalogsync/resolver.py Preferred-source-first resolver flow plus ranked fallback traversal and resilient stats recording.
  • Modify: musicdl/catalogsync/downloader.py Construct MultiSourceSongResolver with a ResolverStatsRepository derived from the main database path.
  • Modify: musicdl/catalogsync/cli.py Initialize the resolver side database during CLI app startup and init-db.
  • Modify: musicdl/catalogsync/ops/web.py Initialize the resolver side database during web app startup.
  • Create: tests/catalogsync/test_resolver_stats.py Unit tests for side-database schema, warmup, ranking, and grouping.
  • Modify: tests/catalogsync/test_resolver.py Integration-style resolver tests for warmup behavior, ranked top-two traversal, continuation, and graceful fallback.
  • Modify: tests/catalogsync/test_cli.py CLI startup tests for side-database creation.
  • Modify: tests/catalogsync/test_ops_api.py Web startup test for side-database creation.

Task 1: Add The Resolver Stats Side Database

Files:

  • Create: musicdl/catalogsync/resolver_stats.py

  • Create: tests/catalogsync/test_resolver_stats.py

  • Step 1: Write the failing side-database tests

import tempfile
import unittest
from pathlib import Path


class ResolverStatsRepositoryTests(unittest.TestCase):
    def test_initialize_resolver_stats_database_creates_stats_table(self):
        from musicdl.catalogsync.resolver_stats import initialize_resolver_stats_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "resolver_stats.db"
            conn = initialize_resolver_stats_database(db_path)
            try:
                table_names = {
                    row["name"]
                    for row in conn.execute(
                        "SELECT name FROM sqlite_master WHERE type = 'table'"
                    ).fetchall()
                }
            finally:
                conn.close()

        self.assertIn("resolver_source_stats", table_names)

    def test_rank_fallback_sources_keeps_config_order_before_warmup(self):
        from musicdl.catalogsync.resolver_stats import ResolverStatsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = ResolverStatsRepository(Path(tmpdir) / "resolver_stats.db")
            repo.record_fallback_result("qq", "kuwo", succeeded=True)
            ranked = repo.rank_fallback_sources(
                "qq",
                ["kuwo", "migu", "qianqian"],
                warmup_attempts=1000,
            )

        self.assertEqual(["kuwo", "migu", "qianqian"], ranked)

    def test_rank_fallback_sources_reorders_after_warmup_per_origin_source(self):
        from musicdl.catalogsync.resolver_stats import ResolverStatsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = ResolverStatsRepository(Path(tmpdir) / "resolver_stats.db")
            for _ in range(800):
                repo.record_fallback_result("qq", "migu", succeeded=True)
            for _ in range(200):
                repo.record_fallback_result("qq", "kuwo", succeeded=False)
            ranked = repo.rank_fallback_sources(
                "qq",
                ["kuwo", "migu", "qianqian"],
                warmup_attempts=1000,
            )

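        # qianqian has no rows yet, so it keeps the neutral score 0.5 and outranks
        # kuwo, whose 200 failures smooth to 1/202.
        self.assertEqual(["migu", "qianqian", "kuwo"], ranked)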
  • Step 2: Run the focused side-database tests and verify they fail

Run: python -m pytest tests/catalogsync/test_resolver_stats.py -q

Expected: ModuleNotFoundError or missing symbol failures for musicdl.catalogsync.resolver_stats.

  • Step 3: Write the minimal side-database implementation
from __future__ import annotations

import sqlite3
from contextlib import suppress
from pathlib import Path

SQLITE_BUSY_TIMEOUT_MS = 30000
RESOLVER_FALLBACK_WARMUP_ATTEMPTS = 1000

SCHEMA_STATEMENTS = [
    """
    CREATE TABLE IF NOT EXISTS resolver_source_stats (
        origin_source TEXT NOT NULL,
        candidate_source TEXT NOT NULL,
        attempt_count INTEGER NOT NULL DEFAULT 0,
        resolve_success_count INTEGER NOT NULL DEFAULT 0,
        last_attempt_at TEXT,
        last_success_at TEXT,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY(origin_source, candidate_source)
    )
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_resolver_source_stats_origin
    ON resolver_source_stats (origin_source)
    """,
]


def default_resolver_stats_db_path(db_path: str | Path) -> Path:
    return Path(db_path).resolve().with_name("resolver_stats.db")


def connect_resolver_stats_database(db_path: str | Path) -> sqlite3.Connection:
    path = Path(db_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path, timeout=SQLITE_BUSY_TIMEOUT_MS / 1000)
    conn.row_factory = sqlite3.Row
    conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}")
    with suppress(sqlite3.OperationalError):
        conn.execute("PRAGMA journal_mode = WAL")
    with suppress(sqlite3.OperationalError):
        conn.execute("PRAGMA synchronous = NORMAL")
    return conn


def initialize_resolver_stats_database(db_path: str | Path) -> sqlite3.Connection:
    conn = connect_resolver_stats_database(db_path)
    for statement in SCHEMA_STATEMENTS:
        conn.execute(statement)
    conn.commit()
    return conn


class ResolverStatsRepository:
    def __init__(self, db_path: str | Path):
        self.db_path = Path(db_path)
        initialize_resolver_stats_database(self.db_path).close()

    def record_fallback_result(self, origin_source: str, candidate_source: str, *, succeeded: bool) -> None:
        conn = connect_resolver_stats_database(self.db_path)
        try:
            # "with conn" commits or rolls back but does not close the connection,
            # so close it explicitly once the upsert is done.
            with conn:
                conn.execute(
                    """
                    INSERT INTO resolver_source_stats (
                        origin_source, candidate_source, attempt_count, resolve_success_count,
                        last_attempt_at, last_success_at
                    )
                    VALUES (?, ?, 1, ?, CURRENT_TIMESTAMP, CASE WHEN ? THEN CURRENT_TIMESTAMP ELSE NULL END)
                    ON CONFLICT(origin_source, candidate_source) DO UPDATE SET
                        attempt_count = attempt_count + 1,
                        resolve_success_count = resolve_success_count + excluded.resolve_success_count,
                        last_attempt_at = CURRENT_TIMESTAMP,
                        last_success_at = CASE
                            WHEN excluded.resolve_success_count > 0 THEN CURRENT_TIMESTAMP
                            ELSE resolver_source_stats.last_success_at
                        END,
                        updated_at = CURRENT_TIMESTAMP
                    """,
                    (origin_source, candidate_source, 1 if succeeded else 0, 1 if succeeded else 0),
                )
        finally:
            conn.close()

    def rank_fallback_sources(
        self,
        origin_source: str,
        fallback_sources: list[str],
        *,
        warmup_attempts: int = RESOLVER_FALLBACK_WARMUP_ATTEMPTS,
    ) -> list[str]:
        ordered = list(fallback_sources)
        if not ordered:
            return []
        conn = connect_resolver_stats_database(self.db_path)
        try:
            rows = conn.execute(
                """
                SELECT candidate_source, attempt_count, resolve_success_count
                FROM resolver_source_stats
                WHERE origin_source = ?
                """,
                (origin_source,),
            ).fetchall()
        finally:
            # The sqlite3 context manager would not close the connection for us.
            conn.close()
        total_attempts = sum(int(row["attempt_count"]) for row in rows)
        if total_attempts < warmup_attempts:
            return ordered
        stats = {
            str(row["candidate_source"]): (
                (int(row["resolve_success_count"]) + 1) / (int(row["attempt_count"]) + 2)
            )
            for row in rows
        }
        return sorted(ordered, key=lambda source: (-stats.get(source, 0.5), ordered.index(source)))
  • Step 4: Run the side-database tests and verify they pass

Run: python -m pytest tests/catalogsync/test_resolver_stats.py -q

Expected: 3 passed
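
To see how the ranking key from Step 3 behaves on the counts used by the reordering test, the smoothed rates can be worked out by hand. The sketch below reuses the same expression, (successes + 1) / (attempts + 2); the helper name smoothed_rate and the observed dictionary are illustrative only and are not part of the planned module.

```python
def smoothed_rate(successes: int, attempts: int) -> float:
    # Add-one smoothing, the same expression used in rank_fallback_sources.
    return (successes + 1) / (attempts + 2)


config_order = ["kuwo", "migu", "qianqian"]
# (successes, attempts) per candidate for origin "qq", as recorded by the test.
observed = {"migu": (800, 800), "kuwo": (0, 200)}

# A candidate with no row gets (0 + 1) / (0 + 2) = 0.5, matching stats.get(source, 0.5).
scores = {
    source: smoothed_rate(*observed.get(source, (0, 0)))
    for source in config_order
}
ranked = sorted(config_order, key=lambda s: (-scores[s], config_order.index(s)))
print(scores)
print(ranked)
```

Here migu scores 801/802, the unseen qianqian stays at 0.5, and kuwo drops to 1/202, so a source that has never been tried ranks ahead of one that has only failed. Ties fall back to the configured order.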

  • Step 5: Commit the side-database foundation
git add musicdl/catalogsync/resolver_stats.py tests/catalogsync/test_resolver_stats.py
git commit -m "feat: add resolver stats side database"
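
The counters that record_fallback_result maintains can be checked in isolation. The sketch below runs a trimmed version of the Step 3 upsert (timestamp columns left out for brevity) against an in-memory database; the helper name record is hypothetical. It assumes the bundled SQLite is 3.24 or newer, the first release with ON CONFLICT ... DO UPDATE.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE resolver_source_stats (
        origin_source TEXT NOT NULL,
        candidate_source TEXT NOT NULL,
        attempt_count INTEGER NOT NULL DEFAULT 0,
        resolve_success_count INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY(origin_source, candidate_source)
    )
    """
)


def record(origin: str, candidate: str, succeeded: bool) -> None:
    # First attempt inserts the row; later attempts hit the primary key
    # conflict and increment the existing counters instead.
    conn.execute(
        """
        INSERT INTO resolver_source_stats (
            origin_source, candidate_source, attempt_count, resolve_success_count
        )
        VALUES (?, ?, 1, ?)
        ON CONFLICT(origin_source, candidate_source) DO UPDATE SET
            attempt_count = attempt_count + 1,
            resolve_success_count = resolve_success_count + excluded.resolve_success_count
        """,
        (origin, candidate, 1 if succeeded else 0),
    )


for outcome in (True, False, True):
    record("qq", "migu", outcome)
record("kuwo", "migu", False)  # same candidate, different origin: separate row

rows = conn.execute(
    "SELECT origin_source, candidate_source, attempt_count, resolve_success_count "
    "FROM resolver_source_stats ORDER BY origin_source"
).fetchall()
conn.close()
print(rows)
```

Because the primary key is the (origin_source, candidate_source) pair, the same candidate accumulates independent counts per origin, which is what makes the ranking origin-specific.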

Task 2: Teach The Resolver To Use Ranked Fallback Sources

Files:

  • Modify: musicdl/catalogsync/resolver.py

  • Modify: musicdl/catalogsync/downloader.py

  • Modify: tests/catalogsync/test_resolver.py

  • Step 1: Write the failing resolver behavior tests

def test_resolver_uses_ranked_top_two_fallback_sources_after_warmup(self):
    from musicdl.catalogsync.resolver import MultiSourceSongResolver
    from musicdl.modules.utils.data import SongInfo

    class FakeStatsRepo:
        def rank_fallback_sources(self, origin_source, fallback_sources, warmup_attempts=1000):
            self.rank_call = (origin_source, list(fallback_sources), warmup_attempts)
            return ["migu", "kuwo", "qianqian"]

        def record_fallback_result(self, origin_source, candidate_source, *, succeeded):
            self.records.append((origin_source, candidate_source, succeeded))

        def __init__(self):
            self.records = []

    class FakeClient:
        def __init__(self, source, result=None, calls=None):
            self.source = source
            self.result = list(result or [])
            self.calls = calls

        def search(self, keyword, num_threadings=1, request_overrides=None, rule=None, main_process_context=None):
            self.calls.append(self.source)
            return list(self.result)

    snapshot = SongInfo(
        source="QQMusicClient",
        identifier="song-1",
        song_name="Song 1",
        singers="Singer 1",
        raw_data={"search": {"id": "song-1"}},
        download_url=None,
        download_url_status={},
    )
    migu_hit = SongInfo(
        source="MiguMusicClient",
        identifier="migu-song-1",
        song_name="Song 1",
        singers="Singer 1",
        ext="mp3",
        download_url="https://example.com/song-1.mp3",
        download_url_status={"ok": True},
    )
    search_calls = []
    stats_repo = FakeStatsRepo()
    resolver = MultiSourceSongResolver(
        client_factory=lambda platform: {
            "qq": FakeClient("qq", [], search_calls),
            "kuwo": FakeClient("kuwo", [], search_calls),
            "migu": FakeClient("migu", [migu_hit], search_calls),
            "qianqian": FakeClient("qianqian", [], search_calls),
        }[platform],
        resolver_stats_repo=stats_repo,
    )

    resolved = resolver.resolve_song_info(
        row={"platform": "qq", "name": "Song 1", "singers": "Singer 1", "remote_song_id": "song-1"},
        snapshot_song_info=snapshot,
        download_sources=["qq", "kuwo", "migu", "qianqian"],
    )

    self.assertEqual(["qq", "migu"], search_calls)
    self.assertEqual(
        [("qq", "migu", True)],
        stats_repo.records,
    )
    self.assertEqual("MiguMusicClient", resolved.source)

def test_resolver_continues_after_ranked_top_two_fail(self):
    from musicdl.catalogsync.resolver import MultiSourceSongResolver
    from musicdl.modules.utils.data import SongInfo

    class FakeStatsRepo:
        def rank_fallback_sources(self, origin_source, fallback_sources, warmup_attempts=1000):
            return ["migu", "kuwo", "qianqian"]

        def record_fallback_result(self, origin_source, candidate_source, *, succeeded):
            self.records.append((candidate_source, succeeded))

        def __init__(self):
            self.records = []

    class FakeClient:
        def __init__(self, source, result, calls):
            self.source = source
            self.result = list(result)
            self.calls = calls

        def search(self, keyword, num_threadings=1, request_overrides=None, rule=None, main_process_context=None):
            self.calls.append(self.source)
            return list(self.result)

    snapshot = SongInfo(
        source="QQMusicClient",
        identifier="song-2",
        song_name="Song 2",
        singers="Singer 2",
        raw_data={"search": {"id": "song-2"}},
        download_url=None,
        download_url_status={},
    )
    qianqian_hit = SongInfo(
        source="QianqianMusicClient",
        identifier="qianqian-song-2",
        song_name="Song 2",
        singers="Singer 2",
        ext="mp3",
        download_url="https://example.com/song-2.mp3",
        download_url_status={"ok": True},
    )
    calls = []
    stats_repo = FakeStatsRepo()
    resolver = MultiSourceSongResolver(
        client_factory=lambda platform: {
            "qq": FakeClient("qq", [], calls),
            "migu": FakeClient("migu", [], calls),
            "kuwo": FakeClient("kuwo", [], calls),
            "qianqian": FakeClient("qianqian", [qianqian_hit], calls),
        }[platform],
        resolver_stats_repo=stats_repo,
    )

    resolved = resolver.resolve_song_info(
        row={"platform": "qq", "name": "Song 2", "singers": "Singer 2", "remote_song_id": "song-2"},
        snapshot_song_info=snapshot,
        download_sources=["qq", "kuwo", "migu", "qianqian"],
    )

    self.assertEqual(["qq", "migu", "kuwo", "qianqian"], calls)
    self.assertEqual("QianqianMusicClient", resolved.source)
  • Step 2: Run the focused resolver tests and verify they fail

Run: python -m pytest tests/catalogsync/test_resolver.py -q

Expected: failures for unexpected source order and missing resolver_stats_repo support.

  • Step 3: Write the minimal resolver and downloader integration
# musicdl/catalogsync/resolver.py
from .resolver_stats import RESOLVER_FALLBACK_WARMUP_ATTEMPTS


class MultiSourceSongResolver:
    def __init__(
        self,
        client_factory,
        request_overrides_factory=None,
        resolver_stats_repo=None,
        warmup_attempts: int = RESOLVER_FALLBACK_WARMUP_ATTEMPTS,
    ):
        self.client_factory = client_factory
        self.request_overrides_factory = request_overrides_factory or (lambda timeout: {"timeout": timeout})
        self.resolver_stats_repo = resolver_stats_repo
        self.warmup_attempts = int(warmup_attempts)

    def _rank_fallback_sources(self, origin_source: str, fallback_sources: list[str]) -> list[str]:
        if self.resolver_stats_repo is None:
            return list(fallback_sources)
        try:
            return self.resolver_stats_repo.rank_fallback_sources(
                origin_source,
                list(fallback_sources),
                warmup_attempts=self.warmup_attempts,
            )
        except Exception:
            return list(fallback_sources)

    def _record_fallback_result(self, origin_source: str, candidate_source: str, *, succeeded: bool) -> None:
        if self.resolver_stats_repo is None:
            return
        try:
            self.resolver_stats_repo.record_fallback_result(
                origin_source,
                candidate_source,
                succeeded=succeeded,
            )
        except Exception:
            return

    def resolve_song_info(self, row, snapshot_song_info, download_sources=None, progress_callback=None):
        target_song_info = self._build_target_song_info(row=row, snapshot_song_info=snapshot_song_info)
        preferred_source = normalize_source_name(getattr(target_song_info, "source", None) or row.get("platform"))
        ordered_sources = dedupe_preserve_order(list(download_sources or DEFAULT_DOWNLOAD_SOURCES))
        fallback_sources = [source for source in ordered_sources if source != preferred_source]
        ranked_fallback_sources = self._rank_fallback_sources(preferred_source, fallback_sources)

        candidate_rows = []
        if preferred_source in ordered_sources:
            self._emit_progress(progress_callback, f"resolving source {preferred_source} (1/{len(ordered_sources)})")
            client = self.client_factory(preferred_source)
            refreshed_song = self._refresh_song_info(client, target_song_info)
            if self._has_valid_download_url(refreshed_song):
                merged_refreshed = merge_resolved_song_info(target_song_info, refreshed_song)
                refreshed_match_priority = song_info_match_priority(merged_refreshed, target_song_info)
                candidate_rows.append((merged_refreshed, refreshed_match_priority, 0))
                if is_high_confidence_match(refreshed_match_priority):
                    return merged_refreshed
            search_candidates = self._search_source_candidates(preferred_source, build_resolve_keyword(target_song_info, row))
            best_candidate = self._pick_best_candidate(search_candidates, target_song_info, 0)
            if best_candidate is not None:
                merged_candidate = merge_resolved_song_info(target_song_info, best_candidate)
                match_priority = song_info_match_priority(merged_candidate, target_song_info)
                candidate_rows.append((merged_candidate, match_priority, 0))
                if is_high_confidence_match(match_priority):
                    return merged_candidate

        for offset, source in enumerate(ranked_fallback_sources, start=2):
            self._emit_progress(progress_callback, f"resolving source {source} ({offset}/{len(ordered_sources)})")
            search_candidates = self._search_source_candidates(source, build_resolve_keyword(target_song_info, row))
            best_candidate = self._pick_best_candidate(search_candidates, target_song_info, offset)
            succeeded = best_candidate is not None
            self._record_fallback_result(preferred_source, source, succeeded=succeeded)
            if not succeeded:
                continue
            return merge_resolved_song_info(target_song_info, best_candidate)

        if candidate_rows:
            candidate_rows.sort(key=lambda item: (match_priority_group(item[1]), search_result_quality_group(item[0]), -candidate_file_size_bytes(item[0]), item[2], item[1]))
            return candidate_rows[0][0]
        return target_song_info


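# musicdl/catalogsync/downloader.py
from .resolver_stats import ResolverStatsRepository, default_resolver_stats_db_path


class CatalogDownloader: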
    def __init__(self, repository, work_dir="musicdl_outputs/catalogsync", worker_count=DEFAULT_DOWNLOAD_WORKERS):
        self.repository = repository
        resolver_stats_repo = ResolverStatsRepository(default_resolver_stats_db_path(self.repository.db_path))
        self._resolver = MultiSourceSongResolver(
            client_factory=lambda platform: self.get_client(platform),
            request_overrides_factory=lambda timeout: self._request_overrides(timeout),
            resolver_stats_repo=resolver_stats_repo,
        )
  • Step 4: Run the resolver-focused tests and verify they pass

Run: python -m pytest tests/catalogsync/test_resolver.py tests/catalogsync/test_resolver_stats.py -q

Expected: all resolver and resolver-stats tests pass.
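
The File Map lists graceful fallback among the resolver tests, but Step 1 does not spell out such a test. As a rough, self-contained sketch of the behavior it would pin down, the guard from _rank_fallback_sources is reproduced below as a free function; BrokenStatsRepo and rank_or_config_order are hypothetical names used only for this illustration.

```python
class BrokenStatsRepo:
    """Hypothetical repository whose storage is unavailable."""

    def rank_fallback_sources(self, origin_source, fallback_sources, *, warmup_attempts=1000):
        raise OSError("stats database unavailable")


def rank_or_config_order(repo, origin_source, fallback_sources, warmup_attempts=1000):
    # Same guard as _rank_fallback_sources: a missing or failing stats
    # repository degrades to the configured order instead of raising.
    if repo is None:
        return list(fallback_sources)
    try:
        return repo.rank_fallback_sources(
            origin_source,
            list(fallback_sources),
            warmup_attempts=warmup_attempts,
        )
    except Exception:
        return list(fallback_sources)


configured = ["kuwo", "migu", "qianqian"]
ranked_when_broken = rank_or_config_order(BrokenStatsRepo(), "qq", configured)
ranked_without_repo = rank_or_config_order(None, "qq", configured)
print(ranked_when_broken, ranked_without_repo)
```

A real test would build a MultiSourceSongResolver with a failing resolver_stats_repo and assert that resolution still walks the configured fallback order.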

  • Step 5: Commit the ranked resolver behavior
git add musicdl/catalogsync/resolver.py musicdl/catalogsync/downloader.py tests/catalogsync/test_resolver.py tests/catalogsync/test_resolver_stats.py
git commit -m "feat: rank resolver fallback sources by origin"

Task 3: Initialize The Side Database At Startup Boundaries

Files:

  • Modify: musicdl/catalogsync/cli.py

  • Modify: musicdl/catalogsync/ops/web.py

  • Modify: tests/catalogsync/test_cli.py

  • Modify: tests/catalogsync/test_ops_api.py

  • Step 1: Write the failing startup initialization tests

def test_init_db_command_creates_resolver_stats_side_db(self):
    from musicdl.catalogsync.cli import cli

    runner = CliRunner()
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        db_path = Path(tmpdir) / "catalogsync.db"
        side_db_path = Path(tmpdir) / "resolver_stats.db"
        library_root = Path(tmpdir) / "library"

        result = runner.invoke(
            cli,
            ["init-db", "--db", str(db_path), "--library-root", str(library_root)],
        )

        # Assert inside the with-block, before the temporary directory is deleted.
        self.assertEqual(0, result.exit_code, msg=result.output)
        self.assertTrue(side_db_path.exists())

def test_create_app_initializes_resolver_stats_side_db(self):
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.web import create_app

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        root = Path(tmpdir)
        db_path = root / "catalogsync.db"
        env_path = root / "catalogsync.env"
        side_db_path = root / "resolver_stats.db"
        env_path.write_text("ROOT_DIR=/music\nDOWNLOAD_SOURCES=qq\n", encoding="utf-8")
        initialize_database(db_path).close()

        app = create_app(db_path=db_path, env_path=env_path)

        # Assert inside the with-block, before the temporary directory is deleted.
        self.assertIsNotNone(app)
        self.assertTrue(side_db_path.exists())
  • Step 2: Run the startup tests and verify they fail

Run: python -m pytest tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py -k "resolver_stats_side_db" -q

Expected: assertions fail because resolver_stats.db is not created yet.
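
Both tests look for resolver_stats.db in the same directory as catalogsync.db. That follows from default_resolver_stats_db_path in Task 1, which swaps only the file name via with_name. A minimal sketch of that derivation, using PurePosixPath and leaving out resolve() so the result does not depend on the local filesystem; side_db_path_for and the example path are made up for illustration.

```python
from pathlib import PurePosixPath


def side_db_path_for(main_db_path: str) -> PurePosixPath:
    # Keep the parent directory of the main database, replace the file name.
    return PurePosixPath(main_db_path).with_name("resolver_stats.db")


derived = side_db_path_for("/data/catalogsync/catalogsync.db")
print(derived)
```

Because the name is fixed, two main databases in the same directory would share one side database; the plan does not say whether that case needs separate handling.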

  • Step 3: Wire startup initialization to the side database
from .resolver_stats import default_resolver_stats_db_path, initialize_resolver_stats_database


class CatalogSyncApplication:
    def __init__(self, db_path: str, library_root: str | None = None):
        self.db_path = db_path
        self.library_root = library_root
        initialize_database(db_path, default_library_root=library_root).close()
        initialize_resolver_stats_database(default_resolver_stats_db_path(db_path)).close()
        self.repository = CatalogRepository(db_path)
        self.service = CatalogSyncService(self.repository)
        self.downloader = CatalogDownloader(self.repository)

    def init_db(self):
        initialize_database(self.db_path, default_library_root=self.library_root).close()
        initialize_resolver_stats_database(default_resolver_stats_db_path(self.db_path)).close()


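# musicdl/catalogsync/ops/web.py sits one package level deeper, so the import there is:
from ..resolver_stats import default_resolver_stats_db_path, initialize_resolver_stats_database


def create_app(db_path: str | Path, env_path: str | Path, *, start_runner: bool = False, runner_sleep_seconds: float = 1.0) -> FastAPI: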
    db_file = Path(db_path)
    initialize_database(db_file).close()
    initialize_resolver_stats_database(default_resolver_stats_db_path(db_file)).close()
    ...
  • Step 4: Run the startup tests and verify they pass

Run: python -m pytest tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py -k "resolver_stats_side_db" -q

Expected: 2 passed

  • Step 5: Commit the startup wiring
git add musicdl/catalogsync/cli.py musicdl/catalogsync/ops/web.py tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py
git commit -m "feat: initialize resolver stats database on startup"

Task 4: Run Full Verification And NAS Validation

Files:

  • Modify: musicdl/catalogsync/resolver.py

  • Modify: musicdl/catalogsync/downloader.py

  • Modify: musicdl/catalogsync/cli.py

  • Modify: musicdl/catalogsync/ops/web.py

  • Modify: musicdl/catalogsync/resolver_stats.py

  • Modify: tests/catalogsync/test_resolver.py

  • Modify: tests/catalogsync/test_resolver_stats.py

  • Modify: tests/catalogsync/test_cli.py

  • Modify: tests/catalogsync/test_ops_api.py

  • Step 1: Run the full local regression slice

Run: python -m pytest tests/catalogsync/test_resolver_stats.py tests/catalogsync/test_resolver.py tests/catalogsync/test_cli.py tests/catalogsync/test_services.py tests/catalogsync/test_ops_executors.py tests/catalogsync/test_ops_runner.py tests/catalogsync/test_ops_api.py tests/catalogsync/test_runtime.py -q

Expected: all tests pass, with only the existing known warning if it still appears.

  • Step 2: Deploy to NAS

Run: powershell -ExecutionPolicy Bypass -File .\deploy-catalogsync.ps1

Expected: deploy completes successfully, health check passes, and single-instance check passes.

  • Step 3: Sample the NAS dashboard and confirm dual-download bursts still appear

Run:

powershell -ExecutionPolicy Bypass -File 'C:\Users\Administrator\.codex\skills\nas-ssh-192168543\scripts\run.ps1' "python3 - <<'PY'
import json, urllib.request
with urllib.request.urlopen('http://127.0.0.1:18080/api/dashboard?include_task_rows=false', timeout=10) as resp:
    data = json.load(resp)
print(json.dumps({
    'downloaded_songs': data['download_stats']['downloaded_songs'],
    'speed_bps': data['transfer_stats']['download_speed_bytes_per_sec'],
    'workers': [w['worker_name'] for w in data['workers'] if w.get('status') == 'running'],
}, ensure_ascii=False))
PY"

Expected: running workers still include download-1 and download-2 during active bursts, and downloaded_songs continues increasing.
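
Since the expectation compares state over time, it helps to take at least two samples. The check below is one possible way to compare them, assuming each sample has the shape printed by the script above (downloaded_songs, speed_bps, workers); the function name burst_looks_healthy and the sample values are invented for illustration.

```python
def burst_looks_healthy(earlier: dict, later: dict) -> bool:
    """Compare two samples shaped like the JSON printed by the dashboard script."""
    # Both download workers should show up in at least one of the two samples.
    both_workers_seen = {"download-1", "download-2"} <= (
        set(earlier["workers"]) | set(later["workers"])
    )
    # The downloaded-song counter should have moved forward between samples.
    still_downloading = later["downloaded_songs"] > earlier["downloaded_songs"]
    return both_workers_seen and still_downloading


# Made-up sample values, not real NAS output.
first = {"downloaded_songs": 100, "speed_bps": 0, "workers": ["download-1"]}
second = {"downloaded_songs": 103, "speed_bps": 2048, "workers": ["download-1", "download-2"]}
healthy = burst_looks_healthy(first, second)
print(healthy)
```

A single idle sample is not a failure on its own, because workers only appear as running during active bursts.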

  • Step 4: Commit the verified end-to-end implementation
git add musicdl/catalogsync/resolver_stats.py musicdl/catalogsync/resolver.py musicdl/catalogsync/downloader.py musicdl/catalogsync/cli.py musicdl/catalogsync/ops/web.py tests/catalogsync/test_resolver_stats.py tests/catalogsync/test_resolver.py tests/catalogsync/test_cli.py tests/catalogsync/test_ops_api.py
git commit -m "feat: persist resolver fallback source rankings"