from __future__ import annotations import copy import json from urllib.parse import parse_qs, urlparse from musicdl.modules.utils import SongInfo, safeextractfromdict, seconds2hms def _parse_duration_seconds(value) -> int: try: return max(int(float(value or 0)), 0) except Exception: return 0 def _has_positive_value(value) -> bool: try: return float(value or 0) > 0 except Exception: return False def _normalize_text(value, default: str = "NULL") -> str: text = str(value or "").strip() if not text: return default return text def _join_artist_names(value) -> str: if isinstance(value, (list, tuple)): names = [] for item in value: if isinstance(item, dict): name = str(item.get("name", "")).strip() else: name = str(item or "").strip() if name and name not in names: names.append(name) return ", ".join(names) if names else "NULL" text = str(value or "").replace("/", ", ").strip() return text or "NULL" def _normalize_audio_ext(value: str | None) -> str: return str(value or "").strip().lower().lstrip(".") def _remove_suffix(value: str, suffix: str) -> str: text = str(value or "") token = str(suffix or "") if token and text.endswith(token): return text[: -len(token)] return text def _remove_prefix(value: str, prefix: str) -> str: text = str(value or "") token = str(prefix or "") if token and text.startswith(token): return text[len(token) :] return text def guess_rough_audio_format(source: str, search_result: dict) -> str: source_name = str(source or "") if source_name == "QQMusicClient": file_meta = safeextractfromdict(search_result, ["file"], {}) or {} if any(_has_positive_value(file_meta.get(key)) for key in ("size_hires", "size_try", "size_flac", "size_ape")): return "flac" if any(_has_positive_value(file_meta.get(key)) for key in ("size_320mp3", "size_mp3", "size_128mp3")): return "mp3" if any(_has_positive_value(search_result.get(key)) for key in ("sizeflac", "sizeape")): return "flac" if any(_has_positive_value(search_result.get(key)) for key in ("size320", "size128", "sizeogg")): return "mp3" return "" if source_name == "KuwoMusicClient": meta_text = str(search_result.get("MINFO") or search_result.get("formats") or "").lower() if any(token in meta_text for token in ("flac", "ape", "wav", "lossless", "hires")): return "flac" if any(token in meta_text for token in ("mp3", "320kmp3", "192kmp3", "128kmp3")): return "mp3" return "" if source_name == "NeteaseMusicClient": if _has_positive_value(safeextractfromdict(search_result, ["hr", "size"], 0)) or _has_positive_value( safeextractfromdict(search_result, ["sq", "size"], 0) ): return "flac" if any( _has_positive_value(safeextractfromdict(search_result, [quality_key, "size"], 0)) for quality_key in ("h", "m", "l") ): return "mp3" return "" return "" def build_deferred_song_info( source: str, raw_search_result: dict, identifier, song_name, singers, album: str | None = None, duration_s: int | float = 0, cover_url: str | None = None, ext: str | None = None, ) -> SongInfo: duration_seconds = _parse_duration_seconds(duration_s) return SongInfo( raw_data={"search": copy.deepcopy(raw_search_result or {}), "deferred_search": True}, source=str(source), song_name=_normalize_text(song_name), singers=_normalize_text(singers), album=_normalize_text(album), ext=_normalize_audio_ext(ext), file_size_bytes=None, file_size=None, identifier=str(identifier or "").strip(), duration_s=duration_seconds, duration=seconds2hms(duration_seconds) if duration_seconds > 0 else "-:-:-", lyric="NULL", cover_url=str(cover_url or "").strip() or None, download_url=None, download_url_status={}, ) def _apply_work_dir(client, playlist_name: str, song_infos: list[SongInfo]) -> list[SongInfo]: if not song_infos: return [] if hasattr(client, "_constructuniqueworkdir") and callable(client._constructuniqueworkdir): work_dir = client._constructuniqueworkdir(keyword=playlist_name) for song_info in song_infos: song_info.work_dir = work_dir if hasattr(client, "_removeduplicates") and callable(client._removeduplicates): return client._removeduplicates(song_infos=song_infos) return song_infos def _extract_playlist_id_from_url(playlist_url: str, query_keys: tuple[str, ...] = ("id", "pid", "bangId")) -> str: parsed = urlparse(str(playlist_url or "").strip()) query_candidates = [parsed.query] fragment = str(parsed.fragment or "").strip() if fragment: fragment_url = fragment if "://" in fragment else f"https://placeholder{fragment if fragment.startswith('/') else '/' + fragment}" query_candidates.append(urlparse(fragment_url).query) for query_text in query_candidates: parsed_query = parse_qs(query_text, keep_blank_values=True) for query_key in query_keys: candidate = str((parsed_query.get(query_key) or [""])[0]).strip() if candidate: return candidate for path_part in reversed([part for part in parsed.path.split("/") if part]): candidate = _remove_suffix(_remove_suffix(str(path_part), ".html"), ".htm").strip() if candidate: return candidate return "" def build_netease_playlist_song_infos(client, playlist_url: str, request_overrides: dict | None = None) -> list[SongInfo]: request_overrides = copy.deepcopy(request_overrides or {}) request_overrides.setdefault("timeout", (10, 30)) playlist_id = _extract_playlist_id_from_url(playlist_url, query_keys=("id",)) if not playlist_id: return [] response = client.post("https://music.163.com/api/v6/playlist/detail", data={"id": playlist_id}, **request_overrides) response.raise_for_status() playlist_result = response.json() playlist_info = safeextractfromdict(playlist_result, ["playlist"], {}) or {} track_refs = safeextractfromdict(playlist_info, ["trackIds"], []) or [] playlist_name = _normalize_text(playlist_info.get("name"), f"playlist-{playlist_id}") if not track_refs: return [] track_details_by_id: dict[str, dict] = {} for track_info in safeextractfromdict(playlist_info, ["tracks"], []) or []: track_id = str(track_info.get("id") or "").strip() if track_id: track_details_by_id[track_id] = track_info missing_track_ids = [ str(track_ref.get("id") or "").strip() for track_ref in track_refs if str(track_ref.get("id") or "").strip() and str(track_ref.get("id") or "").strip() not in track_details_by_id ] for offset in range(0, len(missing_track_ids), 200): batch_track_ids = [track_id for track_id in missing_track_ids[offset : offset + 200] if track_id.isdigit()] if not batch_track_ids: continue payload = json.dumps([{"id": int(track_id), "v": 0} for track_id in batch_track_ids], ensure_ascii=False, separators=(",", ":")) detail_response = client.post( "https://interface3.music.163.com/api/v3/song/detail", data={"c": payload}, **request_overrides, ) detail_response.raise_for_status() for track_info in detail_response.json().get("songs", []) or []: track_id = str(track_info.get("id") or "").strip() if track_id: track_details_by_id[track_id] = track_info song_infos: list[SongInfo] = [] for track_ref in track_refs: track_id = str(track_ref.get("id") or "").strip() track_info = track_details_by_id.get(track_id) if not track_id or not isinstance(track_info, dict): continue duration_value = 0 if str(track_info.get("dt", "")).strip(): try: duration_value = float(track_info.get("dt", 0) or 0) / 1000 except Exception: duration_value = 0 song_infos.append( build_deferred_song_info( source=client.source, raw_search_result=track_info, identifier=track_id, song_name=track_info.get("name"), singers=_join_artist_names(track_info.get("ar") or []), album=safeextractfromdict(track_info, ["al", "name"], None), duration_s=duration_value, cover_url=safeextractfromdict(track_info, ["al", "picUrl"], None), ext=guess_rough_audio_format(client.source, track_info), ) ) return _apply_work_dir(client, playlist_name, song_infos) def build_qq_raw_track_song_infos(client, raw_tracks: list[dict], playlist_name: str | None = None) -> list[SongInfo]: song_infos: list[SongInfo] = [] for track_info in raw_tracks or []: track_id = track_info.get("mid") or track_info.get("songmid") or track_info.get("songid") or track_info.get("id") if not track_id: continue cover_mid = safeextractfromdict(track_info, ["album", "mid"], "") or track_info.get("albummid") song_infos.append( build_deferred_song_info( source=client.source, raw_search_result=track_info, identifier=track_id, song_name=track_info.get("title") or track_info.get("songname") or track_info.get("name"), singers=_join_artist_names(track_info.get("singer") or []), album=safeextractfromdict(track_info, ["album", "title"], None) or track_info.get("albumname"), duration_s=track_info.get("interval", 0), cover_url=f"https://y.gtimg.cn/music/photo_new/T002R800x800M000{cover_mid}.jpg" if cover_mid else None, ext=guess_rough_audio_format(client.source, track_info), ) ) return _apply_work_dir(client, _normalize_text(playlist_name, "playlist"), song_infos) def build_qq_playlist_song_infos(client, playlist_url: str, request_overrides: dict | None = None) -> list[SongInfo]: request_overrides = copy.deepcopy(request_overrides or {}) request_overrides.setdefault("timeout", (10, 30)) playlist_id = _extract_playlist_id_from_url(playlist_url, query_keys=("id", "disstid")) if not playlist_id: return [] response = client.get( "https://c.y.qq.com/qzone/fcg-bin/fcg_ucc_getcdinfo_byids_cp.fcg", headers={"Referer": f"https://y.qq.com/n/ryqq/playlist/{playlist_id}"}, params={"disstid": str(playlist_id), "type": "1", "json": "1", "utf8": "1", "onlysong": "0", "format": "json"}, **request_overrides, ) response.raise_for_status() playlist_result = response.json() raw_tracks = ( safeextractfromdict(playlist_result, ["cdlist", 0, "songlist"], []) or safeextractfromdict(playlist_result, ["cdlist", 0, "list"], []) or safeextractfromdict(playlist_result, ["songlist"], []) or [] ) playlist_name = safeextractfromdict(playlist_result, ["cdlist", 0, "dissname"], None) or f"playlist-{playlist_id}" return build_qq_raw_track_song_infos(client, raw_tracks, playlist_name=playlist_name) def build_kuwo_raw_track_song_infos(client, raw_tracks: list[dict], playlist_name: str | None = None) -> list[SongInfo]: song_infos: list[SongInfo] = [] for track_info in raw_tracks or []: track_id = _remove_prefix(str(track_info.get("MUSICRID") or track_info.get("musicrid") or track_info.get("rid") or ""), "MUSIC_") if not track_id: continue duration_value = track_info.get("DURATION") or track_info.get("duration", 0) song_infos.append( build_deferred_song_info( source=client.source, raw_search_result=track_info, identifier=track_id, song_name=track_info.get("SONGNAME") or track_info.get("name"), singers=track_info.get("ARTIST") or track_info.get("artist"), album=track_info.get("ALBUM") or track_info.get("album"), duration_s=duration_value, cover_url=track_info.get("hts_MVPIC") or track_info.get("albumpic") or track_info.get("pic"), ext=guess_rough_audio_format(client.source, track_info), ) ) return _apply_work_dir(client, _normalize_text(playlist_name, "playlist"), song_infos) def build_kuwo_playlist_song_infos(client, playlist_url: str, request_overrides: dict | None = None) -> list[SongInfo]: request_overrides = copy.deepcopy(request_overrides or {}) request_overrides.setdefault("timeout", (10, 30)) playlist_id = _extract_playlist_id_from_url(playlist_url, query_keys=("id", "pid")) if not playlist_id: return [] raw_tracks: list[dict] = [] page = 1 playlist_result_first = {} while True: response = client.get( f"https://m.kuwo.cn/newh5app/wapi/api/www/playlist/playListInfo?pid={playlist_id}&pn={page}&rn=100", **request_overrides, ) response.raise_for_status() playlist_result = response.json() page_tracks = safeextractfromdict(playlist_result, ["data", "musicList"], []) or [] if not page_tracks: break raw_tracks.extend(page_tracks) page += 1 if not playlist_result_first: playlist_result_first = copy.deepcopy(playlist_result) if float(safeextractfromdict(playlist_result, ["data", "total"], 0) or 0) <= len(raw_tracks): break deduped_tracks = list({str(track.get("musicrid") or track.get("rid") or ""): track for track in raw_tracks}.values()) playlist_name = safeextractfromdict(playlist_result_first, ["data", "name"], None) or f"playlist-{playlist_id}" return build_kuwo_raw_track_song_infos(client, deduped_tracks, playlist_name=playlist_name)