import json
import sqlite3
import tempfile
import unittest
from contextlib import closing
from pathlib import Path

# Fixture values shared by the upload-task tests.
_SONG_A_RELATIVE_PATH = "qq/Singer A/song-a.mp3"
_SONG_A_LOCATOR = "music/qq/Singer A/song-a.mp3"


def _sqlite_master_names(conn, object_type):
    """Return the set of ``sqlite_master`` names whose ``type`` is ``object_type``."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = ?",
        (object_type,),
    ).fetchall()
    return {row[0] for row in rows}


def _seed_legacy_table(db_path, create_table_sql):
    """Create a pre-existing table in ``db_path`` before ``initialize_database`` runs."""
    with closing(sqlite3.connect(db_path)) as seed_conn:
        seed_conn.execute(create_table_sql)
        seed_conn.commit()


def _open_repository(tmpdir, *, with_library_root=True):
    """Initialize ``catalogsync.db`` under ``tmpdir`` and return a repository for it.

    Args:
        tmpdir: Directory that will hold the database file.
        with_library_root: When true, ``<tmpdir>/library`` is passed as
            ``default_library_root``; when false, the argument is omitted so
            ``initialize_database`` is called with the path only.
    """
    # Project imports stay local so a missing package fails the test, not collection.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.repository import CatalogRepository

    db_path = Path(tmpdir) / "catalogsync.db"
    if with_library_root:
        library_root = Path(tmpdir) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
    else:
        initialize_database(db_path).close()
    return CatalogRepository(db_path)


def _upsert_main_s3_backend(repo):
    """Upsert the ``main-s3`` / ``bucket-a`` object-storage backend and return the call's result."""
    return repo.upsert_object_storage_backend(
        name="main-s3",
        container_name="bucket-a",
        endpoint="https://s3.example.com",
        region="auto",
        base_prefix="music",
        credential_env_prefix="CATALOGSYNC_MAIN_S3",
    )


def _prepare_song_a_upload(repo):
    """Seed Song A, its local file, and the ``main-s3`` backend.

    Returns:
        A ``(object_backend_id, enqueue_kwargs)`` tuple, where ``enqueue_kwargs``
        are the keyword arguments for ``repo.enqueue_upload_task`` that target
        Song A's locator in ``bucket-a``.
    """
    from musicdl.catalogsync.models import CatalogSong

    song_id = repo.upsert_song(
        CatalogSong(platform="qq", remote_song_id="song-a", name="Song A", ext="mp3")
    )
    local_backend_id = repo.get_default_backend_id()
    file_asset_id = repo.record_local_file(
        song_id=song_id,
        backend_id=local_backend_id,
        relative_path=_SONG_A_RELATIVE_PATH,
        file_size_bytes=80,
        ext="mp3",
        quality_label="standard",
    )
    # The upload source is the lowest-id location of the asset on the local backend.
    source_location = repo._fetchone(
        """
        SELECT id
        FROM file_locations
        WHERE file_asset_id = ? AND backend_id = ?
        ORDER BY id ASC
        LIMIT 1
        """,
        (file_asset_id, local_backend_id),
    )
    object_backend_id = _upsert_main_s3_backend(repo)
    enqueue_kwargs = {
        "file_asset_id": file_asset_id,
        "source_location_id": int(source_location["id"]),
        "target_backend_id": object_backend_id,
        "target_container_name": "bucket-a",
        "target_locator": _SONG_A_LOCATOR,
    }
    return object_backend_id, enqueue_kwargs


class DatabaseSchemaTests(unittest.TestCase):
    """Checks on connection pragmas, created tables/indexes, and column upgrades."""

    def test_connect_database_enables_sqlite_busy_timeout_and_wal(self):
        """``connect_database`` reports the busy timeout, foreign keys on, and WAL mode."""
        from musicdl.catalogsync.db import SQLITE_BUSY_TIMEOUT_MS, connect_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            with closing(connect_database(db_path)) as conn:
                busy_timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
                foreign_keys_enabled = conn.execute("PRAGMA foreign_keys").fetchone()[0]
                journal_mode = str(conn.execute("PRAGMA journal_mode").fetchone()[0]).lower()

            self.assertEqual(SQLITE_BUSY_TIMEOUT_MS, busy_timeout_ms)
            self.assertEqual(1, foreign_keys_enabled)
            self.assertEqual("wal", journal_mode)

    def test_initialize_database_creates_expected_tables_and_default_backend(self):
        """All ``REQUIRED_TABLES`` exist and the ``default-local`` backend row is seeded."""
        from musicdl.catalogsync.db import REQUIRED_TABLES, initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            conn = initialize_database(db_path, default_library_root=library_root)
            conn.close()
            del conn

            with closing(sqlite3.connect(db_path)) as verify_conn:
                tables = _sqlite_master_names(verify_conn, "table")
                self.assertTrue(REQUIRED_TABLES.issubset(tables))
                backend_row = verify_conn.execute(
                    """
                    SELECT backend_type, base_path, is_default
                    FROM storage_backends
                    WHERE name = ?
                    """,
                    ("default-local",),
                ).fetchone()
            del verify_conn

            self.assertEqual(("local_fs", str(library_root.resolve()), 1), backend_row)

    def test_initialize_database_creates_upload_tables(self):
        """``song_backend_presence`` and ``upload_tasks`` tables are created."""
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            conn = initialize_database(db_path)
            conn.close()

            with closing(sqlite3.connect(db_path)) as verify_conn:
                tables = _sqlite_master_names(verify_conn, "table")

            self.assertIn("song_backend_presence", tables)
            self.assertIn("upload_tasks", tables)

    def test_initialize_database_creates_playlist_download_preferences_table_and_indexes(self):
        """The preferences table and the expected lookup indexes are created."""
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            conn = initialize_database(db_path)
            conn.close()

            with closing(sqlite3.connect(db_path)) as verify_conn:
                table_names = _sqlite_master_names(verify_conn, "table")
                index_names = _sqlite_master_names(verify_conn, "index")

            self.assertIn("playlist_download_preferences", table_names)
            for expected_index in (
                "idx_playlist_download_preferences_is_wanted",
                "idx_pool_playlists_playlist_id",
                "idx_playlist_songs_song_id",
                "idx_file_assets_song_id",
                "idx_job_items_running_song_id",
            ):
                self.assertIn(expected_index, index_names)

    def test_initialize_database_is_idempotent_for_default_backend(self):
        """Initializing twice leaves exactly one ``default-local`` backend row."""
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            initialize_database(db_path, default_library_root=library_root).close()

            with closing(sqlite3.connect(db_path)) as conn:
                backend_count = conn.execute(
                    "SELECT COUNT(*) FROM storage_backends WHERE name = ?",
                    ("default-local",),
                ).fetchone()[0]
            del conn

            self.assertEqual(1, backend_count)

    def test_initialize_database_upgrades_job_workers_with_throughput_columns(self):
        """A ``job_workers`` table lacking throughput columns gains them on init."""
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            _seed_legacy_table(
                db_path,
                """
                CREATE TABLE job_workers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    job_run_id INTEGER,
                    job_stage_id INTEGER,
                    worker_name TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'idle',
                    current_job_item_id INTEGER,
                    current_song_id INTEGER,
                    current_playlist_id INTEGER,
                    current_display_text TEXT,
                    heartbeat_at TEXT,
                    last_progress_text TEXT,
                    processed_count INTEGER NOT NULL DEFAULT 0,
                    error_count INTEGER NOT NULL DEFAULT 0
                )
                """,
            )

            initialize_database(db_path).close()

            with closing(sqlite3.connect(db_path)) as verify_conn:
                columns = {
                    row[1]
                    for row in verify_conn.execute("PRAGMA table_info(job_workers)").fetchall()
                }

            self.assertIn("downloaded_bytes", columns)
            self.assertIn("total_bytes", columns)
            self.assertIn("speed_bytes_per_sec", columns)
            self.assertIn("progress_percent", columns)

    def test_initialize_database_upgrades_playlists_with_play_count_column(self):
        """A ``playlists`` table without ``play_count`` gains the column on init."""
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            _seed_legacy_table(
                db_path,
                """
                CREATE TABLE playlists (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    platform TEXT NOT NULL,
                    remote_playlist_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    url TEXT NOT NULL,
                    parse_strategy TEXT NOT NULL DEFAULT 'playlist_url',
                    cover_url TEXT,
                    creator_name TEXT,
                    metadata_json TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(platform, remote_playlist_id)
                )
                """,
            )

            initialize_database(db_path).close()

            with closing(sqlite3.connect(db_path)) as verify_conn:
                columns = {
                    row[1]
                    for row in verify_conn.execute("PRAGMA table_info(playlists)").fetchall()
                }

            self.assertIn("play_count", columns)

    def test_initialize_database_upgrades_playlists_with_collected_song_count_column(self):
        """A ``playlists`` table without ``collected_song_count`` gains the column on init."""
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            _seed_legacy_table(
                db_path,
                """
                CREATE TABLE playlists (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    platform TEXT NOT NULL,
                    remote_playlist_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    url TEXT NOT NULL,
                    parse_strategy TEXT NOT NULL DEFAULT 'playlist_url',
                    cover_url TEXT,
                    creator_name TEXT,
                    play_count INTEGER,
                    metadata_json TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(platform, remote_playlist_id)
                )
                """,
            )

            initialize_database(db_path).close()

            with closing(sqlite3.connect(db_path)) as verify_conn:
                columns = {
                    row[1]
                    for row in verify_conn.execute("PRAGMA table_info(playlists)").fetchall()
                }

            self.assertIn("collected_song_count", columns)


class CatalogRepositoryUploadTests(unittest.TestCase):
    """Checks on object-storage backends, remote file records, and upload tasks."""

    def test_upsert_object_storage_backend_inserts_and_updates_config(self):
        """A second upsert under the same name keeps the id and overwrites the config."""
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir, with_library_root=False)

            backend_id = _upsert_main_s3_backend(repo)
            repo.upsert_object_storage_backend(
                name="main-s3",
                container_name="bucket-b",
                endpoint="https://s3.example.com",
                region="cn-shanghai",
                base_prefix="archive",
                credential_env_prefix="CATALOGSYNC_MAIN_S3",
            )

            backend = repo.get_backend_by_name("main-s3")
            config = json.loads(backend["config_json"])

            self.assertEqual(backend_id, int(backend["id"]))
            self.assertEqual("object_storage", backend["backend_type"])
            self.assertEqual("bucket-b", backend["container_name"])
            self.assertEqual("https://s3.example.com", config["endpoint"])
            self.assertEqual("cn-shanghai", config["region"])
            self.assertEqual("archive", config["base_prefix"])

    def test_record_remote_file_creates_updates_location_and_refreshes_presence(self):
        """Recording the same locator twice updates its URL and yields one active presence."""
        from musicdl.catalogsync.models import CatalogSong

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir)
            song_id = repo.upsert_song(
                CatalogSong(
                    platform="qq",
                    remote_song_id="song-a",
                    name="Song A",
                    ext="mp3",
                    file_size_bytes=80,
                    quality_label="standard",
                )
            )
            local_backend_id = repo.get_default_backend_id()
            file_asset_id = repo.record_local_file(
                song_id=song_id,
                backend_id=local_backend_id,
                relative_path=_SONG_A_RELATIVE_PATH,
                file_size_bytes=80,
                ext="mp3",
                quality_label="standard",
            )
            object_backend_id = _upsert_main_s3_backend(repo)

            repo.record_remote_file(
                file_asset_id=file_asset_id,
                backend_id=object_backend_id,
                container_name="bucket-a",
                locator=_SONG_A_LOCATOR,
                public_url="https://cdn.example.com/music/qq/Singer A/song-a.mp3",
                download_url=None,
            )
            # Same asset, backend and locator: only the public URL differs.
            repo.record_remote_file(
                file_asset_id=file_asset_id,
                backend_id=object_backend_id,
                container_name="bucket-a",
                locator=_SONG_A_LOCATOR,
                public_url="https://cdn.example.com/music/qq/Singer A/song-a-v2.mp3",
                download_url=None,
            )

            remote_row = repo._fetchone(
                """
                SELECT *
                FROM file_locations
                WHERE file_asset_id = ? AND backend_id = ? AND locator = ?
                """,
                (file_asset_id, object_backend_id, _SONG_A_LOCATOR),
            )
            presence_row = repo.get_song_backend_presence(
                song_id=song_id, backend_id=object_backend_id
            )

            self.assertIsNone(remote_row["absolute_path"])
            self.assertEqual(0, int(remote_row["is_primary"]))
            self.assertEqual("active", remote_row["status"])
            self.assertEqual(
                "https://cdn.example.com/music/qq/Singer A/song-a-v2.mp3",
                remote_row["public_url"],
            )
            self.assertEqual(1, int(presence_row["has_active_file"]))
            self.assertEqual(1, int(presence_row["active_file_count"]))
            self.assertEqual(
                int(remote_row["id"]), int(presence_row["primary_file_location_id"])
            )

    def test_enqueue_upload_task_deduplicates_and_list_pending_supports_status_update(self):
        """Enqueueing twice returns one task id; it leaves the pending list once succeeded."""
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir)
            object_backend_id, enqueue_kwargs = _prepare_song_a_upload(repo)

            first_task_id = repo.enqueue_upload_task(**enqueue_kwargs)
            second_task_id = repo.enqueue_upload_task(**enqueue_kwargs)
            pending_before = repo.list_pending_upload_tasks(target_backend_id=object_backend_id)
            repo.claim_next_upload_task(target_backend_id=object_backend_id)
            repo.mark_upload_task_status(
                task_id=first_task_id, status="succeeded", last_error=None
            )
            pending_after = repo.list_pending_upload_tasks(target_backend_id=object_backend_id)

            self.assertEqual(first_task_id, second_task_id)
            self.assertEqual(1, len(pending_before))
            self.assertEqual(0, len(pending_after))

    def test_enqueue_upload_task_requeues_failed_task_as_pending(self):
        """Re-enqueueing a failed task sets it back to pending with no ``last_error``."""
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir)
            object_backend_id, enqueue_kwargs = _prepare_song_a_upload(repo)

            task_id = repo.enqueue_upload_task(**enqueue_kwargs)
            repo.claim_next_upload_task(target_backend_id=object_backend_id)
            repo.mark_upload_task_status(
                task_id=task_id, status="failed", last_error="network error"
            )
            repo.enqueue_upload_task(**enqueue_kwargs)

            task_row = repo._fetchone(
                "SELECT status, last_error FROM upload_tasks WHERE id = ?", (task_id,)
            )

            self.assertEqual("pending", task_row["status"])
            self.assertIsNone(task_row["last_error"])

    def test_claim_next_upload_task_marks_row_uploading_and_clears_finished_at(self):
        """Claiming a re-queued task marks it uploading, attempt 2, with no ``finished_at``."""
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir)
            object_backend_id, enqueue_kwargs = _prepare_song_a_upload(repo)

            task_id = repo.enqueue_upload_task(**enqueue_kwargs)
            claimed_first = repo.claim_next_upload_task(target_backend_id=object_backend_id)
            repo.mark_upload_task_status(
                task_id=task_id, status="failed", last_error="network error"
            )
            repo.enqueue_upload_task(**enqueue_kwargs)
            claimed_second = repo.claim_next_upload_task(target_backend_id=object_backend_id)

            task_row = repo._fetchone(
                "SELECT status, attempts, started_at, finished_at FROM upload_tasks WHERE id = ?",
                (task_id,),
            )

            self.assertEqual(task_id, int(claimed_first["id"]))
            self.assertEqual(task_id, int(claimed_second["id"]))
            self.assertEqual("uploading", task_row["status"])
            self.assertEqual(2, int(task_row["attempts"]))
            self.assertIsNotNone(task_row["started_at"])
            self.assertIsNone(task_row["finished_at"])

    def test_mark_upload_task_status_rejects_invalid_transition(self):
        """Moving a succeeded task back to uploading raises ``RuntimeError``."""
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir)
            object_backend_id, enqueue_kwargs = _prepare_song_a_upload(repo)

            task_id = repo.enqueue_upload_task(**enqueue_kwargs)
            repo.claim_next_upload_task(target_backend_id=object_backend_id)
            repo.mark_upload_task_status(task_id=task_id, status="succeeded", last_error=None)

            with self.assertRaises(RuntimeError):
                repo.mark_upload_task_status(
                    task_id=task_id, status="uploading", last_error=None
                )

    def test_list_missing_object_upload_candidates_skips_existing_active_remote(self):
        """Only the song without a recorded remote file is returned as a candidate."""
        from musicdl.catalogsync.models import CatalogSong

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir)
            local_backend_id = repo.get_default_backend_id()
            object_backend_id = _upsert_main_s3_backend(repo)
            song_a_id = repo.upsert_song(
                CatalogSong(platform="qq", remote_song_id="song-a", name="Song A", ext="mp3")
            )
            song_b_id = repo.upsert_song(
                CatalogSong(platform="qq", remote_song_id="song-b", name="Song B", ext="mp3")
            )
            asset_a_id = repo.record_local_file(
                song_id=song_a_id,
                backend_id=local_backend_id,
                relative_path=_SONG_A_RELATIVE_PATH,
                file_size_bytes=80,
                ext="mp3",
                quality_label="standard",
            )
            asset_b_id = repo.record_local_file(
                song_id=song_b_id,
                backend_id=local_backend_id,
                relative_path="qq/Singer B/song-b.mp3",
                file_size_bytes=81,
                ext="mp3",
                quality_label="standard",
            )
            # Song B gets a remote file recorded; Song A does not.
            repo.record_remote_file(
                file_asset_id=asset_b_id,
                backend_id=object_backend_id,
                container_name="bucket-a",
                locator="music/qq/Singer B/song-b.mp3",
                public_url=None,
                download_url=None,
            )

            candidates = repo.list_missing_object_upload_candidates(
                target_backend_id=object_backend_id
            )

            self.assertEqual(1, len(candidates))
            self.assertEqual(song_a_id, int(candidates[0]["song_id"]))
            self.assertEqual(asset_a_id, int(candidates[0]["file_asset_id"]))
            self.assertEqual(_SONG_A_LOCATOR, candidates[0]["target_locator"])

    def test_list_missing_object_upload_candidates_supports_sources_playlist_ids_and_limit(self):
        """With ``sources``, ``playlist_ids`` and ``limit`` set, only the qq song is returned."""
        from musicdl.catalogsync.models import CatalogSong, PlaylistCandidate

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            repo = _open_repository(tmpdir)
            playlist_a = repo.upsert_playlist(
                PlaylistCandidate(
                    platform="qq",
                    pool_kind="manual_file",
                    remote_id="playlist-a",
                    name="Playlist A",
                    url="https://y.qq.com/n/ryqq/playlist/playlist-a",
                )
            )
            playlist_b = repo.upsert_playlist(
                PlaylistCandidate(
                    platform="qq",
                    pool_kind="manual_file",
                    remote_id="playlist-b",
                    name="Playlist B",
                    url="https://y.qq.com/n/ryqq/playlist/playlist-b",
                )
            )
            qq_song_id = repo.upsert_song(
                CatalogSong(platform="qq", remote_song_id="song-a", name="Song A", ext="mp3")
            )
            netease_song_id = repo.upsert_song(
                CatalogSong(platform="netease", remote_song_id="song-b", name="Song B", ext="flac")
            )
            local_backend_id = repo.get_default_backend_id()
            repo.record_local_file(
                song_id=qq_song_id,
                backend_id=local_backend_id,
                relative_path=_SONG_A_RELATIVE_PATH,
                file_size_bytes=80,
                ext="mp3",
                quality_label="standard",
            )
            repo.record_local_file(
                song_id=netease_song_id,
                backend_id=local_backend_id,
                relative_path="netease/Singer B/song-b.flac",
                file_size_bytes=128,
                ext="flac",
                quality_label="lossless",
            )
            repo.link_playlist_song(playlist_a, qq_song_id, 1)
            repo.link_playlist_song(playlist_b, netease_song_id, 1)
            object_backend_id = _upsert_main_s3_backend(repo)

            candidates = repo.list_missing_object_upload_candidates(
                target_backend_id=object_backend_id,
                sources=["qq"],
                playlist_ids=[playlist_a],
                limit=1,
            )

            self.assertEqual(1, len(candidates))
            self.assertEqual(qq_song_id, int(candidates[0]["song_id"]))
            self.assertEqual(_SONG_A_LOCATOR, candidates[0]["target_locator"])


if __name__ == "__main__":
    unittest.main()