Files

2209 lines
102 KiB
Python

import tempfile
import threading
import time
import unittest
import warnings
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import Mock, patch
class FakeHTTPResponse:
def __init__(self, payload, status_code=200):
self._payload = payload
self.status_code = status_code
def raise_for_status(self):
if int(self.status_code) >= 400:
raise RuntimeError(f"http {self.status_code}")
def json(self):
return self._payload
class FakeImageResponse:
def __init__(self, content: bytes, status_code: int = 200, content_type: str = "image/jpeg"):
self.content = bytes(content)
self.status_code = int(status_code)
self.headers = {"Content-Type": content_type}
def raise_for_status(self):
if int(self.status_code) >= 400:
raise RuntimeError(f"http {self.status_code}")
class CatalogServiceTests(unittest.TestCase):
@staticmethod
def _playlist_square_candidate(platform: str, remote_id: str):
from musicdl.catalogsync.models import PlaylistCandidate
playlist_url = {
"netease": f"https://music.163.com/#/playlist?id={remote_id}",
"qq": f"https://y.qq.com/n/ryqq/playlist/{remote_id}",
"kuwo": f"https://www.kuwo.cn/playlist_detail/{remote_id}",
}[platform]
return PlaylistCandidate(
platform=platform,
pool_kind="playlist_square",
remote_id=remote_id,
name=f"{platform}-playlist-{remote_id}",
url=playlist_url,
)
def test_collect_playlists_paginates_netease_playlist_square(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
netease_collector = SimpleNamespace(
collect_playlist_square=Mock(
side_effect=[
[self._playlist_square_candidate("netease", "10001")],
[self._playlist_square_candidate("netease", "10002")],
[],
]
),
collect_toplist=Mock(return_value=[]),
)
service = CatalogSyncService(
repository=repo,
collectors={"netease": netease_collector},
)
counts = service.collect_playlists(sources=["netease"], include_toplist=False)
self.assertEqual(2, counts["playlist_square"])
self.assertEqual(3, netease_collector.collect_playlist_square.call_count)
def test_collect_playlists_paginates_kuwo_playlist_square(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
kuwo_collector = SimpleNamespace(
collect_playlist_square=Mock(
side_effect=[
[self._playlist_square_candidate("kuwo", "20001")],
[self._playlist_square_candidate("kuwo", "20002")],
[],
]
),
collect_toplist=Mock(return_value=[]),
)
service = CatalogSyncService(
repository=repo,
collectors={"kuwo": kuwo_collector},
)
counts = service.collect_playlists(sources=["kuwo"], include_toplist=False)
self.assertEqual(2, counts["playlist_square"])
self.assertEqual(3, kuwo_collector.collect_playlist_square.call_count)
def test_collect_playlists_handles_qq_square_failure_with_clear_boundary(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
qq_collector = SimpleNamespace(
collect_playlist_square=Mock(side_effect=RuntimeError("qq square unavailable")),
collect_toplist=Mock(return_value=[]),
)
netease_collector = SimpleNamespace(
collect_playlist_square=Mock(
return_value=[self._playlist_square_candidate("netease", "30001")]
),
collect_toplist=Mock(return_value=[]),
)
service = CatalogSyncService(
repository=repo,
collectors={"qq": qq_collector, "netease": netease_collector},
)
try:
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", RuntimeWarning)
counts = service.collect_playlists(
sources=["qq", "netease"],
include_toplist=False,
)
except RuntimeError as exc: # pragma: no cover - current gap guard
self.fail(f"collect_playlists should not abort on qq square failure: {exc}")
self.assertEqual(1, counts["playlist_square"])
self.assertEqual(1, netease_collector.collect_playlist_square.call_count)
runtime_warnings = [warning for warning in caught if issubclass(warning.category, RuntimeWarning)]
self.assertEqual([], runtime_warnings)
def test_collect_playlists_stops_when_playlist_square_pages_repeat_only_duplicates(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
collector = SimpleNamespace(
collect_playlist_square=Mock(
side_effect=[
[self._playlist_square_candidate("netease", "40001")],
[self._playlist_square_candidate("netease", "40001")],
[self._playlist_square_candidate("netease", "40002")],
]
),
collect_toplist=Mock(return_value=[]),
)
service = CatalogSyncService(
repository=repo,
collectors={"netease": collector},
)
counts = service.collect_playlists(sources=["netease"], include_toplist=False)
self.assertEqual(1, counts["playlist_square"])
self.assertEqual(2, collector.collect_playlist_square.call_count)
def test_resolve_playlist_song_infos_uses_snapshot_builder_for_supported_playlist_urls(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
playlist_row = {
"platform": "netease",
"parse_strategy": "playlist_url",
"url": "https://music.163.com/#/playlist?id=17745989905",
}
with patch("musicdl.catalogsync.services.build_netease_playlist_song_infos", return_value=["snapshot-song"]) as builder:
with patch.object(service, "get_client", return_value="netease-client"):
result = service.resolve_playlist_song_infos(playlist_row)
self.assertEqual(["snapshot-song"], result)
builder.assert_called_once_with("netease-client", playlist_row["url"])
def test_resolve_netease_toplist_uses_netease_snapshot_builder(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
playlist_row = {
"platform": "netease",
"parse_strategy": "netease_toplist",
"url": "https://music.163.com/#/playlist?id=19723756",
}
with patch(
"musicdl.catalogsync.services.build_netease_playlist_song_infos",
return_value=["netease-toplist-song"],
) as builder:
with patch.object(service, "get_client", return_value="netease-client"):
result = service.resolve_playlist_song_infos(playlist_row)
self.assertEqual(["netease-toplist-song"], result)
builder.assert_called_once_with("netease-client", playlist_row["url"])
def test_resolve_qq_toplist_falls_back_when_songlist_has_no_data_field(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
fake_client = SimpleNamespace(
source="QQMusicClient",
_constructuniqueworkdir=lambda keyword: f"/tmp/{keyword}",
_removeduplicates=lambda song_infos: list(song_infos),
)
playlist_row = {"remote_playlist_id": "75", "name": "有声榜"}
v8_payload = {
"songlist": [
{"songid": 0, "Franking_value": "0"},
]
}
musicu_payload = {
"toplist": {
"code": 0,
"data": {
"data": {
"song": [
{
"rank": 1,
"songId": 0,
"albumMid": "000HiJZD1F4wUi",
"title": "道士不好惹",
"singerName": "萌鹿剧场/骨头-萌鹿剧场",
},
{
"rank": 2,
"songId": 0,
"albumMid": "002qjuVq0iug1k",
"title": "晚安妈妈睡前故事大全",
"singerName": "QQ音乐儿童官方频道/晚安妈妈",
},
]
}
},
}
}
mocked_get = Mock(return_value=FakeHTTPResponse(v8_payload))
mocked_post = Mock(return_value=FakeHTTPResponse(musicu_payload))
with patch.object(service, "get_client", return_value=fake_client):
with patch("musicdl.catalogsync.services.requests.get", mocked_get):
with patch("musicdl.catalogsync.services.requests.post", mocked_post):
song_infos = service._resolve_qq_toplist(playlist_row)
self.assertEqual(2, len(song_infos))
self.assertTrue(all(song_info.identifier for song_info in song_infos))
self.assertEqual("道士不好惹", song_infos[0].song_name)
self.assertEqual("萌鹿剧场, 骨头-萌鹿剧场", song_infos[0].singers)
self.assertEqual(1, mocked_get.call_count)
self.assertEqual(1, mocked_post.call_count)
def test_import_manual_playlists_creates_platform_specific_manual_pools(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.manual_playlists import parse_playlist_file
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
playlist_file = Path(tmpdir) / "playlists.txt"
playlist_file.write_text(
"\n".join(
[
"https://music.163.com/#/playlist?id=17745989905",
"https://y.qq.com/n/ryqq/playlist/7707261125",
]
),
encoding="utf-8",
)
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
parsed = parse_playlist_file(playlist_file)
playlist_ids = service.import_manual_playlists(playlist_file, parsed.entries)
pools = repo._fetchall(
"""
SELECT platform, pool_kind, external_id, name
FROM playlist_pools
WHERE pool_kind = 'manual_file'
ORDER BY platform ASC
"""
)
self.assertEqual(2, len(playlist_ids))
self.assertEqual(["netease", "qq"], [row["platform"] for row in pools])
def test_sync_specific_playlists_only_processes_requested_playlist_ids(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.models import PlaylistCandidate
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
playlist_a = repo.upsert_playlist(
PlaylistCandidate(
platform="netease",
pool_kind="manual_file",
remote_id="17745989905",
name="Playlist A",
url="https://music.163.com/#/playlist?id=17745989905",
)
)
playlist_b = repo.upsert_playlist(
PlaylistCandidate(
platform="netease",
pool_kind="manual_file",
remote_id="17729789137",
name="Playlist B",
url="https://music.163.com/#/playlist?id=17729789137",
)
)
with patch.object(service, "resolve_playlist_song_infos", return_value=[]) as resolver:
service.sync_specific_playlists([playlist_b])
called_row = resolver.call_args[0][0]
self.assertEqual(playlist_b, int(called_row["id"]))
def test_sync_playlist_row_backfills_netease_play_count_from_playlist_detail(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.models import PlaylistCandidate
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
playlist_id = repo.upsert_playlist(
PlaylistCandidate(
platform="netease",
pool_kind="manual_file",
remote_id="17745989905",
name="Playlist A",
url="https://music.163.com/#/playlist?id=17745989905",
)
)
playlist_row = repo.list_playlists_by_ids([playlist_id])[0]
fake_client = SimpleNamespace(
post=Mock(
return_value=FakeHTTPResponse(
{
"playlist": {
"playCount": 7654321,
}
}
)
)
)
with patch.object(service, "resolve_playlist_song_infos", return_value=[]):
with patch.object(service, "get_client", return_value=fake_client):
service.sync_playlist_row(playlist_row)
refreshed_row = repo.list_playlists_by_ids([playlist_id])[0]
self.assertEqual(7654321, refreshed_row["play_count"])
def test_sync_playlist_row_backfills_qq_and_kuwo_play_count_from_playlist_detail(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.models import PlaylistCandidate
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
qq_playlist_id = repo.upsert_playlist(
PlaylistCandidate(
platform="qq",
pool_kind="manual_file",
remote_id="7707261125",
name="QQ Playlist",
url="https://y.qq.com/n/ryqq/playlist/7707261125",
)
)
kuwo_playlist_id = repo.upsert_playlist(
PlaylistCandidate(
platform="kuwo",
pool_kind="manual_file",
remote_id="3694434192",
name="Kuwo Playlist",
url="https://www.kuwo.cn/playlist_detail/3694434192",
)
)
qq_playlist_row = repo.list_playlists_by_ids([qq_playlist_id])[0]
kuwo_playlist_row = repo.list_playlists_by_ids([kuwo_playlist_id])[0]
fake_qq_client = SimpleNamespace(
get=Mock(
return_value=FakeHTTPResponse(
{
"cdlist": [
{
"visitnum": 8527054,
}
]
}
)
)
)
fake_kuwo_client = SimpleNamespace(
get=Mock(
return_value=FakeHTTPResponse(
{
"data": {
"listencnt": 2196,
}
}
)
)
)
with patch.object(service, "resolve_playlist_song_infos", return_value=[]):
with patch.object(
service,
"get_client",
side_effect=lambda platform: {
"qq": fake_qq_client,
"kuwo": fake_kuwo_client,
}[platform],
):
service.sync_playlist_row(qq_playlist_row)
service.sync_playlist_row(kuwo_playlist_row)
refreshed_rows = {
int(row["id"]): row
for row in repo.list_playlists_by_ids([qq_playlist_id, kuwo_playlist_id])
}
self.assertEqual(8527054, refreshed_rows[qq_playlist_id]["play_count"])
self.assertEqual(2196, refreshed_rows[kuwo_playlist_id]["play_count"])
def test_sync_playlist_row_does_not_write_playlist_artifacts_until_explicit_export(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.models import PlaylistCandidate
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
root = Path(tmpdir)
db_path = root / "catalogsync.db"
library_root = root / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repo)
playlist_id = repo.upsert_playlist(
PlaylistCandidate(
platform="qq",
pool_kind="manual_file",
remote_id="7707261125",
name="Playlist Artifact Ready",
url="https://y.qq.com/n/ryqq/playlist/7707261125",
cover_url="https://img.example.invalid/playlist-cover.jpg",
play_count=123456,
)
)
pool_id = repo.upsert_playlist_pool(
platform="qq",
pool_kind="manual_file",
external_id="manual-file:qq:7707261125",
name="Manual File QQ",
url="https://example.invalid/manual/qq/7707261125",
)
repo.link_pool_playlist(pool_id, playlist_id)
playlist_row = repo.list_playlists_by_ids([playlist_id])[0]
deferred_song = SimpleNamespace(
source="QQMusicClient",
identifier="song-artifact-1",
song_name="Song Artifact 1",
singers="Singer Artifact",
album="Album Artifact",
ext="flac",
file_size="1 MB",
file_size_bytes=1024 * 1024,
cover_url="https://img.example.invalid/song-cover.jpg",
raw_data={"search": {"id": "song-artifact-1"}, "quality": "lossless"},
)
with patch.object(service, "resolve_playlist_song_infos", return_value=[deferred_song]):
with patch.object(service, "resolve_playlist_play_count", return_value=123456):
linked_count = service.sync_playlist_row(playlist_row)
playlist_root = root / "playlists"
self.assertFalse(playlist_root.exists())
with patch(
"musicdl.catalogsync.services.requests.get",
side_effect=[
FakeImageResponse(b"playlist-cover"),
FakeImageResponse(b"song-cover"),
],
):
playlist_dir = service.ensure_playlist_artifacts_for_playlist(playlist_id)
self.assertIsNotNone(playlist_dir)
assert playlist_dir is not None
self.assertIn("Playlist Artifact Ready", playlist_dir.name)
self.assertIn(str(playlist_id), playlist_dir.name)
yaml_path = playlist_dir / "playlist.yaml"
self.assertTrue(yaml_path.exists())
yaml_text = yaml_path.read_text(encoding="utf-8")
self.assertIn("playlist_id: " + str(playlist_id), yaml_text)
self.assertIn("playlist_name: Playlist Artifact Ready", yaml_text)
self.assertIn("play_count: 123456", yaml_text)
self.assertIn("cover_url: https://img.example.invalid/song-cover.jpg", yaml_text)
self.assertIn("Song Artifact 1", yaml_text)
self.assertIn("Album Artifact", yaml_text)
self.assertIn("platform_song_id: song-artifact-1", yaml_text)
covers_dir = playlist_dir / "covers"
self.assertTrue(covers_dir.exists())
cover_names = {path.name for path in covers_dir.iterdir() if path.is_file()}
self.assertIn("playlist-cover.jpg", cover_names)
self.assertIn("song-1-song-artifact-1.jpg", cover_names)
self.assertEqual(1, linked_count)
def test_store_playlist_candidates_upserts_pool_and_links_playlists(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.models import PlaylistCandidate
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repository=repo)
pool_id = service.store_playlist_candidates(
platform="qq",
pool_kind="playlist_square",
pool_name="QQ 歌单广场",
candidates=[
PlaylistCandidate(
platform="qq",
pool_kind="playlist_square",
remote_id="7707261125",
name="甜度爆表",
url="https://y.qq.com/n/ryqq/playlist/7707261125",
),
PlaylistCandidate(
platform="qq",
pool_kind="playlist_square",
remote_id="7578943835",
name="丧系 Rap",
url="https://y.qq.com/n/ryqq/playlist/7578943835",
),
],
)
self.assertEqual(2, len(repo.list_playlists()))
self.assertEqual(2, len(repo.list_pool_playlist_links(pool_id)))
def test_store_playlist_songs_populates_songs_and_derived_artist_pool(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.models import PlaylistCandidate
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.services import CatalogSyncService
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
service = CatalogSyncService(repository=repo)
pool_id = service.store_playlist_candidates(
platform="netease",
pool_kind="playlist_square",
pool_name="网易云歌单广场",
candidates=[
PlaylistCandidate(
platform="netease",
pool_kind="playlist_square",
remote_id="7583298906",
name="华语清新收藏夹",
url="https://music.163.com/#/playlist?id=7583298906",
)
],
)
playlist_id = repo.list_playlists()[0]["id"]
artist_pool_id = service.store_playlist_songs(
playlist_id=playlist_id,
source_pool_id=pool_id,
song_infos=[
SimpleNamespace(
source="NeteaseMusicClient",
identifier="101",
song_name="第一首歌",
singers="歌手甲 / 歌手乙",
album="专辑 A",
ext="flac",
file_size_bytes=2048,
file_size="0.00 MB",
raw_data={"quality": "lossless", "search": {"ar": [{"name": "歌手甲"}, {"name": "歌手乙"}]}},
),
SimpleNamespace(
source="NeteaseMusicClient",
identifier="102",
song_name="第二首歌",
singers="歌手乙 / 歌手丙",
album="专辑 B",
ext="mp3",
file_size_bytes=1024,
file_size="0.00 MB",
raw_data={"quality": "standard", "search": {}},
),
],
)
self.assertEqual(2, repo.count_rows("songs"))
self.assertEqual(2, repo.count_rows("playlist_songs"))
self.assertEqual(1, repo.count_rows("artist_pools"))
self.assertEqual(3, repo.count_rows("artists"))
self.assertEqual(3, repo.count_rows("pool_artists"))
self.assertEqual(4, repo.count_rows("artist_songs"))
self.assertIsInstance(artist_pool_id, int)
def test_download_planner_skips_song_with_existing_local_file(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.catalogsync.downloader import DownloadPlanner
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_a_id = repo.upsert_song(
CatalogSong(platform="qq", remote_song_id="song-a", name="已下载歌曲", ext="flac", file_size_bytes=100)
)
song_b_id = repo.upsert_song(
CatalogSong(platform="qq", remote_song_id="song-b", name="待下载歌曲", ext="mp3", file_size_bytes=80)
)
backend_id = repo.get_default_backend_id()
repo.record_local_file(
song_id=song_a_id,
backend_id=backend_id,
relative_path="qq/已下载歌曲.flac",
file_size_bytes=100,
ext="flac",
quality_label="lossless",
)
planner = DownloadPlanner(repository=repo)
pending = planner.build_download_queue()
self.assertEqual([song_b_id], [item["song_id"] for item in pending])
def test_download_planner_can_limit_queue_to_specific_playlists(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import DownloadPlanner
from musicdl.catalogsync.models import CatalogSong, PlaylistCandidate
from musicdl.catalogsync.repository import CatalogRepository
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
initialize_database(db_path).close()
repo = CatalogRepository(db_path)
playlist_a = repo.upsert_playlist(
PlaylistCandidate(
platform="qq",
pool_kind="manual_file",
remote_id="7707261125",
name="Playlist A",
url="https://y.qq.com/n/ryqq/playlist/7707261125",
)
)
playlist_b = repo.upsert_playlist(
PlaylistCandidate(
platform="qq",
pool_kind="manual_file",
remote_id="7578943835",
name="Playlist B",
url="https://y.qq.com/n/ryqq/playlist/7578943835",
)
)
song_a = repo.upsert_song(CatalogSong(platform="qq", remote_song_id="song-a", name="A"))
song_b = repo.upsert_song(CatalogSong(platform="qq", remote_song_id="song-b", name="B"))
repo.link_playlist_song(playlist_a, song_a, 1)
repo.link_playlist_song(playlist_b, song_b, 1)
planner = DownloadPlanner(repo)
queue = planner.build_download_queue(playlist_ids=[playlist_a])
self.assertEqual([song_a], [item["song_id"] for item in queue])
def test_catalog_downloader_records_alternate_backend_when_space_switches_root(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].work_dir) / "song-c.mp3"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library-a"
alternate_root = Path(tmpdir) / "library-b"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-c",
name="闈炲父缁х画涓嬭浇",
ext="mp3",
file_size_bytes=80,
quality_label="standard",
metadata={"snapshot": {"identifier": "song-c"}},
)
)
downloader = CatalogDownloader(repository=repo)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=SimpleNamespace(singers="Singer A / Singer B"),
):
with patch.object(downloader, "ensure_space", return_value=alternate_root):
with patch.object(downloader, "get_client", return_value=FakeClient()):
downloaded_count = downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone(
"""
SELECT sb.base_path, sb.name, fl.locator, fl.absolute_path
FROM file_locations fl
JOIN file_assets fa ON fa.id = fl.file_asset_id
JOIN storage_backends sb ON sb.id = fl.backend_id
WHERE fa.song_id = ?
ORDER BY fl.id DESC
LIMIT 1
""",
(song_id,),
)
self.assertEqual(1, downloaded_count)
self.assertEqual(str(alternate_root.resolve()), location["base_path"])
self.assertTrue(str(location["absolute_path"]).startswith(str(alternate_root.resolve())))
self.assertEqual("qq/Singer A/song-c.mp3", location["locator"])
def test_catalog_downloader_prompts_once_and_reuses_new_root_with_multiple_workers(self):
from collections import namedtuple
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].work_dir) / f"{song_infos[0].identifier}.mp3"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
DiskUsage = namedtuple("DiskUsage", ["total", "used", "free"])
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library-a"
alternate_root = Path(tmpdir) / "library-b"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-a",
name="Song A",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-a"}},
)
)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-b",
name="Song B",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-b"}},
)
)
downloader = CatalogDownloader(repository=repo, worker_count=2)
def fake_disk_usage(path):
resolved = Path(path).resolve()
if resolved == library_root.resolve():
return DiskUsage(total=1000, used=960, free=40)
return DiskUsage(total=1000, used=100, free=900)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
side_effect=[
SimpleNamespace(identifier="song-a", singers="Singer A / Singer B"),
SimpleNamespace(identifier="song-b", singers="Singer A / Singer B"),
],
):
with patch.object(downloader, "get_client", return_value=FakeClient()):
with patch("musicdl.catalogsync.downloader.shutil.disk_usage", side_effect=fake_disk_usage):
with patch("builtins.input", return_value=str(alternate_root)) as mocked_input:
downloaded_count = downloader.download_pending(library_root=library_root, limit=2)
locations = repo._fetchall(
"""
SELECT absolute_path
FROM file_locations
ORDER BY id ASC
"""
)
self.assertEqual(2, downloaded_count)
self.assertEqual(1, mocked_input.call_count)
self.assertEqual(2, len(locations))
self.assertTrue(all(str(row["absolute_path"]).startswith(str(alternate_root.resolve())) for row in locations))
def test_catalog_downloader_records_platform_first_artist_locator(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].work_dir) / "song-c.mp3"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-c",
name="Song C",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-c"}},
)
)
downloader = CatalogDownloader(repository=repo)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=SimpleNamespace(singers="Singer A / Singer B"),
):
with patch.object(downloader, "get_client", return_value=FakeClient()):
downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone("SELECT locator FROM file_locations ORDER BY id DESC LIMIT 1")
self.assertEqual("qq/Singer A/song-c.mp3", location["locator"])
def test_catalog_downloader_refreshes_song_info_before_download(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def __init__(self):
self.downloaded_identifiers = []
def _parsewithofficialapiv1(self, search_result, request_overrides=None, song_info_flac=None):
return SimpleNamespace(
identifier="fresh-song",
singers="Singer A / Singer B",
song_name="Fresh Song",
ext="mp3",
with_valid_download_url=True,
)
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
self.downloaded_identifiers.append(song_infos[0].identifier)
save_path = Path(song_infos[0].work_dir) / f"{song_infos[0].identifier}.mp3"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fresh-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-c",
name="Song C",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-c"}},
)
)
downloader = CatalogDownloader(repository=repo)
fake_client = FakeClient()
stale_song_info = SimpleNamespace(
identifier="song-c",
singers="Singer A / Singer B",
song_name="Song C",
raw_data={"search": {"songmid": "song-c"}},
with_valid_download_url=False,
)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=stale_song_info,
):
with patch.object(downloader, "get_client", return_value=fake_client):
downloaded_count = downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone("SELECT locator FROM file_locations ORDER BY id DESC LIMIT 1")
self.assertEqual(1, downloaded_count)
self.assertEqual(["fresh-song"], fake_client.downloaded_identifiers)
self.assertEqual("qq/Singer A/fresh-song.mp3", location["locator"])
def test_catalog_downloader_uses_resolved_cross_platform_source_for_download(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class FakeClient:
def __init__(self):
self.downloaded_identifiers = []
def download(self, song_infos, num_threadings=1, auto_supplement_song=False, request_overrides=None):
self.downloaded_identifiers.append(song_infos[0].identifier)
save_path = Path(song_infos[0].work_dir) / f"{song_infos[0].identifier}.{song_infos[0].ext}"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-c",
name="Song C",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=1024,
quality_label="standard",
metadata={"snapshot": {"identifier": "song-c"}},
)
)
downloader = CatalogDownloader(repository=repo)
netease_client = FakeClient()
qq_client = FakeClient()
stale_song_info = SongInfo(
source="NeteaseMusicClient",
identifier="song-c",
song_name="Song C",
singers="Singer A / Singer B",
ext="mp3",
raw_data={"search": {"id": "song-c"}},
download_url=None,
download_url_status={},
)
resolved_song_info = SongInfo(
source="QQMusicClient",
identifier="qq-song-c",
song_name="Song C",
singers="Singer A / Singer B",
ext="flac",
file_size_bytes=4096,
file_size="4.00 MB",
raw_data={"quality": "lossless"},
download_url="https://example.com/song-c.flac",
download_url_status={"ok": True},
)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=stale_song_info,
):
with patch.object(
downloader,
"resolve_song_info_for_download",
return_value=resolved_song_info,
) as resolver:
with patch.object(
downloader,
"get_client",
side_effect=lambda platform: {
"netease": netease_client,
"qq": qq_client,
}[platform],
):
downloaded_count = downloader.download_pending(
library_root=library_root,
limit=1,
download_sources=["qq", "netease"],
)
location = repo._fetchone("SELECT locator FROM file_locations ORDER BY id DESC LIMIT 1")
asset = repo._fetchone("SELECT ext, quality_label FROM file_assets ORDER BY id DESC LIMIT 1")
self.assertEqual(1, downloaded_count)
self.assertEqual([], netease_client.downloaded_identifiers)
self.assertEqual(["qq-song-c"], qq_client.downloaded_identifiers)
self.assertEqual(["qq", "netease"], resolver.call_args.kwargs["download_sources"])
self.assertEqual("qq/Singer A/qq-song-c.flac", location["locator"])
self.assertEqual("flac", asset["ext"])
self.assertEqual("lossless", asset["quality_label"])
def test_catalog_downloader_falls_back_to_db_singers_when_snapshot_singers_is_null_text(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].work_dir) / "song-null.mp3"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-null",
name="Song Null",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-null"}},
)
)
downloader = CatalogDownloader(repository=repo)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=SimpleNamespace(singers="NULL"),
):
with patch.object(downloader, "get_client", return_value=FakeClient()):
downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone("SELECT locator FROM file_locations ORDER BY id DESC LIMIT 1")
self.assertEqual("qq/Singer A/song-null.mp3", location["locator"])
def test_catalog_downloader_falls_back_to_db_singers_when_snapshot_singers_is_non_string(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].work_dir) / "song-non-string.mp3"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-non-string",
name="Song Non String",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-non-string"}},
)
)
downloader = CatalogDownloader(repository=repo)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=SimpleNamespace(singers=123),
):
with patch.object(downloader, "get_client", return_value=FakeClient()):
downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone("SELECT locator FROM file_locations ORDER BY id DESC LIMIT 1")
self.assertEqual("qq/Singer A/song-non-string.mp3", location["locator"])
def test_catalog_downloader_falls_back_to_db_singers_when_snapshot_singers_is_falsy_non_string(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].work_dir) / "song-falsy-non-string.mp3"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-falsy-non-string",
name="Song Falsy Non String",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-falsy-non-string"}},
)
)
downloader = CatalogDownloader(repository=repo)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=SimpleNamespace(singers=0),
):
with patch.object(downloader, "get_client", return_value=FakeClient()):
downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone("SELECT locator FROM file_locations ORDER BY id DESC LIMIT 1")
self.assertEqual("qq/Singer A/song-falsy-non-string.mp3", location["locator"])
def test_catalog_downloader_rebuilds_save_path_after_switching_work_dir(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-c",
name="Song C",
singers="Singer A / Singer B",
ext="mp3",
file_size_bytes=80,
metadata={"snapshot": {"identifier": "song-c"}},
)
)
downloader = CatalogDownloader(repository=repo)
stale_path = Path(tmpdir) / "legacy" / "song-c.mp3"
song_info = SongInfo(
source="QQMusicClient",
identifier="song-c",
song_name="Song C",
singers="Singer A / Singer B",
ext="mp3",
work_dir=str(stale_path.parent),
)
song_info._save_path = str(stale_path)
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=song_info):
with patch.object(downloader, "get_client", return_value=FakeClient()):
downloaded_count = downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone("SELECT locator, absolute_path FROM file_locations ORDER BY id DESC LIMIT 1")
self.assertEqual(1, downloaded_count)
self.assertTrue(str(location["locator"]).startswith("qq/Singer A/"))
self.assertTrue(str(location["absolute_path"]).startswith(str(library_root.resolve())))
def test_catalog_downloader_uses_unknown_artist_fallback_directory(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
class FakeClient:
def download(self, song_infos, num_threadings=1, auto_supplement_song=False):
save_path = Path(song_infos[0].work_dir) / "song-a.flac"
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-a",
name="Song A",
singers="NULL",
ext="flac",
file_size_bytes=100,
metadata={"snapshot": {"identifier": "song-a"}},
)
)
downloader = CatalogDownloader(repository=repo)
with patch(
"musicdl.catalogsync.downloader.deserialize_song_info",
return_value=SimpleNamespace(singers="NULL"),
):
with patch.object(downloader, "get_client", return_value=FakeClient()):
downloader.download_pending(library_root=library_root, limit=1)
location = repo._fetchone("SELECT locator FROM file_locations ORDER BY id DESC LIMIT 1")
self.assertEqual("netease/Unknown Artist/song-a.flac", location["locator"])
def test_catalog_downloader_reports_real_time_progress_to_worker_callback(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class SlowWritingClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
song_info = song_infos[0]
save_path = Path(song_info.save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
with open(save_path, "wb") as file_obj:
for chunk in (b"a" * 8, b"b" * 8, b"c" * 8):
file_obj.write(chunk)
file_obj.flush()
time.sleep(0.05)
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-progress",
name="Song Progress",
singers="Singer Progress",
ext="mp3",
file_size_bytes=24,
metadata={"snapshot": {"identifier": "song-progress"}},
)
)
downloader = CatalogDownloader(repository=repo)
song_info = SongInfo(
source="QQMusicClient",
identifier="song-progress",
song_name="Song Progress",
singers="Singer Progress",
ext="mp3",
file_size_bytes=24,
download_url="https://example.com/song-progress.mp3",
download_url_status={"ok": True},
)
worker_updates: list[dict[str, object]] = []
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=song_info):
with patch.object(downloader, "get_client", return_value=SlowWritingClient()):
ok = downloader.download_song_row(
row={
"id": song_id,
"playlist_id": 123,
"platform": "qq",
"name": "Song Progress",
"singers": "Singer Progress",
"ext": "mp3",
"file_size_bytes": 24,
"metadata_json": '{"snapshot":{"identifier":"song-progress"}}',
},
library_root=library_root,
worker_callback=lambda **state: worker_updates.append(dict(state)),
)
self.assertTrue(ok)
speed_updates = [
state
for state in worker_updates
if float(state.get("speed_bytes_per_sec") or 0) > 0 and int(state.get("downloaded_bytes") or 0) > 0
]
self.assertTrue(speed_updates, worker_updates)
self.assertEqual(123, speed_updates[-1]["current_playlist_id"])
self.assertEqual(song_id, speed_updates[-1]["current_song_id"])
self.assertGreaterEqual(int(speed_updates[-1]["progress_percent"] or 0), 33)
self.assertIn("MB", str(speed_updates[-1].get("last_progress_text") or ""))
def test_catalog_downloader_reports_progress_when_client_switches_save_path_during_download(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
from musicdl.modules.utils.misc import shortenpathsinsonginfos
long_song_name = f"Song Progress {'X' * 220}"
switched_paths: list[tuple[str, str]] = []
class PathSwitchingClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
song_info = song_infos[0]
original_path = str(song_info.save_path)
shortenpathsinsonginfos(song_infos)
switched_path = Path(song_info.save_path)
switched_paths.append((original_path, str(switched_path)))
switched_path.parent.mkdir(parents=True, exist_ok=True)
with open(switched_path, "wb") as file_obj:
for chunk in (b"a" * 8, b"b" * 8, b"c" * 8):
file_obj.write(chunk)
file_obj.flush()
time.sleep(0.05)
return [SimpleNamespace(save_path=str(switched_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="qq",
remote_song_id="song-progress-switched",
name=long_song_name,
singers="Singer Progress",
ext="mp3",
file_size_bytes=24,
metadata={"snapshot": {"identifier": "song-progress-switched"}},
)
)
downloader = CatalogDownloader(repository=repo)
song_info = SongInfo(
source="QQMusicClient",
identifier="song-progress-switched",
song_name=long_song_name,
singers="Singer Progress",
ext="mp3",
file_size_bytes=24,
download_url="https://example.com/song-progress-switched.mp3",
download_url_status={"ok": True},
)
self.assertGreater(len(song_info.save_path), 240)
song_info._save_path = None
worker_updates: list[dict[str, object]] = []
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=song_info):
with patch.object(downloader, "get_client", return_value=PathSwitchingClient()):
ok = downloader.download_song_row(
row={
"id": song_id,
"playlist_id": 456,
"platform": "qq",
"name": long_song_name,
"singers": "Singer Progress",
"ext": "mp3",
"file_size_bytes": 24,
"metadata_json": '{"snapshot":{"identifier":"song-progress-switched"}}',
},
library_root=library_root,
worker_callback=lambda **state: worker_updates.append(dict(state)),
)
self.assertTrue(ok)
self.assertTrue(switched_paths)
self.assertLessEqual(len(switched_paths[0][1]), 240)
speed_updates = [
state
for state in worker_updates
if float(state.get("speed_bytes_per_sec") or 0) > 0 and int(state.get("downloaded_bytes") or 0) > 0
]
self.assertTrue(speed_updates, worker_updates)
self.assertEqual(456, speed_updates[-1]["current_playlist_id"])
self.assertEqual(song_id, speed_updates[-1]["current_song_id"])
self.assertGreaterEqual(int(speed_updates[-1]["progress_percent"] or 0), 33)
def test_catalog_downloader_reports_resolve_activity_to_worker_callback(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class FakeClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
save_path = Path(song_infos[0].save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-resolve",
name="Song Resolve",
singers="Singer Resolve",
ext="mp3",
file_size_bytes=10,
metadata={"snapshot": {"identifier": "song-resolve"}},
)
)
downloader = CatalogDownloader(repository=repo)
snapshot_song = SongInfo(
source="NeteaseMusicClient",
identifier="song-resolve",
song_name="Song Resolve",
singers="Singer Resolve",
ext="mp3",
download_url=None,
download_url_status={},
)
resolved_song = SongInfo(
source="QQMusicClient",
identifier="qq-song-resolve",
song_name="Song Resolve",
singers="Singer Resolve",
ext="mp3",
download_url="https://example.com/song-resolve.mp3",
download_url_status={"ok": True},
)
worker_updates: list[dict[str, object]] = []
def fake_resolve_song_info(*, row, snapshot_song_info, download_sources=None, progress_callback=None):
if progress_callback is not None:
progress_callback("resolving source qq (1/2)")
progress_callback("resolving source kuwo (2/2)")
return resolved_song
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=snapshot_song):
with patch.object(downloader, "get_client", return_value=FakeClient()):
with patch.object(downloader._resolver, "resolve_song_info", side_effect=fake_resolve_song_info):
ok = downloader.download_song_row(
row={
"id": song_id,
"playlist_id": 789,
"platform": "netease",
"name": "Song Resolve",
"singers": "Singer Resolve",
"ext": "mp3",
"file_size_bytes": 10,
"metadata_json": '{"snapshot":{"identifier":"song-resolve"}}',
},
library_root=library_root,
worker_callback=lambda **state: worker_updates.append(dict(state)),
)
self.assertTrue(ok)
resolve_updates = [
state for state in worker_updates if "resolving source" in str(state.get("last_progress_text") or "")
]
self.assertEqual(
["resolving source qq (1/2)", "resolving source kuwo (2/2)"],
[str(state["last_progress_text"]) for state in resolve_updates],
)
self.assertTrue(any(int(state.get("downloaded_bytes") or 0) > 0 for state in worker_updates))
def test_catalog_downloader_resolve_song_row_returns_resolved_payload(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-resolve-api",
name="Song Resolve",
singers="Singer Resolve",
ext="mp3",
file_size_bytes=10,
metadata={"snapshot": {"identifier": "song-resolve-api"}},
)
)
downloader = CatalogDownloader(repository=repo)
snapshot_song = SongInfo(
source="NeteaseMusicClient",
identifier="song-resolve-api",
song_name="Song Resolve",
singers="Singer Resolve",
ext="mp3",
download_url=None,
download_url_status={},
)
resolved_song = SongInfo(
source="QQMusicClient",
identifier="qq-song-resolve-api",
song_name="Song Resolve",
singers="Singer Resolve",
ext="mp3",
download_url="https://example.com/song-resolve-api.mp3",
download_url_status={"ok": True},
)
row = {
"id": song_id,
"playlist_id": 789,
"platform": "netease",
"name": "Song Resolve",
"singers": "Singer Resolve",
"ext": "mp3",
"file_size_bytes": 10,
"metadata_json": '{"snapshot":{"identifier":"song-resolve-api"}}',
}
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=snapshot_song):
with patch.object(downloader._resolver, "resolve_song_info", return_value=resolved_song):
payload = downloader.resolve_song_row(
row=row,
library_root=library_root,
download_sources=["qq", "kuwo"],
)
self.assertIsNotNone(payload)
self.assertEqual(song_id, payload.row["id"])
self.assertEqual("Song Resolve / Singer Resolve", payload.display_text)
self.assertEqual("QQMusicClient", payload.resolved_song_info.source)
def test_catalog_downloader_download_resolved_song_reports_progress_and_records_file(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class SlowWritingClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
song_info = song_infos[0]
save_path = Path(song_info.save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
with open(save_path, "wb") as file_obj:
for chunk in (b"a" * 8, b"b" * 8, b"c" * 8):
file_obj.write(chunk)
file_obj.flush()
time.sleep(0.05)
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-resolved-download",
name="Song Resolved Download",
singers="Singer Resolved",
ext="mp3",
file_size_bytes=24,
metadata={"snapshot": {"identifier": "song-resolved-download"}},
)
)
downloader = CatalogDownloader(repository=repo)
snapshot_song = SongInfo(
source="NeteaseMusicClient",
identifier="song-resolved-download",
song_name="Song Resolved Download",
singers="Singer Resolved",
ext="mp3",
download_url=None,
download_url_status={},
)
resolved_song = SongInfo(
source="QQMusicClient",
identifier="qq-song-resolved-download",
song_name="Song Resolved Download",
singers="Singer Resolved",
ext="mp3",
file_size_bytes=24,
download_url="https://example.com/song-resolved-download.mp3",
download_url_status={"ok": True},
)
row = {
"id": song_id,
"playlist_id": 123,
"platform": "netease",
"name": "Song Resolved Download",
"singers": "Singer Resolved",
"ext": "mp3",
"file_size_bytes": 24,
"metadata_json": '{"snapshot":{"identifier":"song-resolved-download"}}',
}
worker_updates: list[dict[str, object]] = []
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=snapshot_song):
with patch.object(downloader, "get_client", return_value=SlowWritingClient()):
with patch.object(downloader._resolver, "resolve_song_info", return_value=resolved_song):
resolved_payload = downloader.resolve_song_row(
row=row,
library_root=library_root,
download_sources=["qq", "kuwo"],
worker_callback=lambda **state: worker_updates.append(dict(state)),
)
self.assertIsNotNone(resolved_payload)
ok = downloader.download_resolved_song(
resolved_payload=resolved_payload,
worker_callback=lambda **state: worker_updates.append(dict(state)),
)
self.assertTrue(ok)
self.assertTrue(any(int(state.get("downloaded_bytes") or 0) > 0 for state in worker_updates), worker_updates)
self.assertTrue(repo.song_has_active_local_file(song_id))
def test_catalog_downloader_download_song_row_remains_a_compatibility_wrapper(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class FakeClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
save_path = Path(song_infos[0].save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-wrapper",
name="Song Wrapper",
singers="Singer Wrapper",
ext="mp3",
file_size_bytes=10,
metadata={"snapshot": {"identifier": "song-wrapper"}},
)
)
downloader = CatalogDownloader(repository=repo)
snapshot_song = SongInfo(
source="NeteaseMusicClient",
identifier="song-wrapper",
song_name="Song Wrapper",
singers="Singer Wrapper",
ext="mp3",
download_url=None,
download_url_status={},
)
resolved_song = SongInfo(
source="QQMusicClient",
identifier="qq-song-wrapper",
song_name="Song Wrapper",
singers="Singer Wrapper",
ext="mp3",
download_url="https://example.com/song-wrapper.mp3",
download_url_status={"ok": True},
)
row = {
"id": song_id,
"playlist_id": 456,
"platform": "netease",
"name": "Song Wrapper",
"singers": "Singer Wrapper",
"ext": "mp3",
"file_size_bytes": 10,
"metadata_json": '{"snapshot":{"identifier":"song-wrapper"}}',
}
worker_updates: list[dict[str, object]] = []
def fake_resolve_song_info(*, row, snapshot_song_info, download_sources=None, progress_callback=None):
if progress_callback is not None:
progress_callback("resolving source qq (1/1)")
return resolved_song
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=snapshot_song):
with patch.object(downloader, "get_client", return_value=FakeClient()):
with patch.object(downloader._resolver, "resolve_song_info", side_effect=fake_resolve_song_info):
ok = downloader.download_song_row(
row=row,
library_root=library_root,
worker_callback=lambda **state: worker_updates.append(dict(state)),
)
self.assertTrue(ok)
self.assertTrue(
any("resolving source" in str(state.get("last_progress_text") or "") for state in worker_updates),
worker_updates,
)
self.assertTrue(
any("starting download via" in str(state.get("last_progress_text") or "") for state in worker_updates),
worker_updates,
)
def test_catalog_downloader_download_resolved_song_saves_lyrics_file(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class FakeClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
save_path = Path(song_infos[0].save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-lyrics-save",
name="Song Lyrics Save",
singers="Singer Lyrics",
ext="mp3",
file_size_bytes=10,
metadata={"snapshot": {"identifier": "song-lyrics-save"}},
)
)
downloader = CatalogDownloader(repository=repo)
snapshot_song = SongInfo(
source="NeteaseMusicClient",
identifier="song-lyrics-save",
song_name="Song Lyrics Save",
singers="Singer Lyrics",
ext="mp3",
download_url=None,
download_url_status={},
)
resolved_song = SongInfo(
source="QQMusicClient",
identifier="qq-song-lyrics-save",
song_name="Song Lyrics Save",
singers="Singer Lyrics",
ext="mp3",
lyric="[00:00.00]hello world",
download_url="https://example.com/song-lyrics-save.mp3",
download_url_status={"ok": True},
)
row = {
"id": song_id,
"playlist_id": 123,
"platform": "netease",
"name": "Song Lyrics Save",
"singers": "Singer Lyrics",
"ext": "mp3",
"file_size_bytes": 10,
"metadata_json": '{"snapshot":{"identifier":"song-lyrics-save"}}',
}
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=snapshot_song):
with patch.object(downloader, "get_client", return_value=FakeClient()):
with patch.object(downloader._resolver, "resolve_song_info", return_value=resolved_song):
resolved_payload = downloader.resolve_song_row(row=row, library_root=library_root)
self.assertIsNotNone(resolved_payload)
ok = downloader.download_resolved_song(resolved_payload=resolved_payload)
self.assertTrue(ok)
lrc_path = Path(resolved_payload.resolved_song_info.save_path).with_suffix(".lrc")
self.assertTrue(lrc_path.exists())
self.assertEqual("[00:00.00]hello world\n", lrc_path.read_text(encoding="utf-8"))
def test_catalog_downloader_download_resolved_song_does_not_overwrite_existing_lyrics_by_default(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class FakeClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
save_path = Path(song_infos[0].save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-lyrics-overwrite",
name="Song Lyrics Overwrite",
singers="Singer Lyrics",
ext="mp3",
file_size_bytes=10,
metadata={"snapshot": {"identifier": "song-lyrics-overwrite"}},
)
)
downloader = CatalogDownloader(repository=repo)
snapshot_song = SongInfo(
source="NeteaseMusicClient",
identifier="song-lyrics-overwrite",
song_name="Song Lyrics Overwrite",
singers="Singer Lyrics",
ext="mp3",
download_url=None,
download_url_status={},
)
resolved_song = SongInfo(
source="QQMusicClient",
identifier="qq-song-lyrics-overwrite",
song_name="Song Lyrics Overwrite",
singers="Singer Lyrics",
ext="mp3",
lyric="[00:00.00]new lyric",
download_url="https://example.com/song-lyrics-overwrite.mp3",
download_url_status={"ok": True},
)
row = {
"id": song_id,
"playlist_id": 123,
"platform": "netease",
"name": "Song Lyrics Overwrite",
"singers": "Singer Lyrics",
"ext": "mp3",
"file_size_bytes": 10,
"metadata_json": '{"snapshot":{"identifier":"song-lyrics-overwrite"}}',
}
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=snapshot_song):
with patch.object(downloader, "get_client", return_value=FakeClient()):
with patch.object(downloader._resolver, "resolve_song_info", return_value=resolved_song):
resolved_payload = downloader.resolve_song_row(row=row, library_root=library_root)
self.assertIsNotNone(resolved_payload)
lrc_path = Path(resolved_payload.resolved_song_info.save_path).with_suffix(".lrc")
lrc_path.parent.mkdir(parents=True, exist_ok=True)
lrc_path.write_text("old lyric\n", encoding="utf-8")
ok = downloader.download_resolved_song(resolved_payload=resolved_payload)
self.assertTrue(ok)
self.assertEqual("old lyric\n", lrc_path.read_text(encoding="utf-8"))
def test_catalog_downloader_download_resolved_song_looks_up_lyrics_when_missing(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
from musicdl.modules.utils.data import SongInfo
class FakeClient:
def download(self, song_infos, num_threadings=1, request_overrides=None, auto_supplement_song=False):
save_path = Path(song_infos[0].save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(b"fake-audio")
return [SimpleNamespace(save_path=str(save_path))]
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-lyrics-search",
name="Song Lyrics Search",
singers="Singer Search",
ext="mp3",
file_size_bytes=10,
metadata={"snapshot": {"identifier": "song-lyrics-search"}},
)
)
downloader = CatalogDownloader(repository=repo)
snapshot_song = SongInfo(
source="NeteaseMusicClient",
identifier="song-lyrics-search",
song_name="Song Lyrics Search",
singers="Singer Search",
ext="mp3",
download_url=None,
download_url_status={},
)
resolved_song = SongInfo(
source="QQMusicClient",
identifier="qq-song-lyrics-search",
song_name="Song Lyrics Search",
singers="Singer Search",
ext="mp3",
lyric="NULL",
download_url="https://example.com/song-lyrics-search.mp3",
download_url_status={"ok": True},
)
row = {
"id": song_id,
"playlist_id": 123,
"platform": "netease",
"name": "Song Lyrics Search",
"singers": "Singer Search",
"ext": "mp3",
"file_size_bytes": 10,
"metadata_json": '{"snapshot":{"identifier":"song-lyrics-search"}}',
}
with patch("musicdl.catalogsync.downloader.deserialize_song_info", return_value=snapshot_song):
with patch.object(downloader, "get_client", return_value=FakeClient()):
with patch.object(downloader._resolver, "resolve_song_info", return_value=resolved_song):
with patch("musicdl.catalogsync.downloader.LyricSearchClient.search", return_value=({}, "[00:00.00]searched lyric")) as search_mock:
resolved_payload = downloader.resolve_song_row(row=row, library_root=library_root)
self.assertIsNotNone(resolved_payload)
ok = downloader.download_resolved_song(resolved_payload=resolved_payload)
self.assertTrue(ok)
search_mock.assert_called_once()
lrc_path = Path(resolved_payload.resolved_song_info.save_path).with_suffix(".lrc")
self.assertEqual("[00:00.00]searched lyric\n", lrc_path.read_text(encoding="utf-8"))
def test_catalog_downloader_sync_local_lyrics_updates_existing_downloaded_song_files(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-local-lyrics",
name="Song Local Lyrics",
singers="Singer Local",
ext="mp3",
file_size_bytes=10,
metadata={
"snapshot": {
"identifier": "song-local-lyrics",
"song_name": "Song Local Lyrics",
"singers": "Singer Local",
"ext": "mp3",
}
},
)
)
backend_id = repo.ensure_local_backend(library_root, name="default-local", is_default=True)
relative_path = "netease/Singer Local/Song Local Lyrics - song-local-lyrics.mp3"
absolute_path = library_root / relative_path
absolute_path.parent.mkdir(parents=True, exist_ok=True)
absolute_path.write_bytes(b"fake-audio")
repo.record_local_file(
song_id=song_id,
backend_id=backend_id,
relative_path=relative_path.replace("\\", "/"),
file_size_bytes=10,
ext="mp3",
quality_label=None,
)
downloader = CatalogDownloader(repository=repo)
with patch("musicdl.catalogsync.downloader.LyricSearchClient.search", return_value=({}, "[00:00.00]batch lyric")) as search_mock:
summary = downloader.sync_local_lyrics(limit=10, overwrite_lyrics=False)
self.assertEqual(1, summary["processed"])
self.assertEqual(1, summary["saved"])
self.assertEqual(0, summary["failed"])
search_mock.assert_called_once()
lrc_path = absolute_path.with_suffix(".lrc")
self.assertEqual("[00:00.00]batch lyric\n", lrc_path.read_text(encoding="utf-8"))
def test_catalog_downloader_sync_local_lyrics_does_not_hang_on_lyric_search_timeout(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id="song-local-timeout",
name="Song Local Timeout",
singers="Singer Timeout",
ext="mp3",
file_size_bytes=10,
metadata={
"snapshot": {
"identifier": "song-local-timeout",
"song_name": "Song Local Timeout",
"singers": "Singer Timeout",
"ext": "mp3",
}
},
)
)
backend_id = repo.ensure_local_backend(library_root, name="default-local", is_default=True)
relative_path = "netease/Singer Timeout/Song Local Timeout - song-local-timeout.mp3"
absolute_path = library_root / relative_path
absolute_path.parent.mkdir(parents=True, exist_ok=True)
absolute_path.write_bytes(b"fake-audio")
repo.record_local_file(
song_id=song_id,
backend_id=backend_id,
relative_path=relative_path.replace("\\", "/"),
file_size_bytes=10,
ext="mp3",
quality_label=None,
)
downloader = CatalogDownloader(repository=repo)
def blocked_search(*args, **kwargs):
time.sleep(0.2)
return {}, "NULL"
with patch.object(downloader, "_lyric_search_timeout_seconds", 0.05):
with patch("musicdl.catalogsync.downloader.LyricSearchClient.search", side_effect=blocked_search):
summary = downloader.sync_local_lyrics(limit=10, overwrite_lyrics=False)
self.assertEqual(1, summary["processed"])
self.assertEqual(0, summary["saved"])
self.assertEqual(1, summary["skipped"])
self.assertEqual(0, summary["failed"])
self.assertFalse(absolute_path.with_suffix(".lrc").exists())
def test_catalog_downloader_sync_local_lyrics_runs_workers_concurrently_and_reports_progress(self):
from musicdl.catalogsync.db import initialize_database
from musicdl.catalogsync.downloader import CatalogDownloader
from musicdl.catalogsync.models import CatalogSong
from musicdl.catalogsync.repository import CatalogRepository
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
db_path = Path(tmpdir) / "catalogsync.db"
library_root = Path(tmpdir) / "library"
initialize_database(db_path, default_library_root=library_root).close()
repo = CatalogRepository(db_path)
backend_id = repo.ensure_local_backend(library_root, name="default-local", is_default=True)
song_rows = []
for index in range(3):
identifier = f"song-local-progress-{index}"
song_id = repo.upsert_song(
CatalogSong(
platform="netease",
remote_song_id=identifier,
name=f"Song Local Progress {index}",
singers="Singer Progress",
ext="mp3",
file_size_bytes=10,
metadata={
"snapshot": {
"identifier": identifier,
"song_name": f"Song Local Progress {index}",
"singers": "Singer Progress",
"ext": "mp3",
}
},
)
)
relative_path = f"netease/Singer Progress/Song Local Progress {index} - {identifier}.mp3"
absolute_path = library_root / relative_path
absolute_path.parent.mkdir(parents=True, exist_ok=True)
absolute_path.write_bytes(b"fake-audio")
repo.record_local_file(
song_id=song_id,
backend_id=backend_id,
relative_path=relative_path.replace("\\", "/"),
file_size_bytes=10,
ext="mp3",
quality_label=None,
)
song_rows.append((song_id, absolute_path))
downloader = CatalogDownloader(repository=repo, worker_count=3)
active_workers = {"value": 0}
max_active_workers = {"value": 0}
state_lock = threading.Lock()
progress_updates: list[dict[str, object]] = []
def fake_sync(*, row, song_info, saved_path, overwrite_lyrics, worker_callback=None, display_text=None):
del song_info, saved_path, overwrite_lyrics, worker_callback, display_text
with state_lock:
active_workers["value"] += 1
max_active_workers["value"] = max(max_active_workers["value"], active_workers["value"])
time.sleep(0.05)
with state_lock:
active_workers["value"] -= 1
return "saved"
with patch.object(downloader, "_sync_lyrics_for_saved_song", side_effect=fake_sync):
summary = downloader.sync_local_lyrics(
limit=10,
overwrite_lyrics=False,
progress_callback=lambda **state: progress_updates.append(dict(state)),
)
self.assertEqual(3, summary["total"])
self.assertEqual(3, summary["processed"])
self.assertEqual(3, summary["saved"])
self.assertEqual(0, summary["skipped"])
self.assertEqual(0, summary["failed"])
self.assertGreaterEqual(max_active_workers["value"], 2)
self.assertGreaterEqual(len(progress_updates), 2)
self.assertEqual(0, progress_updates[0]["processed"])
self.assertEqual(0, progress_updates[0]["progress_percent"])
self.assertEqual(3, progress_updates[-1]["processed"])
self.assertEqual(100, progress_updates[-1]["progress_percent"])
if __name__ == "__main__":
unittest.main()