from __future__ import annotations import hashlib import inspect import logging import warnings from pathlib import Path from typing import Any, Callable, Dict import requests from urllib3.exceptions import InsecureRequestWarning from .collectors import KuwoCollector, NeteaseCollector, QQCollector, parse_kuwo_toplist_html from .deferred import ( build_kuwo_playlist_song_infos, build_kuwo_raw_track_song_infos, build_netease_playlist_song_infos, build_qq_playlist_song_infos, build_qq_raw_track_song_infos, ) from .models import CatalogSong, extract_artist_names from .playlist_artifacts import write_playlist_artifacts from .repository import CatalogRepository SOURCE_CLIENT_NAMES = { "netease": "NeteaseMusicClient", "qq": "QQMusicClient", "kuwo": "KuwoMusicClient", } SOURCE_POOL_NAMES = { "netease": {"playlist_square": "网易云歌单广场", "toplist": "网易云排行榜"}, "qq": {"playlist_square": "QQ 音乐歌单广场", "toplist": "QQ 音乐排行榜"}, "kuwo": {"playlist_square": "酷我歌单广场", "toplist": "酷我排行榜"}, } LOGGER = logging.getLogger(__name__) PLAYLIST_SQUARE_PAGE_SIZES = { "netease": 35, "qq": 30, "kuwo": 30, } PlaylistProgressCallback = Callable[[str, Dict[str, Any]], None] class CatalogSyncService: def __init__( self, repository: CatalogRepository, collectors: dict[str, object] | None = None, work_dir: str = "musicdl_outputs/catalogsync", playlists_root: str | Path | None = None, ): self.repository = repository self.collectors = collectors or { "netease": NeteaseCollector(), "qq": QQCollector(), "kuwo": KuwoCollector(), } self.work_dir = work_dir self.playlists_root = Path(playlists_root).resolve() if playlists_root else None self._clients: dict[str, object] = {} def get_client(self, platform: str): if platform not in self._clients: from musicdl.modules import BuildMusicClient self._clients[platform] = BuildMusicClient( { "type": SOURCE_CLIENT_NAMES[platform], "disable_print": True, "maintain_session": False, "work_dir": self.work_dir, "search_size_per_source": 1, "search_size_per_page": 1, "strict_limit_search_size_per_page": True, } ) return self._clients[platform] def store_playlist_candidates( self, platform: str, pool_kind: str, pool_name: str, candidates: list, pool_external_id: str | None = None, ) -> int: pool_id = self.repository.upsert_playlist_pool( platform=platform, pool_kind=pool_kind, external_id=pool_external_id or pool_kind, name=pool_name, ) for candidate in candidates: playlist_id = self.repository.upsert_playlist(candidate) self.repository.link_pool_playlist(pool_id, playlist_id) return pool_id def collect_playlists( self, sources: list[str], include_playlist_square: bool = True, include_toplist: bool = True, progress_callback: PlaylistProgressCallback | None = None, ) -> dict[str, int]: counts = {"playlist_square": 0, "toplist": 0} for source in sources: collector = self.collectors[source] self._emit_progress( progress_callback, "source_started", source=source, include_playlist_square=include_playlist_square, include_toplist=include_toplist, ) if include_playlist_square: try: counts["playlist_square"] += self._collect_playlist_square( source, collector, progress_callback=progress_callback, ) except Exception as exc: LOGGER.warning( "playlist_square collection failed for source=%s", source, exc_info=True, ) if include_toplist: toplist_candidates = collector.collect_toplist() self.store_playlist_candidates( platform=source, pool_kind="toplist", pool_name=SOURCE_POOL_NAMES[source]["toplist"], candidates=toplist_candidates, ) counts["toplist"] += len(toplist_candidates) self._emit_progress( progress_callback, "toplist_collected", source=source, count=len(toplist_candidates), ) self._emit_progress(progress_callback, "source_finished", source=source, counts=dict(counts)) return counts def _collect_playlist_square( self, source: str, collector: object, *, progress_callback: PlaylistProgressCallback | None = None, ) -> int: total = 0 page = 1 seen_remote_ids: set[str] = set() while True: candidates, should_continue = self._collect_playlist_square_page(source, collector, page) unique_candidates = [] for candidate in candidates: remote_id = str(getattr(candidate, "remote_id", "") or "").strip() dedupe_key = f"{source}:{remote_id}" if remote_id and dedupe_key in seen_remote_ids: continue if remote_id: seen_remote_ids.add(dedupe_key) unique_candidates.append(candidate) if unique_candidates: self.store_playlist_candidates( platform=source, pool_kind="playlist_square", pool_name=SOURCE_POOL_NAMES[source]["playlist_square"], candidates=unique_candidates, ) total += len(unique_candidates) self._emit_progress( progress_callback, "playlist_square_page", source=source, page=page, page_count=len(candidates), new_count=len(unique_candidates), total=total, duplicate_page=bool(candidates) and not bool(unique_candidates), ) if (candidates and not unique_candidates) or not should_continue: break page += 1 return total def _collect_playlist_square_page(self, source: str, collector: object, page: int) -> tuple[list, bool]: method = collector.collect_playlist_square kwargs = self._build_playlist_square_kwargs(method, source, page) if kwargs is None: candidates, has_more = self._normalize_playlist_square_result(method()) if self._is_mock_side_effect_iterator(method): return candidates, (bool(candidates) and (has_more is not False)) return candidates, bool(has_more) candidates, has_more = self._normalize_playlist_square_result(method(**kwargs)) if has_more is False: return candidates, False return candidates, bool(candidates) @staticmethod def _normalize_playlist_square_result(result) -> tuple[list, bool | None]: if isinstance(result, tuple) and len(result) == 2: candidates = list(result[0] or []) has_more = result[1] return candidates, None if has_more is None else bool(has_more) if isinstance(result, dict): raw_candidates = result.get("candidates") if raw_candidates is None: raw_candidates = result.get("items", []) candidates = list(raw_candidates or []) has_more = result.get("has_more") return candidates, None if has_more is None else bool(has_more) return list(result or []), None @staticmethod def _is_mock_side_effect_iterator(method) -> bool: side_effect = getattr(method, "side_effect", None) if side_effect is None: return False if isinstance(side_effect, BaseException): return False return not callable(side_effect) @staticmethod def _build_playlist_square_kwargs(method, source: str, page: int) -> dict[str, int] | None: try: signature = inspect.signature(method) except (TypeError, ValueError): return None parameters = signature.parameters kwargs: dict[str, int] = {} page_size = PLAYLIST_SQUARE_PAGE_SIZES.get(source, 30) if "page" in parameters: kwargs["page"] = max(page, 1) if "page_size" in parameters: kwargs["page_size"] = page_size if "offset" in parameters and "page" not in parameters: kwargs["offset"] = max(page - 1, 0) * page_size return kwargs or None @staticmethod def _emit_progress( callback: PlaylistProgressCallback | None, event_type: str, **payload: Any, ) -> None: if callback is None: return callback(event_type, payload) def import_manual_playlists(self, playlist_file: str | Path, candidates: list) -> list[int]: playlist_ids: list[int] = [] pool_ids_by_platform: dict[str, int] = {} for candidate in candidates: pool_id = pool_ids_by_platform.get(candidate.platform) if pool_id is None: pool_id = self.repository.get_or_create_manual_file_pool(playlist_file, candidate.platform) pool_ids_by_platform[candidate.platform] = pool_id playlist_id = self.repository.upsert_playlist(candidate) self.repository.link_pool_playlist(pool_id, playlist_id) playlist_ids.append(playlist_id) return playlist_ids def store_playlist_songs(self, playlist_id: int, source_pool_id: int, song_infos: list[object]) -> int: pool_row = self.repository.get_playlist_pool(source_pool_id) if not pool_row: raise RuntimeError(f"Unknown playlist pool: {source_pool_id}") artist_pool_id = self.repository.ensure_derived_artist_pool( platform=pool_row["platform"], source_pool_id=source_pool_id, source_pool_name=pool_row["name"], ) for position, song_info in enumerate(song_infos, start=1): song = CatalogSong.from_song_info(song_info) song_id = self.repository.upsert_song(song) self.repository.link_playlist_song(playlist_id, song_id, position) for artist_name in extract_artist_names(song.metadata.get("raw_data"), song.singers): artist_id = self.repository.upsert_artist(song.platform, artist_name) self.repository.link_pool_artist(artist_pool_id, artist_id) self.repository.link_artist_song(artist_id, song_id) return artist_pool_id def sync_playlist_catalog(self, sources: list[str] | None = None, limit: int | None = None) -> int: processed = 0 for playlist_row in self.repository.list_playlists(sources=sources, limit=limit): processed += self.sync_playlist_row(playlist_row) return processed def sync_specific_playlists(self, playlist_ids: list[int]) -> int: processed = 0 for playlist_row in self.repository.list_playlists_by_ids(playlist_ids): processed += self.sync_playlist_row(playlist_row) return processed def sync_playlist_row(self, playlist_row) -> int: song_infos = self.resolve_playlist_song_infos(playlist_row) pool_ids = self.repository.get_pool_ids_for_playlist(int(playlist_row["id"])) for pool_id in pool_ids: self.store_playlist_songs(int(playlist_row["id"]), pool_id, song_infos) self._backfill_playlist_play_count(playlist_row) return len(song_infos) def _backfill_playlist_play_count(self, playlist_row) -> None: playlist_id = int(playlist_row["id"]) play_count = self.resolve_playlist_play_count(playlist_row) if play_count is None: return self.repository.update_playlist_play_count(playlist_id, play_count) def _resolve_playlists_root(self) -> Path | None: if self.playlists_root is not None: self.playlists_root.mkdir(parents=True, exist_ok=True) return self.playlists_root library_root = self.repository.get_default_local_library_root() if library_root is None: return None playlists_root = library_root.parent / "playlists" playlists_root.mkdir(parents=True, exist_ok=True) return playlists_root def _playlist_export_payload(self, playlist_id: int) -> tuple[dict[str, Any], list[dict[str, Any]]] | None: playlist_rows = self.repository.list_playlists_by_ids([int(playlist_id)]) if not playlist_rows: return None playlist_row = dict(playlist_rows[0]) payload = { "id": int(playlist_row["id"]), "platform": str(playlist_row["platform"] or ""), "remote_playlist_id": str(playlist_row["remote_playlist_id"] or ""), "name": str(playlist_row["name"] or ""), "play_count": self._coerce_int(playlist_row["play_count"]), "cover_url": str(playlist_row.get("cover_url") or "").strip() or None, } songs = self.repository.list_playlist_song_details(int(playlist_id), limit=5000) return payload, songs def _write_playlist_artifacts(self, playlist_row) -> Path | None: playlists_root = self._resolve_playlists_root() if playlists_root is None: return None playlist_id = int(playlist_row["id"]) export_payload = self._playlist_export_payload(playlist_id) if export_payload is None: return None playlist_payload, song_items = export_payload try: return write_playlist_artifacts( playlist=playlist_payload, songs=song_items, playlists_root=playlists_root, ) except Exception: LOGGER.warning( "Failed to write playlist artifacts: playlist_id=%s", playlist_id, exc_info=True, ) return None def ensure_playlist_artifacts_for_playlist(self, playlist_id: int) -> Path | None: playlist_rows = self.repository.list_playlists_by_ids([int(playlist_id)]) if not playlist_rows: return None return self._write_playlist_artifacts(playlist_rows[0]) def resolve_playlist_play_count(self, playlist_row) -> int | None: platform = str(playlist_row["platform"] or "").strip() parse_strategy = str(playlist_row["parse_strategy"] or "").strip() remote_id = str(playlist_row["remote_playlist_id"] or "").strip() fallback_value = self._coerce_int(playlist_row["play_count"]) if parse_strategy != "playlist_url" or platform not in {"netease", "qq", "kuwo"} or not remote_id: return fallback_value try: if platform == "netease": client = self.get_client("netease") response = client.post( "https://music.163.com/api/v6/playlist/detail", data={"id": remote_id}, timeout=(10, 30), ) response.raise_for_status() payload = response.json() or {} playlist_payload = payload.get("playlist") or {} return self._coerce_int(playlist_payload.get("playCount")) or fallback_value if platform == "qq": client = self.get_client("qq") response = client.get( "https://c.y.qq.com/qzone/fcg-bin/fcg_ucc_getcdinfo_byids_cp.fcg", headers={"Referer": f"https://y.qq.com/n/ryqq/playlist/{remote_id}"}, params={ "disstid": str(remote_id), "type": "1", "json": "1", "utf8": "1", "onlysong": "0", "format": "json", }, timeout=(10, 30), ) response.raise_for_status() payload = response.json() or {} playlist_payload = ((payload.get("cdlist") or [{}])[0] or {}) if isinstance(payload, dict) else {} return self._coerce_int(playlist_payload.get("visitnum")) or fallback_value client = self.get_client("kuwo") response = client.get( f"https://m.kuwo.cn/newh5app/wapi/api/www/playlist/playListInfo?pid={remote_id}&pn=1&rn=100", timeout=(10, 30), ) response.raise_for_status() payload = response.json() or {} data_payload = payload.get("data") or {} return self._coerce_int(data_payload.get("listencnt")) or fallback_value except Exception: LOGGER.warning( "Failed to resolve playlist play_count during sync: platform=%s remote_id=%s", platform, remote_id, exc_info=True, ) return fallback_value @staticmethod def _coerce_int(value: object) -> int | None: if value in (None, "") or isinstance(value, bool): return None if isinstance(value, (int, float)): return int(value) text = str(value).strip().replace(",", "") if not text: return None try: return int(float(text)) except ValueError: return None def resolve_playlist_song_infos(self, playlist_row) -> list[object]: strategy = playlist_row["parse_strategy"] if strategy == "playlist_url": if playlist_row["platform"] == "netease": return build_netease_playlist_song_infos(self.get_client("netease"), playlist_row["url"]) if playlist_row["platform"] == "qq": return build_qq_playlist_song_infos(self.get_client("qq"), playlist_row["url"]) if playlist_row["platform"] == "kuwo": return build_kuwo_playlist_song_infos(self.get_client("kuwo"), playlist_row["url"]) client = self.get_client(playlist_row["platform"]) return client.parseplaylist(playlist_row["url"]) if strategy == "netease_toplist": return build_netease_playlist_song_infos(self.get_client("netease"), playlist_row["url"]) if strategy == "qq_toplist": return self._resolve_qq_toplist(playlist_row) if strategy == "kuwo_toplist": return self._resolve_kuwo_toplist(playlist_row) raise ValueError(f"Unsupported parse strategy: {strategy}") def _resolve_qq_toplist(self, playlist_row) -> list[object]: remote_id = str(playlist_row["remote_playlist_id"] or "").strip() with warnings.catch_warnings(): warnings.simplefilter("ignore", InsecureRequestWarning) response = requests.get( "https://c.y.qq.com/v8/fcg-bin/fcg_v8_toplist_cp.fcg", params={ "topid": remote_id, "tpl": "3", "page": "detail", "type": "top", "format": "json", }, headers={ "User-Agent": "Mozilla/5.0", "Referer": "https://y.qq.com/", "Origin": "https://y.qq.com/", }, timeout=15, verify=False, ) response.raise_for_status() raw_tracks = [] for item in response.json().get("songlist", []) or []: if not isinstance(item, dict): continue track_data = item.get("data") if isinstance(track_data, dict) and track_data: raw_tracks.append(track_data) if not raw_tracks: raw_tracks = self._resolve_qq_toplist_fallback_tracks(remote_id) client = self.get_client("qq") return build_qq_raw_track_song_infos(client, raw_tracks, playlist_name=playlist_row["name"]) def _resolve_qq_toplist_fallback_tracks(self, remote_id: str) -> list[dict]: if not remote_id: return [] request_payload = { "comm": {"ct": 24, "cv": 0}, "toplist": { "module": "musicToplist.ToplistInfoServer", "method": "GetDetail", "param": { "topid": int(remote_id) if remote_id.isdigit() else remote_id, "offset": 0, "num": 100, "period": "", }, }, } with warnings.catch_warnings(): warnings.simplefilter("ignore", InsecureRequestWarning) response = requests.post( "https://u.y.qq.com/cgi-bin/musicu.fcg", json=request_payload, headers={ "User-Agent": "Mozilla/5.0", "Referer": "https://y.qq.com/", "Origin": "https://y.qq.com/", "Content-Type": "application/json", }, timeout=15, verify=False, ) response.raise_for_status() payload_raw = response.json() payload = payload_raw if isinstance(payload_raw, dict) else {} toplist_data = payload.get("toplist") or {} toplist_inner = toplist_data.get("data") or {} toplist_detail = toplist_inner.get("data") or {} raw_items = toplist_detail.get("song") or toplist_detail.get("songlist") or [] if not isinstance(raw_items, list): return [] fallback_tracks: list[dict] = [] for item in raw_items: if not isinstance(item, dict): continue title = str(item.get("title") or item.get("name") or "").strip() singer_text = str(item.get("singerName") or item.get("singers") or "").strip() album_mid = str(item.get("albumMid") or item.get("albummid") or "").strip() if not title: continue track_id = str( item.get("songMid") or item.get("songmid") or item.get("mid") or item.get("songId") or item.get("songid") or "" ).strip() if not track_id or track_id == "0": hash_input = f"{remote_id}|{title}|{singer_text}|{album_mid}" track_id = f"qqtop_{remote_id}_{hashlib.md5(hash_input.encode('utf-8')).hexdigest()[:16]}" singer_items = [{"name": part.strip()} for part in singer_text.split("/") if part.strip()] fallback_tracks.append( { "songmid": track_id, "title": title, "singer": singer_items, "album": {"mid": album_mid, "title": str(item.get("albumName") or "").strip()}, "albummid": album_mid, "interval": item.get("interval", 0), "qq_toplist_fallback": True, "qq_toplist_remote_id": remote_id, } ) return fallback_tracks def _resolve_kuwo_toplist(self, playlist_row) -> list[object]: playlist_name = str(playlist_row["name"] or "").strip() remote_id = str(playlist_row["remote_playlist_id"] or "").strip() if not playlist_name or playlist_name == remote_id: playlist_name = self._resolve_kuwo_toplist_name(remote_id) or remote_id with warnings.catch_warnings(): warnings.simplefilter("ignore", InsecureRequestWarning) response = requests.get( "https://kw-api.cenguigui.cn", params={"name": playlist_name, "type": "rank", "page": "1", "limit": "100"}, timeout=15, verify=False, ) response.raise_for_status() raw_tracks = [] for item in response.json().get("data", {}).get("musicList", []) or []: rid = str(item.get("rid", "")).strip() if not rid: continue raw_tracks.append( { "musicrid": f"MUSIC_{rid}", "rid": rid, "name": item.get("name"), "artist": item.get("artist"), "album": item.get("album"), "albumpic": item.get("pic"), } ) client = self.get_client("kuwo") return build_kuwo_raw_track_song_infos(client, raw_tracks, playlist_name=playlist_name) def _resolve_kuwo_toplist_name(self, remote_id: str) -> str | None: if not remote_id: return None with warnings.catch_warnings(): warnings.simplefilter("ignore", InsecureRequestWarning) response = requests.get( "https://www.kuwo.cn/rankList", params={"bangId": remote_id}, timeout=15, verify=False, ) response.raise_for_status() for candidate in parse_kuwo_toplist_html(response.text): if candidate.remote_id == remote_id: return candidate.name return None @staticmethod def _resolve_raw_tracks(client, raw_tracks: list[dict]) -> list[object]: song_infos = [] for track in raw_tracks: song_info_flac = client._parsewiththirdpartapis(track, {}) try: song_info = client._parsewithofficialapiv1( track, song_info_flac=song_info_flac, lossless_quality_is_sufficient=not bool(client.default_cookies), request_overrides={}, ) except Exception: song_info = song_info_flac if not song_info.with_valid_download_url: song_info = song_info_flac if song_info.with_valid_download_url: song_infos.append(song_info) return song_infos