333 lines
14 KiB
Python
333 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import copy
|
|
import json
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
from musicdl.modules.utils import SongInfo, safeextractfromdict, seconds2hms
|
|
|
|
|
|
def _parse_duration_seconds(value) -> int:
    """Convert a loosely typed duration into a non-negative whole number of seconds.

    Falsy input counts as zero, numeric strings are accepted, fractional
    parts are truncated, and anything that cannot be converted yields 0.
    """
    try:
        seconds = int(float(value or 0))
    except Exception:
        # Best-effort parsing: unparseable metadata must never break the caller.
        return 0
    return seconds if seconds > 0 else 0
|
|
|
|
|
|
def _has_positive_value(value) -> bool:
    """Return True when *value* converts to a float strictly greater than zero.

    Falsy input is treated as zero; values that cannot be converted are
    reported as not positive instead of raising.
    """
    try:
        number = float(value or 0)
    except Exception:
        # Best-effort check: malformed size fields simply count as "absent".
        return False
    return number > 0
|
|
|
|
|
|
def _normalize_text(value, default: str = "NULL") -> str:
    """Return *value* as stripped text, or *default* when nothing is left.

    Any falsy value (None, "", 0, ...) is treated as empty before stripping.
    """
    cleaned = str(value or "").strip()
    return cleaned or default
|
|
|
|
|
|
def _join_artist_names(value) -> str:
    """Render artist metadata as a single comma-separated string.

    Args:
        value: Either a list/tuple whose items are artist dicts (read via
            their ``"name"`` key) or plain values, or a single value whose
            text form uses ``/`` between artists.

    Returns:
        The de-duplicated names joined with ``", "`` in first-seen order, or
        ``"NULL"`` when no usable name is present.
    """
    if isinstance(value, (list, tuple)):
        names: list[str] = []
        for item in value:
            if isinstance(item, dict):
                raw_name = item.get("name")
                # An explicit None must not be rendered as the literal
                # artist name "None"; treat it like a missing name.
                name = "" if raw_name is None else str(raw_name).strip()
            else:
                name = str(item or "").strip()
            if name and name not in names:
                names.append(name)
        return ", ".join(names) if names else "NULL"

    text = str(value or "").replace("/", ", ").strip()
    return text or "NULL"
|
|
|
|
|
|
def _normalize_audio_ext(value: str | None) -> str:
    """Return a lower-case extension with surrounding whitespace and leading dots removed.

    Falsy input produces an empty string.
    """
    trimmed = str(value or "").strip()
    lowered = trimmed.lower()
    return lowered.lstrip(".")
|
|
|
|
|
|
def _remove_suffix(value: str, suffix: str) -> str:
    """Return *value* without a trailing *suffix*, if it ends with one.

    Both arguments are coerced to text (falsy values become ""); an empty
    suffix leaves the text unchanged.
    """
    text, token = str(value or ""), str(suffix or "")
    if not token or not text.endswith(token):
        return text
    return text[: len(text) - len(token)]
|
|
|
|
|
|
def _remove_prefix(value: str, prefix: str) -> str:
    """Return *value* without a leading *prefix*, if it starts with one.

    Both arguments are coerced to text (falsy values become ""); an empty
    prefix leaves the text unchanged.
    """
    text, token = str(value or ""), str(prefix or "")
    if not token or not text.startswith(token):
        return text
    return text[len(token):]
|
|
|
|
|
|
def guess_rough_audio_format(source: str, search_result: dict) -> str:
    """Guess ``"flac"`` or ``"mp3"`` from a raw search result, or ``""`` if unknown.

    The fields inspected depend on *source*: size fields for
    ``"QQMusicClient"``, the ``MINFO``/``formats`` text for
    ``"KuwoMusicClient"``, and per-quality ``size`` entries for
    ``"NeteaseMusicClient"``. Lossless hints are always checked before lossy
    ones. Any other source yields ``""``.
    """
    provider = str(source or "")

    if provider == "QQMusicClient":
        file_meta = safeextractfromdict(search_result, ["file"], {}) or {}
        # Ordered probes: (mapping to read, size keys, format to report).
        size_probes = (
            (file_meta, ("size_hires", "size_try", "size_flac", "size_ape"), "flac"),
            (file_meta, ("size_320mp3", "size_mp3", "size_128mp3"), "mp3"),
            (search_result, ("sizeflac", "sizeape"), "flac"),
            (search_result, ("size320", "size128", "sizeogg"), "mp3"),
        )
        for container, size_keys, audio_format in size_probes:
            if any(_has_positive_value(container.get(size_key)) for size_key in size_keys):
                return audio_format
        return ""

    if provider == "KuwoMusicClient":
        descriptor = str(search_result.get("MINFO") or search_result.get("formats") or "").lower()
        token_probes = (
            (("flac", "ape", "wav", "lossless", "hires"), "flac"),
            (("mp3", "320kmp3", "192kmp3", "128kmp3"), "mp3"),
        )
        for tokens, audio_format in token_probes:
            if any(token in descriptor for token in tokens):
                return audio_format
        return ""

    if provider == "NeteaseMusicClient":

        def _tier_has_size(tier: str) -> bool:
            # A quality tier counts as available when its "size" is positive.
            return _has_positive_value(safeextractfromdict(search_result, [tier, "size"], 0))

        if _tier_has_size("hr") or _tier_has_size("sq"):
            return "flac"
        if any(_tier_has_size(tier) for tier in ("h", "m", "l")):
            return "mp3"
        return ""

    return ""
|
|
|
|
|
|
def build_deferred_song_info(
    source: str,
    raw_search_result: dict,
    identifier,
    song_name,
    singers,
    album: str | None = None,
    duration_s: int | float = 0,
    cover_url: str | None = None,
    ext: str | None = None,
) -> SongInfo:
    """Build a SongInfo placeholder whose download details are not resolved yet.

    The raw search result is deep-copied into ``raw_data["search"]`` and the
    entry is flagged with ``raw_data["deferred_search"] = True``. Text fields
    are normalised (empty values become ``"NULL"``), while file size and
    download URL are left unset.
    """
    seconds = _parse_duration_seconds(duration_s)
    if seconds > 0:
        duration_text = seconds2hms(seconds)
    else:
        # Placeholder shown when the duration is unknown.
        duration_text = "-:-:-"

    raw_payload = {"search": copy.deepcopy(raw_search_result or {}), "deferred_search": True}
    cleaned_identifier = str(identifier or "").strip()
    cleaned_cover = str(cover_url or "").strip()

    return SongInfo(
        raw_data=raw_payload,
        source=str(source),
        song_name=_normalize_text(song_name),
        singers=_normalize_text(singers),
        album=_normalize_text(album),
        ext=_normalize_audio_ext(ext),
        file_size_bytes=None,
        file_size=None,
        identifier=cleaned_identifier,
        duration_s=seconds,
        duration=duration_text,
        lyric="NULL",
        cover_url=cleaned_cover or None,
        download_url=None,
        download_url_status={},
    )
|
|
|
|
|
|
def _apply_work_dir(client, playlist_name: str, song_infos: list[SongInfo]) -> list[SongInfo]:
    """Assign one shared work dir to every song and de-duplicate via the client.

    Both steps are optional: each runs only when *client* exposes the
    corresponding callable (``_constructuniqueworkdir`` and
    ``_removeduplicates``). An empty input returns a new empty list.
    """
    if not song_infos:
        return []

    make_work_dir = getattr(client, "_constructuniqueworkdir", None)
    if callable(make_work_dir):
        shared_dir = make_work_dir(keyword=playlist_name)
        for entry in song_infos:
            entry.work_dir = shared_dir

    dedupe = getattr(client, "_removeduplicates", None)
    if callable(dedupe):
        return dedupe(song_infos=song_infos)
    return song_infos
|
|
|
|
|
|
def _extract_playlist_id_from_url(playlist_url: str, query_keys: tuple[str, ...] = ("id", "pid", "bangId")) -> str:
    """Pull a playlist identifier out of a playlist URL.

    Lookup order:
      1. the URL's own query string, trying *query_keys* in order;
      2. the query string embedded in the fragment (hash-routed URLs such as
         ``.../#/playlist?id=1``), again trying *query_keys* in order;
      3. the last non-empty path segment with a trailing ``.html``/``.htm``
         removed.

    Returns an empty string when nothing usable is found.
    """
    url_parts = urlparse(str(playlist_url or "").strip())
    query_strings = [url_parts.query]

    hash_part = str(url_parts.fragment or "").strip()
    if hash_part:
        # Re-parse the fragment as a URL of its own so its query can be read.
        if "://" in hash_part:
            hash_url = hash_part
        elif hash_part.startswith("/"):
            hash_url = f"https://placeholder{hash_part}"
        else:
            hash_url = f"https://placeholder/{hash_part}"
        query_strings.append(urlparse(hash_url).query)

    for query_string in query_strings:
        fields = parse_qs(query_string, keep_blank_values=True)
        for key in query_keys:
            values = fields.get(key) or [""]
            found = str(values[0]).strip()
            if found:
                return found

    segments = [segment for segment in url_parts.path.split("/") if segment]
    for segment in reversed(segments):
        found = _remove_suffix(_remove_suffix(str(segment), ".html"), ".htm").strip()
        if found:
            return found
    return ""
|
|
|
|
|
|
def build_netease_playlist_song_infos(client, playlist_url: str, request_overrides: dict | None = None) -> list[SongInfo]:
    """Turn a NetEase playlist URL into deferred SongInfo entries.

    Steps:
      1. Request the playlist detail, which carries ``trackIds`` and
         possibly a ``tracks`` list with full details.
      2. For ids listed in ``trackIds`` without details, request them from
         the song-detail endpoint in batches of 200.
      3. Emit one entry per ``trackIds`` item, in playlist order, skipping
         ids for which no details were obtained.

    *request_overrides* is copied and forwarded to ``client.post``; a
    ``timeout`` of ``(10, 30)`` is added when the caller supplied none.
    Returns an empty list when no playlist id or no track ids are found.
    HTTP errors propagate from ``raise_for_status()``.
    """

    def _id_of(entry) -> str:
        # Ids are compared as stripped strings everywhere in this function.
        return str(entry.get("id") or "").strip()

    def _duration_seconds(track):
        # "dt" is divided by 1000 before being used as duration_s; anything
        # unparseable counts as an unknown (zero) duration.
        if not str(track.get("dt", "")).strip():
            return 0
        try:
            return float(track.get("dt", 0) or 0) / 1000
        except Exception:
            return 0

    overrides = copy.deepcopy(request_overrides or {})
    overrides.setdefault("timeout", (10, 30))

    playlist_id = _extract_playlist_id_from_url(playlist_url, query_keys=("id",))
    if not playlist_id:
        return []

    playlist_response = client.post(
        "https://music.163.com/api/v6/playlist/detail",
        data={"id": playlist_id},
        **overrides,
    )
    playlist_response.raise_for_status()
    playlist_info = safeextractfromdict(playlist_response.json(), ["playlist"], {}) or {}
    track_refs = safeextractfromdict(playlist_info, ["trackIds"], []) or []
    playlist_name = _normalize_text(playlist_info.get("name"), f"playlist-{playlist_id}")
    if not track_refs:
        return []

    # Details that already came with the playlist response.
    details_by_id: dict[str, dict] = {}
    for embedded_track in safeextractfromdict(playlist_info, ["tracks"], []) or []:
        embedded_id = _id_of(embedded_track)
        if embedded_id:
            details_by_id[embedded_id] = embedded_track

    ordered_ids = [_id_of(track_ref) for track_ref in track_refs]
    missing_ids = [track_id for track_id in ordered_ids if track_id and track_id not in details_by_id]

    batch_size = 200
    for start in range(0, len(missing_ids), batch_size):
        # Only numeric ids can be sent; the payload needs integer ids.
        numeric_ids = [track_id for track_id in missing_ids[start : start + batch_size] if track_id.isdigit()]
        if not numeric_ids:
            continue
        payload = json.dumps(
            [{"id": int(track_id), "v": 0} for track_id in numeric_ids],
            ensure_ascii=False,
            separators=(",", ":"),
        )
        detail_response = client.post(
            "https://interface3.music.163.com/api/v3/song/detail",
            data={"c": payload},
            **overrides,
        )
        detail_response.raise_for_status()
        for fetched_track in detail_response.json().get("songs", []) or []:
            fetched_id = _id_of(fetched_track)
            if fetched_id:
                details_by_id[fetched_id] = fetched_track

    song_infos: list[SongInfo] = []
    for track_id in ordered_ids:
        track_info = details_by_id.get(track_id)
        if not track_id or not isinstance(track_info, dict):
            continue
        song_infos.append(
            build_deferred_song_info(
                source=client.source,
                raw_search_result=track_info,
                identifier=track_id,
                song_name=track_info.get("name"),
                singers=_join_artist_names(track_info.get("ar") or []),
                album=safeextractfromdict(track_info, ["al", "name"], None),
                duration_s=_duration_seconds(track_info),
                cover_url=safeextractfromdict(track_info, ["al", "picUrl"], None),
                ext=guess_rough_audio_format(client.source, track_info),
            )
        )
    return _apply_work_dir(client, playlist_name, song_infos)
|
|
|
|
|
|
def build_qq_raw_track_song_infos(client, raw_tracks: list[dict], playlist_name: str | None = None) -> list[SongInfo]:
    """Convert raw QQ Music track dicts into deferred SongInfo entries.

    Both the nested layout (``album.mid``, ``album.title``, ``title``) and
    the flat layout (``albummid``, ``albumname``, ``songname``) are read.
    Tracks without any id field are skipped. The result is passed through
    ``_apply_work_dir`` using *playlist_name* (``"playlist"`` when empty).
    """
    collected: list[SongInfo] = []
    for track in raw_tracks or []:
        identifier = track.get("mid") or track.get("songmid") or track.get("songid") or track.get("id")
        if not identifier:
            continue

        album_mid = safeextractfromdict(track, ["album", "mid"], "") or track.get("albummid")
        if album_mid:
            cover = f"https://y.gtimg.cn/music/photo_new/T002R800x800M000{album_mid}.jpg"
        else:
            cover = None

        title = track.get("title") or track.get("songname") or track.get("name")
        album_title = safeextractfromdict(track, ["album", "title"], None) or track.get("albumname")
        entry = build_deferred_song_info(
            source=client.source,
            raw_search_result=track,
            identifier=identifier,
            song_name=title,
            singers=_join_artist_names(track.get("singer") or []),
            album=album_title,
            duration_s=track.get("interval", 0),
            cover_url=cover,
            ext=guess_rough_audio_format(client.source, track),
        )
        collected.append(entry)

    work_dir_label = _normalize_text(playlist_name, "playlist")
    return _apply_work_dir(client, work_dir_label, collected)
|
|
|
|
|
|
def build_qq_playlist_song_infos(client, playlist_url: str, request_overrides: dict | None = None) -> list[SongInfo]:
    """Fetch a QQ Music playlist and convert its tracks into deferred SongInfo entries.

    Args:
        client: Object providing ``get(url, **kwargs)`` and a ``source``
            attribute. NOTE(review): ``get`` is presumably a requests-style
            call accepting ``headers``/``params`` — confirm against the client.
        playlist_url: Playlist URL; the id is read from the ``id`` or
            ``disstid`` query parameter, else from the last path segment.
        request_overrides: Extra keyword arguments for ``client.get``. The
            dict is copied, and a ``timeout`` of ``(10, 30)`` is added when
            the caller supplied none.

    Returns:
        The playlist's tracks as SongInfo entries, or an empty list when no
        playlist id could be extracted.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an HTTP error.
    """
    request_overrides = copy.deepcopy(request_overrides or {})
    request_overrides.setdefault("timeout", (10, 30))
    playlist_id = _extract_playlist_id_from_url(playlist_url, query_keys=("id", "disstid"))
    if not playlist_id:
        return []

    # "headers" and "params" are also passed explicitly below. Leaving them in
    # request_overrides would raise "TypeError: got multiple values for
    # keyword argument", so they are pulled out and merged instead.
    headers = dict(request_overrides.pop("headers", None) or {})
    if not any(str(header_name).lower() == "referer" for header_name in headers):
        # Caller-supplied headers win; the Referer is only a default.
        headers["Referer"] = f"https://y.qq.com/n/ryqq/playlist/{playlist_id}"
    params = dict(request_overrides.pop("params", None) or {})
    # The parameters that define this request always take precedence.
    params.update(
        {"disstid": str(playlist_id), "type": "1", "json": "1", "utf8": "1", "onlysong": "0", "format": "json"}
    )

    response = client.get(
        "https://c.y.qq.com/qzone/fcg-bin/fcg_ucc_getcdinfo_byids_cp.fcg",
        headers=headers,
        params=params,
        **request_overrides,
    )
    response.raise_for_status()
    playlist_result = response.json()

    # The track list is looked up under several known locations, in order.
    raw_tracks = (
        safeextractfromdict(playlist_result, ["cdlist", 0, "songlist"], [])
        or safeextractfromdict(playlist_result, ["cdlist", 0, "list"], [])
        or safeextractfromdict(playlist_result, ["songlist"], [])
        or []
    )
    playlist_name = safeextractfromdict(playlist_result, ["cdlist", 0, "dissname"], None) or f"playlist-{playlist_id}"
    return build_qq_raw_track_song_infos(client, raw_tracks, playlist_name=playlist_name)
|
|
|
|
|
|
def build_kuwo_raw_track_song_infos(client, raw_tracks: list[dict], playlist_name: str | None = None) -> list[SongInfo]:
    """Convert raw Kuwo track dicts into deferred SongInfo entries.

    Upper-case keys (``MUSICRID``, ``SONGNAME``, ``ARTIST``, ...) are tried
    first, then their lower-case counterparts. The ``MUSIC_`` prefix is
    removed from the id, and tracks without an id are skipped. The result is
    passed through ``_apply_work_dir`` using *playlist_name* (``"playlist"``
    when empty).
    """
    collected: list[SongInfo] = []
    for track in raw_tracks or []:
        raw_id = track.get("MUSICRID") or track.get("musicrid") or track.get("rid") or ""
        identifier = _remove_prefix(str(raw_id), "MUSIC_")
        if not identifier:
            continue

        length = track.get("DURATION") or track.get("duration", 0)
        title = track.get("SONGNAME") or track.get("name")
        artists = track.get("ARTIST") or track.get("artist")
        album_title = track.get("ALBUM") or track.get("album")
        cover = track.get("hts_MVPIC") or track.get("albumpic") or track.get("pic")
        entry = build_deferred_song_info(
            source=client.source,
            raw_search_result=track,
            identifier=identifier,
            song_name=title,
            singers=artists,
            album=album_title,
            duration_s=length,
            cover_url=cover,
            ext=guess_rough_audio_format(client.source, track),
        )
        collected.append(entry)

    work_dir_label = _normalize_text(playlist_name, "playlist")
    return _apply_work_dir(client, work_dir_label, collected)
|
|
|
|
|
|
def build_kuwo_playlist_song_infos(client, playlist_url: str, request_overrides: dict | None = None) -> list[SongInfo]:
    """Fetch every page of a Kuwo playlist and convert it into deferred SongInfo entries.

    Args:
        client: Object providing ``get(url, **kwargs)`` and a ``source``
            attribute.
        playlist_url: Playlist URL; the id is read from the ``id`` or ``pid``
            query parameter, else from the last path segment.
        request_overrides: Extra keyword arguments for ``client.get``. The
            dict is copied, and a ``timeout`` of ``(10, 30)`` is added when
            the caller supplied none.

    Returns:
        The de-duplicated playlist tracks as SongInfo entries, or an empty
        list when no playlist id could be extracted.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an HTTP error.
    """
    request_overrides = copy.deepcopy(request_overrides or {})
    request_overrides.setdefault("timeout", (10, 30))
    playlist_id = _extract_playlist_id_from_url(playlist_url, query_keys=("id", "pid"))
    if not playlist_id:
        return []

    raw_tracks: list[dict] = []
    playlist_name = None
    page = 1
    while True:
        response = client.get(
            f"https://m.kuwo.cn/newh5app/wapi/api/www/playlist/playListInfo?pid={playlist_id}&pn={page}&rn=100",
            **request_overrides,
        )
        response.raise_for_status()
        playlist_result = response.json()
        page_tracks = safeextractfromdict(playlist_result, ["data", "musicList"], []) or []
        if not page_tracks:
            break
        if page == 1:
            # Only the name is needed from the first page, so read it
            # directly instead of deep-copying the whole response.
            playlist_name = safeextractfromdict(playlist_result, ["data", "name"], None)
        raw_tracks.extend(page_tracks)
        page += 1
        # Stop once the reported total is reached; a missing total counts as 0.
        if float(safeextractfromdict(playlist_result, ["data", "total"], 0) or 0) <= len(raw_tracks):
            break

    # De-duplicate on the same id that build_kuwo_raw_track_song_infos derives
    # (MUSICRID / musicrid / rid, without the "MUSIC_" prefix). Keying on a
    # narrower set of fields would merge every track lacking them into one
    # entry. Tracks with no id at all are dropped here; the raw-track builder
    # skips them anyway.
    tracks_by_id: dict[str, dict] = {}
    for track in raw_tracks:
        track_key = _remove_prefix(
            str(track.get("MUSICRID") or track.get("musicrid") or track.get("rid") or ""), "MUSIC_"
        )
        if track_key:
            tracks_by_id[track_key] = track
    deduped_tracks = list(tracks_by_id.values())

    return build_kuwo_raw_track_song_infos(
        client, deduped_tracks, playlist_name=playlist_name or f"playlist-{playlist_id}"
    )
|