173 lines
5.1 KiB
Python
173 lines
5.1 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
from typing import Any
|
||
|
||
|
||
SOURCE_NAME_MAP = {
|
||
"NeteaseMusicClient": "netease",
|
||
"QQMusicClient": "qq",
|
||
"KuwoMusicClient": "kuwo",
|
||
"netease": "netease",
|
||
"qq": "qq",
|
||
"kuwo": "kuwo",
|
||
}
|
||
|
||
ARTIST_SPLIT_RE = re.compile(r"\s*(?:/|,|,|、|&|\|)\s*")
|
||
|
||
|
||
def remove_suffix(value: str, suffix: str) -> str:
|
||
if suffix and value.endswith(suffix):
|
||
return value[: -len(suffix)]
|
||
return value
|
||
|
||
|
||
def normalize_source_name(source: str | None) -> str:
|
||
if not source:
|
||
return "unknown"
|
||
return SOURCE_NAME_MAP.get(source, remove_suffix(str(source), "MusicClient").lower())
|
||
|
||
|
||
def get_field(obj: Any, key: str, default: Any = None) -> Any:
|
||
if isinstance(obj, dict):
|
||
return obj.get(key, default)
|
||
return getattr(obj, key, default)
|
||
|
||
|
||
def serialize_song_info(song_info: Any) -> dict[str, Any]:
|
||
if song_info is None:
|
||
return {}
|
||
if isinstance(song_info, dict):
|
||
return dict(song_info)
|
||
if hasattr(song_info, "todict") and callable(song_info.todict):
|
||
return song_info.todict()
|
||
if hasattr(song_info, "__dict__"):
|
||
return {
|
||
key: value
|
||
for key, value in vars(song_info).items()
|
||
if not key.startswith("_")
|
||
}
|
||
return {}
|
||
|
||
|
||
def deserialize_song_info(snapshot: dict[str, Any] | None):
|
||
if not snapshot:
|
||
return None
|
||
from musicdl.modules.utils.data import SongInfo
|
||
|
||
return SongInfo.fromdict(snapshot)
|
||
|
||
|
||
def parse_size_to_bytes(file_size: Any) -> int | None:
|
||
if file_size in {None, "", "NULL"}:
|
||
return None
|
||
if isinstance(file_size, (int, float)):
|
||
return int(file_size)
|
||
text = str(file_size).strip().upper().replace("IB", "B")
|
||
match = re.match(r"^([0-9]+(?:\.[0-9]+)?)\s*([KMGTP]?B)$", text)
|
||
if not match:
|
||
return None
|
||
value = float(match.group(1))
|
||
unit = match.group(2)
|
||
multiplier = {
|
||
"B": 1,
|
||
"KB": 1024,
|
||
"MB": 1024**2,
|
||
"GB": 1024**3,
|
||
"TB": 1024**4,
|
||
"PB": 1024**5,
|
||
}[unit]
|
||
return int(value * multiplier)
|
||
|
||
|
||
def dedupe_preserve_order(values: list[str]) -> list[str]:
|
||
seen: set[str] = set()
|
||
result: list[str] = []
|
||
for value in values:
|
||
stripped = value.strip()
|
||
if not stripped or stripped in seen:
|
||
continue
|
||
seen.add(stripped)
|
||
result.append(stripped)
|
||
return result
|
||
|
||
|
||
def extract_artist_names(raw_data: dict | None, singers_text: str | None = None) -> list[str]:
|
||
raw_data = raw_data or {}
|
||
search_data = raw_data.get("search") if isinstance(raw_data, dict) else {}
|
||
candidates: list[str] = []
|
||
|
||
for key in ("ar", "artists", "singer"):
|
||
value = search_data.get(key)
|
||
if isinstance(value, list):
|
||
for item in value:
|
||
if isinstance(item, dict) and item.get("name"):
|
||
candidates.append(str(item["name"]))
|
||
|
||
for key in ("artist", "ARTIST", "author", "singerName", "singers"):
|
||
value = search_data.get(key)
|
||
if isinstance(value, str):
|
||
candidates.extend(ARTIST_SPLIT_RE.split(value))
|
||
|
||
if singers_text:
|
||
candidates.extend(ARTIST_SPLIT_RE.split(str(singers_text)))
|
||
|
||
return dedupe_preserve_order(candidates)
|
||
|
||
|
||
@dataclass
|
||
class PlaylistCandidate:
|
||
platform: str
|
||
pool_kind: str
|
||
remote_id: str
|
||
name: str
|
||
url: str
|
||
parse_strategy: str = "playlist_url"
|
||
cover_url: str | None = None
|
||
creator_name: str | None = None
|
||
play_count: int | None = None
|
||
collected_song_count: int | None = None
|
||
metadata: dict[str, Any] = field(default_factory=dict)
|
||
|
||
@property
|
||
def playlist_key(self) -> str:
|
||
return f"{self.platform}:{self.remote_id}"
|
||
|
||
|
||
@dataclass
|
||
class CatalogSong:
|
||
platform: str
|
||
remote_song_id: str
|
||
name: str | None = None
|
||
singers: str | None = None
|
||
album: str | None = None
|
||
ext: str | None = None
|
||
file_size_bytes: int | None = None
|
||
file_size_label: str | None = None
|
||
quality_label: str | None = None
|
||
metadata: dict[str, Any] = field(default_factory=dict)
|
||
|
||
@property
|
||
def song_key(self) -> str:
|
||
return f"{self.platform}:{self.remote_song_id}"
|
||
|
||
@classmethod
|
||
def from_song_info(cls, song_info: Any) -> "CatalogSong":
|
||
raw_data = get_field(song_info, "raw_data", {}) or {}
|
||
file_size_bytes = get_field(song_info, "file_size_bytes")
|
||
if file_size_bytes is None:
|
||
file_size_bytes = parse_size_to_bytes(get_field(song_info, "file_size"))
|
||
return cls(
|
||
platform=normalize_source_name(get_field(song_info, "source")),
|
||
remote_song_id=str(get_field(song_info, "identifier")),
|
||
name=get_field(song_info, "song_name"),
|
||
singers=get_field(song_info, "singers"),
|
||
album=get_field(song_info, "album"),
|
||
ext=get_field(song_info, "ext"),
|
||
file_size_bytes=file_size_bytes,
|
||
file_size_label=get_field(song_info, "file_size"),
|
||
quality_label=raw_data.get("quality"),
|
||
metadata={"raw_data": raw_data, "snapshot": serialize_song_info(song_info)},
|
||
)
|