"""Collectors for the Kuwo (kuwo.cn) playlist-square and toplist pages.

The playlist square is parsed straight from the HTML anchors.  The toplist
page embeds its data in a ``window.__NUXT__`` script; that payload is first
evaluated with Node.js and, when that fails, scraped with regular
expressions as a best-effort fallback.
"""

from __future__ import annotations

import json
import math
import re
import subprocess
from collections.abc import Iterator
from decimal import Decimal
from urllib.parse import urljoin, urlsplit

from bs4 import BeautifulSoup

from ..models import PlaylistCandidate
from .base import BaseCollector

PLAYLIST_SQUARE_URL = "https://www.kuwo.cn/playlist"
TOPLIST_URL = "https://www.kuwo.cn/rankList"

# NOTE(review): the original pattern text was lost; this is reconstructed from
# how the match is used (group 1 is the expression assigned to
# ``window.__NUXT__``).  Confirm against the upstream source.
NUXT_SCRIPT_RE = re.compile(
    r"<script[^>]*>\s*window\.__NUXT__\s*=\s*(.*?)</script>",
    re.DOTALL,
)
# Matches the Nuxt payload shape ``(function(a,b,...){return {...}})(x,y,...)``.
NUXT_FUNCTION_RE = re.compile(
    r"^\(function\((?P<params>.*?)\)\s*\{\s*return\s+(?P<body>.*)\}\)\((?P<args>.*)\)\s*;?\s*$",
    re.DOTALL,
)

# One ``bangMenu`` list entry inside the (unevaluated) Nuxt function body.
_BANG_MENU_ITEM_RE = re.compile(
    r"\{sourceid:(?P<sourceid>[^,]+),.*?name:(?P<name>[^,]+),\s*id:(?P<id>[^,]+),"
    r"\s*source:(?P<source>[^,]+),\s*pic:(?P<pic>[^,]+),\s*pub:(?P<pub>[^,}\]]+)"
    r"(?:,\s*(?:listencnt|playCount|listenCount):(?P<play_count>[^,}\]]+))?",
    re.DOTALL,
)
_BANG_MENU_ITEM_KEYS = ("sourceid", "name", "id", "source", "pic", "pub", "play_count")

_COUNT_UNIT_MULTIPLIERS = {
    "万": 10_000,
    "亿": 100_000_000,
}
_COUNT_WITH_UNIT_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)([万亿])")

# JavaScript literals that map directly onto Python values.
_JS_LITERALS = {
    "true": True,
    "false": False,
    "null": None,
    "undefined": None,
    "void 0": None,
}


def _parse_play_count(value: object) -> int | None:
    """Convert a raw play-count value into an integer.

    Accepts ints, finite floats, plain digit strings (commas and whitespace
    are ignored) and strings carrying a ``万`` / ``亿`` unit such as
    ``"3.2万"``.  Returns ``None`` for anything that cannot be interpreted,
    including ``None``, empty strings and booleans.
    """
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, int):
        return int(value)
    if isinstance(value, float):
        # int() raises on NaN / infinity; treat those as "no count".
        return int(value) if math.isfinite(value) else None

    text = re.sub(r"\s+", "", str(value)).replace(",", "")
    if not text:
        return None
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    if text.isdecimal():
        return int(text)

    match = _COUNT_WITH_UNIT_RE.search(text)
    if not match:
        return None
    multiplier = _COUNT_UNIT_MULTIPLIERS.get(match.group(2))
    if multiplier is None:
        return None
    # Decimal keeps the product exact, so truncation never loses a unit to
    # binary floating-point rounding.
    return int(Decimal(match.group(1)) * multiplier)


def split_js_arguments(text: str) -> list[str]:
    """Split a JavaScript argument list on its top-level commas.

    Commas inside single/double-quoted strings or inside ``()``, ``[]`` and
    ``{}`` are not treated as separators.  Items are stripped and empty items
    are dropped.
    """
    items: list[str] = []
    current: list[str] = []
    quote_char = ""
    escape = False
    depth = 0

    for char in str(text or ""):
        if escape:
            # Character following a backslash inside a string: keep verbatim.
            current.append(char)
            escape = False
            continue
        if quote_char:
            current.append(char)
            if char == "\\":
                escape = True
            elif char == quote_char:
                quote_char = ""
            continue
        if char in {"'", '"'}:
            current.append(char)
            quote_char = char
            continue
        if char in "([{":
            depth += 1
            current.append(char)
            continue
        if char in ")]}":
            # Clamp at zero so unbalanced closers cannot hide later commas.
            depth = max(depth - 1, 0)
            current.append(char)
            continue
        if char == "," and depth == 0:
            item = "".join(current).strip()
            if item:
                items.append(item)
            current = []
            continue
        current.append(char)

    tail = "".join(current).strip()
    if tail:
        items.append(tail)
    return items


def resolve_js_value(token: str, variables: dict[str, object] | None = None):
    """Resolve a simple JavaScript token to a Python value.

    Resolution order: a name found in ``variables``; the literals ``true``,
    ``false``, ``null``, ``undefined`` and ``void 0``; a quoted string; a
    number.  Anything else is returned unchanged as a string.  An empty token
    yields ``None``.
    """
    token = str(token or "").strip()
    variables = variables or {}
    if not token:
        return None
    if token in variables:
        return variables[token]
    if token in _JS_LITERALS:
        return _JS_LITERALS[token]

    if len(token) >= 2 and token[0] in "'\"" and token[-1] == token[0]:
        inner = token[1:-1]
        if token[0] == "'":
            # Re-quote as a JSON string; backslashes are kept literally.
            normalized = '"' + inner.replace("\\", "\\\\").replace('"', '\\"') + '"'
        else:
            normalized = token
        try:
            return json.loads(normalized)
        except ValueError:
            # JS allows escapes JSON does not (e.g. \x41); fall back to the
            # raw text rather than aborting the whole parse.
            return inner

    try:
        if "." in token:
            return float(token)
        return int(token)
    except ValueError:
        return token


def extract_kuwo_bang_menu_items(script_body: str) -> list[dict]:
    """Scrape toplist entries from an unevaluated Nuxt payload.

    ``script_body`` is expected to look like
    ``(function(a,b){return {...}})(x,y)``.  The call arguments are bound to
    the parameter names so that minified references inside the body can be
    resolved.  Returns a list of dicts with the keys ``sourceid``, ``name``,
    ``id``, ``source``, ``pic``, ``pub`` and ``play_count``; entries whose
    ``id`` resolves to a falsy value are skipped.  Returns ``[]`` when the
    payload does not have the expected shape or contains no ``bangMenu``.
    """
    match = NUXT_FUNCTION_RE.match(str(script_body or "").strip())
    if not match:
        return []

    params = [
        part.strip()
        for part in str(match.group("params") or "").split(",")
        if part.strip()
    ]
    args = [resolve_js_value(part) for part in split_js_arguments(match.group("args") or "")]
    variables = dict(zip(params, args))

    body = str(match.group("body") or "")
    if "bangMenu" not in body:
        return []

    items: list[dict] = []
    for item_match in _BANG_MENU_ITEM_RE.finditer(body):
        resolved = {
            key: resolve_js_value(item_match.group(key), variables)
            for key in _BANG_MENU_ITEM_KEYS
        }
        if resolved.get("id"):
            items.append(resolved)
    return items


def extract_nuxt_state(html: str) -> dict | None:
    """Evaluate the page's ``window.__NUXT__`` payload with Node.js.

    Returns the decoded state as a dict, or ``None`` when the script is not
    found, ``node`` is unavailable / fails / times out (10 s), or the output
    is not a JSON object.
    """
    match = NUXT_SCRIPT_RE.search(str(html or ""))
    if not match:
        return None

    script_body = match.group(1)
    # SECURITY(review): this runs JavaScript taken from a remote page with the
    # privileges of the current process.  Only use against trusted hosts, or
    # sandbox the node invocation.
    node_script = (
        "const window = {}; "
        f"window.__NUXT__={script_body}; "
        "process.stdout.write(JSON.stringify(window.__NUXT__));"
    )
    try:
        completed = subprocess.run(
            ["node", "-e", node_script],
            check=True,
            capture_output=True,
            timeout=10,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # Missing binary, oversized/invalid argument, non-zero exit, timeout:
        # all mean "no state"; callers fall back to regex scraping.
        return None

    output = completed.stdout.decode("utf-8", errors="ignore").strip()
    if not output:
        return None
    try:
        state = json.loads(output)
    except json.JSONDecodeError:
        return None
    # Callers use dict access; anything else is treated as "no state".
    return state if isinstance(state, dict) else None


def parse_playlist_square_html(html: str) -> list[PlaylistCandidate]:
    """Extract playlist candidates from the playlist-square HTML.

    Every ``<a>`` whose ``href`` contains ``playlist_detail/`` yields one
    candidate; the last path segment of the link is used as the remote id and
    repeated ids are skipped.
    """
    soup = BeautifulSoup(html, "lxml")
    items: list[PlaylistCandidate] = []
    seen: set[str] = set()

    for anchor in soup.select("a[href*='playlist_detail/']"):
        href = str(anchor.get("href") or "").strip()
        # Use only the path so a query string or fragment never leaks into
        # the id.
        remote_id = urlsplit(href).path.rstrip("/").split("/")[-1]
        if not remote_id or remote_id in seen:
            continue
        seen.add(remote_id)

        image = anchor.find("img")
        play_count_node = anchor.select_one(".num")
        items.append(
            PlaylistCandidate(
                platform="kuwo",
                pool_kind="playlist_square",
                remote_id=remote_id,
                name=anchor.get("title") or anchor.get_text(strip=True) or remote_id,
                # urljoin handles absolute, root-relative, relative and
                # protocol-relative links alike.
                url=urljoin(PLAYLIST_SQUARE_URL, href),
                cover_url=image.get("src") if image is not None else None,
                play_count=_parse_play_count(
                    play_count_node.get_text(" ", strip=True) if play_count_node else None
                ),
            )
        )
    return items


def _extract_toplist_play_count(entry: dict) -> int | None:
    """Return the first parseable play count among the known entry keys."""
    for key in ("listencnt", "play_count", "playCount", "listenCount"):
        parsed = _parse_play_count(entry.get(key))
        if parsed is not None:
            return parsed
    return None


def _build_toplist_candidate(entry: dict) -> PlaylistCandidate | None:
    """Build a toplist candidate from one entry, or ``None`` if it has no id."""
    raw_id = entry.get("id")
    # Guard against JSON null: str(None) would otherwise become the id "None".
    remote_id = "" if raw_id is None else str(raw_id).strip()
    if not remote_id:
        return None

    raw_sourceid = entry.get("sourceid")
    return PlaylistCandidate(
        platform="kuwo",
        pool_kind="toplist",
        remote_id=remote_id,
        name=entry.get("name") or remote_id,
        url=f"{TOPLIST_URL}?bangId={remote_id}",
        cover_url=entry.get("pic"),
        parse_strategy="kuwo_toplist",
        play_count=_extract_toplist_play_count(entry),
        metadata={
            "sourceid": "" if raw_sourceid is None else str(raw_sourceid),
            "pub": entry.get("pub"),
        },
    )


def _iter_state_toplist_entries(state: dict) -> Iterator[dict]:
    """Yield each ``data[*].bangMenu[*].list[*]`` dict found in the state."""
    for group in state.get("data") or []:
        if not isinstance(group, dict):
            continue
        for menu in group.get("bangMenu") or []:
            if not isinstance(menu, dict):
                continue
            for entry in menu.get("list") or []:
                if isinstance(entry, dict):
                    yield entry


def parse_toplist_html(html: str) -> list[PlaylistCandidate]:
    """Extract toplist candidates from the rank-list HTML.

    Uses the Node-evaluated Nuxt state when available; otherwise falls back to
    regex scraping of the raw script.  Entries without an id are skipped.
    """
    state = extract_nuxt_state(html)
    if state:
        entries = _iter_state_toplist_entries(state)
    else:
        script_match = NUXT_SCRIPT_RE.search(str(html or ""))
        entries = iter(
            extract_kuwo_bang_menu_items(script_match.group(1) if script_match else "")
        )

    candidates = (_build_toplist_candidate(entry) for entry in entries)
    return [candidate for candidate in candidates if candidate is not None]


class KuwoCollector(BaseCollector):
    """Fetches Kuwo pages via ``self.get`` and parses them into candidates."""

    def collect_playlist_square(self, page: int = 1, page_size: int = 30) -> list[PlaylistCandidate]:
        """Fetch one playlist-square page; ``page`` and ``page_size`` are clamped to >= 1."""
        response = self.get(
            PLAYLIST_SQUARE_URL,
            params={"pn": str(max(page, 1)), "rn": str(max(page_size, 1))},
        )
        return parse_playlist_square_html(response.text)

    def collect_toplist(self) -> list[PlaylistCandidate]:
        """Fetch the rank-list page and return its toplist candidates."""
        response = self.get(TOPLIST_URL)
        return parse_toplist_html(response.text)