from __future__ import annotations

import re
from dataclasses import dataclass, field
from typing import Any

# Maps client class names (and already-normalized names) to short platform ids.
SOURCE_NAME_MAP = {
    "NeteaseMusicClient": "netease",
    "QQMusicClient": "qq",
    "KuwoMusicClient": "kuwo",
    "netease": "netease",
    "qq": "qq",
    "kuwo": "kuwo",
}

# Separators between artist names, with any surrounding whitespace swallowed:
# "/", ASCII comma, full-width comma (U+FF0C), ideographic comma, "&" and "|".
# NOTE(review): the pattern previously listed the ASCII comma twice; the second
# occurrence is assumed to have been the full-width comma -- confirm.
ARTIST_SPLIT_RE = re.compile(r"\s*(?:/|,|\uff0c|、|&|\|)\s*")

# "<number><optional spaces><unit>" where unit is B, KB, MB, GB, TB or PB.
_SIZE_RE = re.compile(r"^([0-9]+(?:\.[0-9]+)?)\s*([KMGTP]?B)$")

# Binary (1024-based) multipliers for every unit _SIZE_RE can capture.
_SIZE_MULTIPLIERS = {
    "B": 1,
    "KB": 1024,
    "MB": 1024**2,
    "GB": 1024**3,
    "TB": 1024**4,
    "PB": 1024**5,
}


def remove_suffix(value: str, suffix: str) -> str:
    """Return *value* without a trailing *suffix*, or unchanged if absent.

    An empty *suffix* never strips anything.
    """
    if suffix and value.endswith(suffix):
        return value[: -len(suffix)]
    return value


def normalize_source_name(source: str | None) -> str:
    """Map a client/source name to its short lowercase platform id.

    Falsy input yields ``"unknown"``. Names found in ``SOURCE_NAME_MAP`` use
    the mapped value; any other name has a trailing ``"MusicClient"`` removed
    and is lowercased.
    """
    if not source:
        return "unknown"
    # Look up by text so an unexpected non-string source cannot break hashing.
    name = str(source)
    mapped = SOURCE_NAME_MAP.get(name)
    if mapped is not None:
        return mapped
    return remove_suffix(name, "MusicClient").lower()


def get_field(obj: Any, key: str, default: Any = None) -> Any:
    """Read *key* from a dict (by item) or from any other object (by attribute)."""
    if isinstance(obj, dict):
        return obj.get(key, default)
    return getattr(obj, key, default)


def serialize_song_info(song_info: Any) -> dict[str, Any]:
    """Convert a song-info value into a plain dict snapshot.

    Resolution order: ``None`` -> ``{}``; dict -> shallow copy; object with a
    callable ``todict`` -> its result; object with ``__dict__`` -> its
    attributes whose names do not start with ``_``; anything else -> ``{}``.
    """
    if song_info is None:
        return {}
    if isinstance(song_info, dict):
        return dict(song_info)
    todict = getattr(song_info, "todict", None)
    if callable(todict):
        return todict()
    if hasattr(song_info, "__dict__"):
        return {
            key: value
            for key, value in vars(song_info).items()
            if not key.startswith("_")
        }
    return {}


def deserialize_song_info(snapshot: dict[str, Any] | None) -> Any:
    """Rebuild a ``SongInfo`` from *snapshot*; return ``None`` if it is empty."""
    if not snapshot:
        return None
    # Imported at call time, so this module loads without musicdl installed.
    from musicdl.modules.utils.data import SongInfo

    return SongInfo.fromdict(snapshot)


def parse_size_to_bytes(file_size: Any) -> int | None:
    """Parse a file size into a whole number of bytes.

    Numbers are truncated to ``int``. Strings such as ``"3.5 MB"`` or
    ``"2MiB"`` are parsed case-insensitively with 1024-based units.

    Returns:
        The size in bytes, or ``None`` when the value is missing, is not a
        finite number, or is not in a recognized format.
    """
    if file_size is None:
        return None
    if isinstance(file_size, (int, float)):
        try:
            return int(file_size)
        except (ValueError, OverflowError):
            # NaN and +/-inf have no integer byte count.
            return None
    # Fold "KiB"/"MiB"/... into "KB"/"MB"/...; both are treated as 1024-based.
    text = str(file_size).strip().upper().replace("IB", "B")
    if text in ("", "NULL"):
        return None
    match = _SIZE_RE.match(text)
    if match is None:
        return None
    number, unit = match.groups()
    return int(float(number) * _SIZE_MULTIPLIERS[unit])


def dedupe_preserve_order(values: list[str]) -> list[str]:
    """Strip each value, drop blanks and duplicates, keep first-seen order."""
    seen: set[str] = set()
    result: list[str] = []
    for value in values:
        stripped = value.strip()
        if not stripped or stripped in seen:
            continue
        seen.add(stripped)
        result.append(stripped)
    return result


def extract_artist_names(
    raw_data: dict | None, singers_text: str | None = None
) -> list[str]:
    """Collect unique artist names from a raw payload and a singers string.

    Names are gathered, in order, from ``raw_data["search"]``:

    * list-valued keys ``ar`` / ``artists`` / ``singer`` (dict items that have
      a truthy ``name``), then
    * string-valued keys ``artist`` / ``ARTIST`` / ``author`` / ``singerName``
      / ``singers`` (split with ``ARTIST_SPLIT_RE``),

    followed by the pieces of *singers_text*. A missing or non-dict
    ``raw_data`` or ``raw_data["search"]`` is treated as empty.

    Returns:
        Stripped, de-duplicated names in first-seen order.
    """
    search_data = raw_data.get("search") if isinstance(raw_data, dict) else None
    if not isinstance(search_data, dict):
        # "search" may be absent or null; fall back to singers_text only.
        search_data = {}

    candidates: list[str] = []
    for key in ("ar", "artists", "singer"):
        value = search_data.get(key)
        if isinstance(value, list):
            candidates.extend(
                str(item["name"])
                for item in value
                if isinstance(item, dict) and item.get("name")
            )
    for key in ("artist", "ARTIST", "author", "singerName", "singers"):
        value = search_data.get(key)
        if isinstance(value, str):
            candidates.extend(ARTIST_SPLIT_RE.split(value))
    if singers_text:
        candidates.extend(ARTIST_SPLIT_RE.split(str(singers_text)))
    return dedupe_preserve_order(candidates)


@dataclass
class PlaylistCandidate:
    """A playlist reference identified by platform and remote id."""

    platform: str
    # NOTE(review): the meaning of pool_kind is not visible in this module.
    pool_kind: str
    remote_id: str
    name: str
    url: str
    parse_strategy: str = "playlist_url"
    cover_url: str | None = None
    creator_name: str | None = None
    play_count: int | None = None
    collected_song_count: int | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def playlist_key(self) -> str:
        """Return the ``"<platform>:<remote_id>"`` key."""
        return f"{self.platform}:{self.remote_id}"


@dataclass
class CatalogSong:
    """A song record identified by platform and remote song id."""

    platform: str
    remote_song_id: str
    name: str | None = None
    singers: str | None = None
    album: str | None = None
    ext: str | None = None
    file_size_bytes: int | None = None
    file_size_label: str | None = None
    quality_label: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def song_key(self) -> str:
        """Return the ``"<platform>:<remote_song_id>"`` key."""
        return f"{self.platform}:{self.remote_song_id}"

    @classmethod
    def from_song_info(cls, song_info: Any) -> CatalogSong:
        """Build a ``CatalogSong`` from a song-info dict or object.

        Fields are read with ``get_field``, so dicts and attribute-style
        objects are both accepted. ``file_size_bytes`` is taken from the input
        when present, otherwise parsed from ``file_size``. The raw payload and
        a serialized snapshot of the input are kept in ``metadata``.
        """
        raw_data = get_field(song_info, "raw_data", {}) or {}
        file_size_label = get_field(song_info, "file_size")
        file_size_bytes = get_field(song_info, "file_size_bytes")
        if file_size_bytes is None:
            file_size_bytes = parse_size_to_bytes(file_size_label)
        return cls(
            platform=normalize_source_name(get_field(song_info, "source")),
            # NOTE(review): a missing identifier becomes the string "None".
            remote_song_id=str(get_field(song_info, "identifier")),
            name=get_field(song_info, "song_name"),
            singers=get_field(song_info, "singers"),
            album=get_field(song_info, "album"),
            ext=get_field(song_info, "ext"),
            file_size_bytes=file_size_bytes,
            file_size_label=file_size_label,
            # get_field keeps this working when raw_data is not a plain dict.
            quality_label=get_field(raw_data, "quality"),
            metadata={
                "raw_data": raw_data,
                "snapshot": serialize_song_info(song_info),
            },
        )