Files

644 lines
26 KiB
Python

from __future__ import annotations
import hashlib
import inspect
import logging
import warnings
from pathlib import Path
from typing import Any, Callable, Dict
import requests
from urllib3.exceptions import InsecureRequestWarning
from .collectors import KuwoCollector, NeteaseCollector, QQCollector, parse_kuwo_toplist_html
from .deferred import (
build_kuwo_playlist_song_infos,
build_kuwo_raw_track_song_infos,
build_netease_playlist_song_infos,
build_qq_playlist_song_infos,
build_qq_raw_track_song_infos,
)
from .models import CatalogSong, extract_artist_names
from .playlist_artifacts import write_playlist_artifacts
from .repository import CatalogRepository
# Maps each supported source key to the client class name that get_client()
# passes as the "type" option when building a music client.
SOURCE_CLIENT_NAMES = {
"netease": "NeteaseMusicClient",
"qq": "QQMusicClient",
"kuwo": "KuwoMusicClient",
}
# Pool display names keyed by source and then by pool kind ("playlist_square"
# or "toplist"); used as the pool_name when collected candidates are stored.
SOURCE_POOL_NAMES = {
"netease": {"playlist_square": "网易云歌单广场", "toplist": "网易云排行榜"},
"qq": {"playlist_square": "QQ 音乐歌单广场", "toplist": "QQ 音乐排行榜"},
"kuwo": {"playlist_square": "酷我歌单广场", "toplist": "酷我排行榜"},
}
# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
# Page size requested per source when paging through the playlist square.
# Sources missing from this table fall back to 30 in
# _build_playlist_square_kwargs().
PLAYLIST_SQUARE_PAGE_SIZES = {
"netease": 35,
"qq": 30,
"kuwo": 30,
}
# Progress callbacks are invoked as callback(event_type, payload_dict).
PlaylistProgressCallback = Callable[[str, Dict[str, Any]], None]
class CatalogSyncService:
def __init__(
self,
repository: CatalogRepository,
collectors: dict[str, object] | None = None,
work_dir: str = "musicdl_outputs/catalogsync",
playlists_root: str | Path | None = None,
):
self.repository = repository
self.collectors = collectors or {
"netease": NeteaseCollector(),
"qq": QQCollector(),
"kuwo": KuwoCollector(),
}
self.work_dir = work_dir
self.playlists_root = Path(playlists_root).resolve() if playlists_root else None
self._clients: dict[str, object] = {}
def get_client(self, platform: str):
if platform not in self._clients:
from musicdl.modules import BuildMusicClient
self._clients[platform] = BuildMusicClient(
{
"type": SOURCE_CLIENT_NAMES[platform],
"disable_print": True,
"maintain_session": False,
"work_dir": self.work_dir,
"search_size_per_source": 1,
"search_size_per_page": 1,
"strict_limit_search_size_per_page": True,
}
)
return self._clients[platform]
def store_playlist_candidates(
self,
platform: str,
pool_kind: str,
pool_name: str,
candidates: list,
pool_external_id: str | None = None,
) -> int:
pool_id = self.repository.upsert_playlist_pool(
platform=platform,
pool_kind=pool_kind,
external_id=pool_external_id or pool_kind,
name=pool_name,
)
for candidate in candidates:
playlist_id = self.repository.upsert_playlist(candidate)
self.repository.link_pool_playlist(pool_id, playlist_id)
return pool_id
def collect_playlists(
self,
sources: list[str],
include_playlist_square: bool = True,
include_toplist: bool = True,
progress_callback: PlaylistProgressCallback | None = None,
) -> dict[str, int]:
counts = {"playlist_square": 0, "toplist": 0}
for source in sources:
collector = self.collectors[source]
self._emit_progress(
progress_callback,
"source_started",
source=source,
include_playlist_square=include_playlist_square,
include_toplist=include_toplist,
)
if include_playlist_square:
try:
counts["playlist_square"] += self._collect_playlist_square(
source,
collector,
progress_callback=progress_callback,
)
except Exception as exc:
LOGGER.warning(
"playlist_square collection failed for source=%s",
source,
exc_info=True,
)
if include_toplist:
toplist_candidates = collector.collect_toplist()
self.store_playlist_candidates(
platform=source,
pool_kind="toplist",
pool_name=SOURCE_POOL_NAMES[source]["toplist"],
candidates=toplist_candidates,
)
counts["toplist"] += len(toplist_candidates)
self._emit_progress(
progress_callback,
"toplist_collected",
source=source,
count=len(toplist_candidates),
)
self._emit_progress(progress_callback, "source_finished", source=source, counts=dict(counts))
return counts
def _collect_playlist_square(
self,
source: str,
collector: object,
*,
progress_callback: PlaylistProgressCallback | None = None,
) -> int:
total = 0
page = 1
seen_remote_ids: set[str] = set()
while True:
candidates, should_continue = self._collect_playlist_square_page(source, collector, page)
unique_candidates = []
for candidate in candidates:
remote_id = str(getattr(candidate, "remote_id", "") or "").strip()
dedupe_key = f"{source}:{remote_id}"
if remote_id and dedupe_key in seen_remote_ids:
continue
if remote_id:
seen_remote_ids.add(dedupe_key)
unique_candidates.append(candidate)
if unique_candidates:
self.store_playlist_candidates(
platform=source,
pool_kind="playlist_square",
pool_name=SOURCE_POOL_NAMES[source]["playlist_square"],
candidates=unique_candidates,
)
total += len(unique_candidates)
self._emit_progress(
progress_callback,
"playlist_square_page",
source=source,
page=page,
page_count=len(candidates),
new_count=len(unique_candidates),
total=total,
duplicate_page=bool(candidates) and not bool(unique_candidates),
)
if (candidates and not unique_candidates) or not should_continue:
break
page += 1
return total
def _collect_playlist_square_page(self, source: str, collector: object, page: int) -> tuple[list, bool]:
method = collector.collect_playlist_square
kwargs = self._build_playlist_square_kwargs(method, source, page)
if kwargs is None:
candidates, has_more = self._normalize_playlist_square_result(method())
if self._is_mock_side_effect_iterator(method):
return candidates, (bool(candidates) and (has_more is not False))
return candidates, bool(has_more)
candidates, has_more = self._normalize_playlist_square_result(method(**kwargs))
if has_more is False:
return candidates, False
return candidates, bool(candidates)
@staticmethod
def _normalize_playlist_square_result(result) -> tuple[list, bool | None]:
if isinstance(result, tuple) and len(result) == 2:
candidates = list(result[0] or [])
has_more = result[1]
return candidates, None if has_more is None else bool(has_more)
if isinstance(result, dict):
raw_candidates = result.get("candidates")
if raw_candidates is None:
raw_candidates = result.get("items", [])
candidates = list(raw_candidates or [])
has_more = result.get("has_more")
return candidates, None if has_more is None else bool(has_more)
return list(result or []), None
@staticmethod
def _is_mock_side_effect_iterator(method) -> bool:
side_effect = getattr(method, "side_effect", None)
if side_effect is None:
return False
if isinstance(side_effect, BaseException):
return False
return not callable(side_effect)
@staticmethod
def _build_playlist_square_kwargs(method, source: str, page: int) -> dict[str, int] | None:
    """Build paging keyword arguments that *method*'s signature accepts.

    Only parameters literally named "page", "page_size" and "offset" are
    considered; "offset" is supplied only when "page" is not accepted.

    Args:
        method: Callable whose signature is inspected.
        source: Source key used to look up the page size (default 30).
        page: 1-based page number; values below 1 are clamped.

    Returns:
        The keyword arguments to pass, or ``None`` when the signature cannot
        be inspected or accepts none of the paging parameters.
    """
    try:
        accepted = inspect.signature(method).parameters
    except (TypeError, ValueError):
        return None

    page_size = PLAYLIST_SQUARE_PAGE_SIZES.get(source, 30)
    takes_page = "page" in accepted

    call_kwargs: dict[str, int] = {}
    if takes_page:
        call_kwargs["page"] = max(page, 1)
    if "page_size" in accepted:
        call_kwargs["page_size"] = page_size
    if "offset" in accepted and not takes_page:
        # Translate the 1-based page number into a 0-based item offset.
        call_kwargs["offset"] = max(page - 1, 0) * page_size
    return call_kwargs if call_kwargs else None
@staticmethod
def _emit_progress(
callback: PlaylistProgressCallback | None,
event_type: str,
**payload: Any,
) -> None:
if callback is None:
return
callback(event_type, payload)
def import_manual_playlists(self, playlist_file: str | Path, candidates: list) -> list[int]:
playlist_ids: list[int] = []
pool_ids_by_platform: dict[str, int] = {}
for candidate in candidates:
pool_id = pool_ids_by_platform.get(candidate.platform)
if pool_id is None:
pool_id = self.repository.get_or_create_manual_file_pool(playlist_file, candidate.platform)
pool_ids_by_platform[candidate.platform] = pool_id
playlist_id = self.repository.upsert_playlist(candidate)
self.repository.link_pool_playlist(pool_id, playlist_id)
playlist_ids.append(playlist_id)
return playlist_ids
def store_playlist_songs(self, playlist_id: int, source_pool_id: int, song_infos: list[object]) -> int:
pool_row = self.repository.get_playlist_pool(source_pool_id)
if not pool_row:
raise RuntimeError(f"Unknown playlist pool: {source_pool_id}")
artist_pool_id = self.repository.ensure_derived_artist_pool(
platform=pool_row["platform"],
source_pool_id=source_pool_id,
source_pool_name=pool_row["name"],
)
for position, song_info in enumerate(song_infos, start=1):
song = CatalogSong.from_song_info(song_info)
song_id = self.repository.upsert_song(song)
self.repository.link_playlist_song(playlist_id, song_id, position)
for artist_name in extract_artist_names(song.metadata.get("raw_data"), song.singers):
artist_id = self.repository.upsert_artist(song.platform, artist_name)
self.repository.link_pool_artist(artist_pool_id, artist_id)
self.repository.link_artist_song(artist_id, song_id)
return artist_pool_id
def sync_playlist_catalog(self, sources: list[str] | None = None, limit: int | None = None) -> int:
processed = 0
for playlist_row in self.repository.list_playlists(sources=sources, limit=limit):
processed += self.sync_playlist_row(playlist_row)
return processed
def sync_specific_playlists(self, playlist_ids: list[int]) -> int:
processed = 0
for playlist_row in self.repository.list_playlists_by_ids(playlist_ids):
processed += self.sync_playlist_row(playlist_row)
return processed
def sync_playlist_row(self, playlist_row) -> int:
song_infos = self.resolve_playlist_song_infos(playlist_row)
pool_ids = self.repository.get_pool_ids_for_playlist(int(playlist_row["id"]))
for pool_id in pool_ids:
self.store_playlist_songs(int(playlist_row["id"]), pool_id, song_infos)
self._backfill_playlist_play_count(playlist_row)
return len(song_infos)
def _backfill_playlist_play_count(self, playlist_row) -> None:
playlist_id = int(playlist_row["id"])
play_count = self.resolve_playlist_play_count(playlist_row)
if play_count is None:
return
self.repository.update_playlist_play_count(playlist_id, play_count)
def _resolve_playlists_root(self) -> Path | None:
if self.playlists_root is not None:
self.playlists_root.mkdir(parents=True, exist_ok=True)
return self.playlists_root
library_root = self.repository.get_default_local_library_root()
if library_root is None:
return None
playlists_root = library_root.parent / "playlists"
playlists_root.mkdir(parents=True, exist_ok=True)
return playlists_root
def _playlist_export_payload(self, playlist_id: int) -> tuple[dict[str, Any], list[dict[str, Any]]] | None:
playlist_rows = self.repository.list_playlists_by_ids([int(playlist_id)])
if not playlist_rows:
return None
playlist_row = dict(playlist_rows[0])
payload = {
"id": int(playlist_row["id"]),
"platform": str(playlist_row["platform"] or ""),
"remote_playlist_id": str(playlist_row["remote_playlist_id"] or ""),
"name": str(playlist_row["name"] or ""),
"play_count": self._coerce_int(playlist_row["play_count"]),
"cover_url": str(playlist_row.get("cover_url") or "").strip() or None,
}
songs = self.repository.list_playlist_song_details(int(playlist_id), limit=5000)
return payload, songs
def _write_playlist_artifacts(self, playlist_row) -> Path | None:
playlists_root = self._resolve_playlists_root()
if playlists_root is None:
return None
playlist_id = int(playlist_row["id"])
export_payload = self._playlist_export_payload(playlist_id)
if export_payload is None:
return None
playlist_payload, song_items = export_payload
try:
return write_playlist_artifacts(
playlist=playlist_payload,
songs=song_items,
playlists_root=playlists_root,
)
except Exception:
LOGGER.warning(
"Failed to write playlist artifacts: playlist_id=%s",
playlist_id,
exc_info=True,
)
return None
def ensure_playlist_artifacts_for_playlist(self, playlist_id: int) -> Path | None:
playlist_rows = self.repository.list_playlists_by_ids([int(playlist_id)])
if not playlist_rows:
return None
return self._write_playlist_artifacts(playlist_rows[0])
def resolve_playlist_play_count(self, playlist_row) -> int | None:
platform = str(playlist_row["platform"] or "").strip()
parse_strategy = str(playlist_row["parse_strategy"] or "").strip()
remote_id = str(playlist_row["remote_playlist_id"] or "").strip()
fallback_value = self._coerce_int(playlist_row["play_count"])
if parse_strategy != "playlist_url" or platform not in {"netease", "qq", "kuwo"} or not remote_id:
return fallback_value
try:
if platform == "netease":
client = self.get_client("netease")
response = client.post(
"https://music.163.com/api/v6/playlist/detail",
data={"id": remote_id},
timeout=(10, 30),
)
response.raise_for_status()
payload = response.json() or {}
playlist_payload = payload.get("playlist") or {}
return self._coerce_int(playlist_payload.get("playCount")) or fallback_value
if platform == "qq":
client = self.get_client("qq")
response = client.get(
"https://c.y.qq.com/qzone/fcg-bin/fcg_ucc_getcdinfo_byids_cp.fcg",
headers={"Referer": f"https://y.qq.com/n/ryqq/playlist/{remote_id}"},
params={
"disstid": str(remote_id),
"type": "1",
"json": "1",
"utf8": "1",
"onlysong": "0",
"format": "json",
},
timeout=(10, 30),
)
response.raise_for_status()
payload = response.json() or {}
playlist_payload = ((payload.get("cdlist") or [{}])[0] or {}) if isinstance(payload, dict) else {}
return self._coerce_int(playlist_payload.get("visitnum")) or fallback_value
client = self.get_client("kuwo")
response = client.get(
f"https://m.kuwo.cn/newh5app/wapi/api/www/playlist/playListInfo?pid={remote_id}&pn=1&rn=100",
timeout=(10, 30),
)
response.raise_for_status()
payload = response.json() or {}
data_payload = payload.get("data") or {}
return self._coerce_int(data_payload.get("listencnt")) or fallback_value
except Exception:
LOGGER.warning(
"Failed to resolve playlist play_count during sync: platform=%s remote_id=%s",
platform,
remote_id,
exc_info=True,
)
return fallback_value
@staticmethod
def _coerce_int(value: object) -> int | None:
if value in (None, "") or isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return int(value)
text = str(value).strip().replace(",", "")
if not text:
return None
try:
return int(float(text))
except ValueError:
return None
def resolve_playlist_song_infos(self, playlist_row) -> list[object]:
strategy = playlist_row["parse_strategy"]
if strategy == "playlist_url":
if playlist_row["platform"] == "netease":
return build_netease_playlist_song_infos(self.get_client("netease"), playlist_row["url"])
if playlist_row["platform"] == "qq":
return build_qq_playlist_song_infos(self.get_client("qq"), playlist_row["url"])
if playlist_row["platform"] == "kuwo":
return build_kuwo_playlist_song_infos(self.get_client("kuwo"), playlist_row["url"])
client = self.get_client(playlist_row["platform"])
return client.parseplaylist(playlist_row["url"])
if strategy == "netease_toplist":
return build_netease_playlist_song_infos(self.get_client("netease"), playlist_row["url"])
if strategy == "qq_toplist":
return self._resolve_qq_toplist(playlist_row)
if strategy == "kuwo_toplist":
return self._resolve_kuwo_toplist(playlist_row)
raise ValueError(f"Unsupported parse strategy: {strategy}")
def _resolve_qq_toplist(self, playlist_row) -> list[object]:
    """Fetch the tracks of a QQ Music toplist and build song infos from them.

    The chart endpoint is queried first. When it yields no usable tracks,
    including when it answers with a JSON body that is not an object, the
    fallback endpoint is queried instead.

    Args:
        playlist_row: Mapping-like record with "remote_playlist_id" and
            "name".

    Returns:
        Song infos built from the raw tracks by
        ``build_qq_raw_track_song_infos``.

    Raises:
        requests.HTTPError: If the chart endpoint returns an error status.
    """
    remote_id = str(playlist_row["remote_playlist_id"] or "").strip()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", InsecureRequestWarning)
        # NOTE(review): TLS certificate verification is disabled for this
        # request (verify=False); the resulting warning is silenced above.
        response = requests.get(
            "https://c.y.qq.com/v8/fcg-bin/fcg_v8_toplist_cp.fcg",
            params={
                "topid": remote_id,
                "tpl": "3",
                "page": "detail",
                "type": "top",
                "format": "json",
            },
            headers={
                "User-Agent": "Mozilla/5.0",
                "Referer": "https://y.qq.com/",
                "Origin": "https://y.qq.com/",
            },
            timeout=15,
            verify=False,
        )
    response.raise_for_status()

    # Guard against a JSON body that is not an object (or a "songlist" that
    # is not a list) so such responses reach the fallback endpoint instead
    # of raising AttributeError.
    payload = response.json()
    songlist = payload.get("songlist") if isinstance(payload, dict) else None
    if not isinstance(songlist, list):
        songlist = []

    raw_tracks = []
    for item in songlist:
        if not isinstance(item, dict):
            continue
        track_data = item.get("data")
        if isinstance(track_data, dict) and track_data:
            raw_tracks.append(track_data)

    if not raw_tracks:
        raw_tracks = self._resolve_qq_toplist_fallback_tracks(remote_id)
    client = self.get_client("qq")
    return build_qq_raw_track_song_infos(client, raw_tracks, playlist_name=playlist_row["name"])
def _resolve_qq_toplist_fallback_tracks(self, remote_id: str) -> list[dict]:
if not remote_id:
return []
request_payload = {
"comm": {"ct": 24, "cv": 0},
"toplist": {
"module": "musicToplist.ToplistInfoServer",
"method": "GetDetail",
"param": {
"topid": int(remote_id) if remote_id.isdigit() else remote_id,
"offset": 0,
"num": 100,
"period": "",
},
},
}
with warnings.catch_warnings():
warnings.simplefilter("ignore", InsecureRequestWarning)
response = requests.post(
"https://u.y.qq.com/cgi-bin/musicu.fcg",
json=request_payload,
headers={
"User-Agent": "Mozilla/5.0",
"Referer": "https://y.qq.com/",
"Origin": "https://y.qq.com/",
"Content-Type": "application/json",
},
timeout=15,
verify=False,
)
response.raise_for_status()
payload_raw = response.json()
payload = payload_raw if isinstance(payload_raw, dict) else {}
toplist_data = payload.get("toplist") or {}
toplist_inner = toplist_data.get("data") or {}
toplist_detail = toplist_inner.get("data") or {}
raw_items = toplist_detail.get("song") or toplist_detail.get("songlist") or []
if not isinstance(raw_items, list):
return []
fallback_tracks: list[dict] = []
for item in raw_items:
if not isinstance(item, dict):
continue
title = str(item.get("title") or item.get("name") or "").strip()
singer_text = str(item.get("singerName") or item.get("singers") or "").strip()
album_mid = str(item.get("albumMid") or item.get("albummid") or "").strip()
if not title:
continue
track_id = str(
item.get("songMid")
or item.get("songmid")
or item.get("mid")
or item.get("songId")
or item.get("songid")
or ""
).strip()
if not track_id or track_id == "0":
hash_input = f"{remote_id}|{title}|{singer_text}|{album_mid}"
track_id = f"qqtop_{remote_id}_{hashlib.md5(hash_input.encode('utf-8')).hexdigest()[:16]}"
singer_items = [{"name": part.strip()} for part in singer_text.split("/") if part.strip()]
fallback_tracks.append(
{
"songmid": track_id,
"title": title,
"singer": singer_items,
"album": {"mid": album_mid, "title": str(item.get("albumName") or "").strip()},
"albummid": album_mid,
"interval": item.get("interval", 0),
"qq_toplist_fallback": True,
"qq_toplist_remote_id": remote_id,
}
)
return fallback_tracks
def _resolve_kuwo_toplist(self, playlist_row) -> list[object]:
    """Fetch the tracks of a Kuwo toplist by name and build song infos.

    When the stored name is blank or merely repeats the remote id, the name
    is looked up from the Kuwo rank page; if that lookup finds nothing the
    remote id itself is used as the name.

    Args:
        playlist_row: Mapping-like record with "name" and
            "remote_playlist_id".

    Returns:
        Song infos built from the raw tracks by
        ``build_kuwo_raw_track_song_infos``. Entries that are not objects or
        that lack a "rid" are skipped.

    Raises:
        requests.HTTPError: If an endpoint returns an error status.
    """
    playlist_name = str(playlist_row["name"] or "").strip()
    remote_id = str(playlist_row["remote_playlist_id"] or "").strip()
    if not playlist_name or playlist_name == remote_id:
        playlist_name = self._resolve_kuwo_toplist_name(remote_id) or remote_id

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", InsecureRequestWarning)
        # NOTE(review): TLS certificate verification is disabled for this
        # request (verify=False); the resulting warning is silenced above.
        response = requests.get(
            "https://kw-api.cenguigui.cn",
            params={"name": playlist_name, "type": "rank", "page": "1", "limit": "100"},
            timeout=15,
            verify=False,
        )
    response.raise_for_status()

    # Tolerate unexpected shapes (non-object body, null "data", non-list
    # "musicList") by treating them as an empty chart.
    payload = response.json()
    data = payload.get("data") if isinstance(payload, dict) else None
    music_list = data.get("musicList") if isinstance(data, dict) else None
    if not isinstance(music_list, list):
        music_list = []

    raw_tracks = []
    for item in music_list:
        if not isinstance(item, dict):
            continue
        rid_value = item.get("rid")
        # A null rid must be skipped; str(None) would otherwise produce the
        # bogus id "MUSIC_None".
        rid = "" if rid_value is None else str(rid_value).strip()
        if not rid:
            continue
        raw_tracks.append(
            {
                "musicrid": f"MUSIC_{rid}",
                "rid": rid,
                "name": item.get("name"),
                "artist": item.get("artist"),
                "album": item.get("album"),
                "albumpic": item.get("pic"),
            }
        )
    client = self.get_client("kuwo")
    return build_kuwo_raw_track_song_infos(client, raw_tracks, playlist_name=playlist_name)
def _resolve_kuwo_toplist_name(self, remote_id: str) -> str | None:
if not remote_id:
return None
with warnings.catch_warnings():
warnings.simplefilter("ignore", InsecureRequestWarning)
response = requests.get(
"https://www.kuwo.cn/rankList",
params={"bangId": remote_id},
timeout=15,
verify=False,
)
response.raise_for_status()
for candidate in parse_kuwo_toplist_html(response.text):
if candidate.remote_id == remote_id:
return candidate.name
return None
@staticmethod
def _resolve_raw_tracks(client, raw_tracks: list[dict]) -> list[object]:
song_infos = []
for track in raw_tracks:
song_info_flac = client._parsewiththirdpartapis(track, {})
try:
song_info = client._parsewithofficialapiv1(
track,
song_info_flac=song_info_flac,
lossless_quality_is_sufficient=not bool(client.default_cookies),
request_overrides={},
)
except Exception:
song_info = song_info_flac
if not song_info.with_valid_download_url:
song_info = song_info_flac
if song_info.with_valid_download_url:
song_infos.append(song_info)
return song_infos