Files

261 lines
8.8 KiB
Python

from __future__ import annotations

import json
import re
import subprocess
from urllib.parse import urljoin

from bs4 import BeautifulSoup

from ..models import PlaylistCandidate
from .base import BaseCollector
# Pages requested by KuwoCollector.
PLAYLIST_SQUARE_URL = "https://www.kuwo.cn/playlist"
TOPLIST_URL = "https://www.kuwo.cn/rankList"

# Captures (group 1) the expression assigned to ``window.__NUXT__`` inside an
# inline <script> tag, up to the first closing </script>.
NUXT_SCRIPT_RE = re.compile(
    r"<script>\s*window\.__NUXT__=(.*?)</script>",
    flags=re.DOTALL,
)

# Splits an immediately-invoked ``(function(params){return body})(args)``
# expression into its ``params``, ``body`` and ``args`` named groups.
NUXT_FUNCTION_RE = re.compile(
    r"^\(function\((?P<params>.*?)\)\s*\{\s*return\s+(?P<body>.*)\}\)\((?P<args>.*)\)\s*;?\s*$",
    flags=re.DOTALL,
)
_COUNT_UNIT_MULTIPLIERS = {
"": 10_000,
"亿": 100_000_000,
}
def _parse_play_count(value: object) -> int | None:
if value in (None, ""):
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return int(value)
text = re.sub(r"\s+", "", str(value)).replace(",", "")
if not text:
return None
if text.isdigit():
return int(text)
match = re.search(r"([0-9]+(?:\.[0-9]+)?)([万亿])", text)
if not match:
return None
number_value = float(match.group(1))
multiplier = _COUNT_UNIT_MULTIPLIERS.get(match.group(2))
if multiplier is None:
return None
return int(number_value * multiplier)
def split_js_arguments(text: str) -> list[str]:
    """Split a JavaScript argument list on its top-level commas.

    Commas inside quoted strings or inside ``()``, ``[]`` or ``{}`` do not
    split. Each piece is stripped of surrounding whitespace and empty pieces
    are dropped. A falsy *text* yields an empty list.
    """
    pieces: list[str] = []
    buffer: list[str] = []
    open_quote = ""  # the quote character we are inside of, if any
    pending_escape = False  # previous char was a backslash inside a string
    nesting = 0

    for ch in str(text or ""):
        if pending_escape:
            # Escaped character: copy it verbatim, never interpret it.
            buffer.append(ch)
            pending_escape = False
        elif open_quote:
            buffer.append(ch)
            if ch == "\\":
                pending_escape = True
            elif ch == open_quote:
                open_quote = ""
        elif ch == "'" or ch == '"':
            buffer.append(ch)
            open_quote = ch
        elif ch in "([{":
            nesting += 1
            buffer.append(ch)
        elif ch in ")]}":
            # Clamp at zero so stray closers cannot disable splitting.
            nesting = max(nesting - 1, 0)
            buffer.append(ch)
        elif ch == "," and nesting == 0:
            piece = "".join(buffer).strip()
            if piece:
                pieces.append(piece)
            buffer = []
        else:
            buffer.append(ch)

    remainder = "".join(buffer).strip()
    if remainder:
        pieces.append(remainder)
    return pieces
def resolve_js_value(token: str, variables: dict[str, object] | None = None):
    """Turn a single JavaScript token into a Python value.

    Resolution order:
      1. empty token -> ``None``
      2. a name present in *variables* -> the mapped value
      3. ``true`` / ``false`` / ``null`` -> ``True`` / ``False`` / ``None``
      4. a quoted string -> its decoded text
      5. a number -> ``float`` when it contains ``"."``, otherwise ``int``
      6. anything else -> the stripped token itself

    Args:
        token: Raw token text; surrounding whitespace is ignored.
        variables: Optional mapping of identifier names to resolved values.
    """
    token = str(token or "").strip()
    variables = variables or {}
    if not token:
        return None
    if token in variables:
        return variables[token]
    if token in {"true", "false", "null"}:
        return {"true": True, "false": False, "null": None}[token]
    if token.startswith(("'", '"')) and token.endswith(("'", '"')):
        normalized = token
        if token.startswith("'") and token.endswith("'"):
            # Re-quote a single-quoted literal so the JSON decoder accepts it.
            normalized = '"' + token[1:-1].replace("\\", "\\\\").replace('"', '\\"') + '"'
        try:
            return json.loads(normalized)
        except json.JSONDecodeError:
            # Valid JS is not always valid JSON (e.g. "\x41", mismatched or
            # lone quotes); keep the raw text between the quotes instead of
            # letting one odd token abort the whole parse.
            return token[1:-1]
    try:
        if "." in token:
            return float(token)
        return int(token)
    except ValueError:
        return token
def extract_kuwo_bang_menu_items(script_body: str) -> list[dict]:
    """Extract toplist entries from a ``window.__NUXT__`` function expression.

    *script_body* is matched against ``NUXT_FUNCTION_RE``; the function's
    parameters are bound to its call arguments, and every object literal in
    the body that matches the item pattern is resolved against those
    bindings.

    Returns:
        One dict per entry with the keys ``sourceid``, ``name``, ``id``,
        ``source``, ``pic``, ``pub`` and ``play_count``. Entries whose ``id``
        resolves to a falsy value are skipped. An empty list is returned when
        the expression does not match or its body never mentions
        ``bangMenu``.
    """
    parsed = NUXT_FUNCTION_RE.match(str(script_body or "").strip())
    if parsed is None:
        return []

    raw_params = str(parsed.group("params") or "").split(",")
    param_names = [name.strip() for name in raw_params if name.strip()]
    arg_values = [
        resolve_js_value(raw_arg)
        for raw_arg in split_js_arguments(parsed.group("args") or "")
    ]
    # Positional binding: the n-th parameter takes the n-th argument.
    bindings = dict(zip(param_names, arg_values))

    function_body = str(parsed.group("body") or "")
    if "bangMenu" not in function_body:
        return []

    entry_re = re.compile(
        r"\{sourceid:(?P<sourceid>[^,]+),.*?name:(?P<name>[^,]+),\s*id:(?P<id>[^,]+),\s*source:(?P<source>[^,]+),\s*pic:(?P<pic>[^,]+),\s*pub:(?P<pub>[^,}\]]+)(?:,\s*(?:listencnt|playCount|listenCount):(?P<play_count>[^,}\]]+))?",
        re.DOTALL,
    )
    field_names = ("sourceid", "name", "id", "source", "pic", "pub", "play_count")

    entries: list[dict] = []
    for found in entry_re.finditer(function_body):
        entry = {}
        for field in field_names:
            # An unmatched optional group is None, which resolves to None.
            entry[field] = resolve_js_value(found.group(field), bindings)
        if entry.get("id"):
            entries.append(entry)
    return entries
def extract_nuxt_state(html: str) -> dict | None:
    """Evaluate the page's ``window.__NUXT__`` assignment and return its state.

    The expression captured by ``NUXT_SCRIPT_RE`` is run by a ``node``
    subprocess, which prints the resulting object as JSON.

    Returns:
        The decoded state when it is a JSON object; ``None`` when the page
        has no matching script, Node is missing, fails or times out, or its
        output is empty, not valid JSON, or not an object.
    """
    match = NUXT_SCRIPT_RE.search(html or "")
    if not match:
        return None
    script_body = match.group(1)
    # SECURITY: script_body is taken verbatim from the fetched page and is
    # executed by Node with this process's privileges. Only use this on
    # sources you trust, or run it inside a sandbox.
    node_script = (
        "const window = {}; "
        f"window.__NUXT__={script_body}; "
        "process.stdout.write(JSON.stringify(window.__NUXT__));"
    )
    try:
        # Feed the script through stdin ("-") rather than "-e": a single argv
        # entry is size-limited by the OS, and a large payload would
        # otherwise fail to launch and silently yield None.
        completed = subprocess.run(
            ["node", "-"],
            input=node_script.encode("utf-8"),
            check=True,
            capture_output=True,
            timeout=10,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # Best effort: a missing binary, non-zero exit, timeout or
        # unencodable script all mean "no state available".
        return None
    output = completed.stdout.decode("utf-8", errors="ignore").strip()
    if not output:
        return None
    try:
        state = json.loads(output)
    except json.JSONDecodeError:
        return None
    # Callers use dict methods on the result; reject any other JSON type.
    return state if isinstance(state, dict) else None
def parse_playlist_square_html(html: str) -> list[PlaylistCandidate]:
    """Collect playlist candidates from the playlist-square page markup.

    Every ``<a>`` whose ``href`` contains ``playlist_detail/`` becomes one
    candidate. The remote id is the last path segment of the href; anchors
    with an empty or already-seen id are skipped.

    Returns:
        Candidates in document order, with ``pool_kind="playlist_square"``.
    """
    soup = BeautifulSoup(html, "lxml")
    items: list[PlaylistCandidate] = []
    seen: set[str] = set()
    for anchor in soup.select("a[href*='playlist_detail/']"):
        href = anchor.get("href", "").strip()
        remote_id = href.rstrip("/").split("/")[-1]
        if not remote_id or remote_id in seen:
            continue
        seen.add(remote_id)
        # urljoin leaves absolute URLs alone and resolves root-relative,
        # protocol-relative ("//host/...") and slash-less relative hrefs,
        # which plain string concatenation turned into malformed URLs.
        absolute_url = urljoin("https://www.kuwo.cn/", href)
        # Fallback chain for the display name: title attribute, then the
        # anchor's text, then the id itself.
        name = anchor.get("title") or anchor.get_text(strip=True) or remote_id
        cover = (anchor.find("img") or {}).get("src")
        play_count_node = anchor.select_one(".num")
        play_count_text = (
            play_count_node.get_text(" ", strip=True) if play_count_node else None
        )
        items.append(
            PlaylistCandidate(
                platform="kuwo",
                pool_kind="playlist_square",
                remote_id=remote_id,
                name=name,
                url=absolute_url,
                cover_url=cover,
                play_count=_parse_play_count(play_count_text),
            )
        )
    return items
def _extract_toplist_play_count(entry: dict) -> int | None:
    """Return the first parseable play count found on a toplist entry.

    The keys ``listencnt``, ``play_count``, ``playCount`` and ``listenCount``
    are tried in that order; ``None`` is returned when none of them parses.
    """
    count_keys = ("listencnt", "play_count", "playCount", "listenCount")
    # Lazy generator: parsing stops at the first key that yields a count.
    parsed_counts = (_parse_play_count(entry.get(key)) for key in count_keys)
    return next((count for count in parsed_counts if count is not None), None)
def _build_toplist_candidate(entry: dict) -> PlaylistCandidate | None:
    """Build one toplist candidate from an entry dict, or None if it has no id."""
    remote_id = str(entry.get("id", "")).strip()
    if not remote_id:
        return None
    return PlaylistCandidate(
        platform="kuwo",
        pool_kind="toplist",
        remote_id=remote_id,
        name=entry.get("name") or remote_id,
        url=f"https://www.kuwo.cn/rankList?bangId={remote_id}",
        cover_url=entry.get("pic"),
        parse_strategy="kuwo_toplist",
        play_count=_extract_toplist_play_count(entry),
        metadata={"sourceid": str(entry.get("sourceid", "")), "pub": entry.get("pub")},
    )


def _iter_state_toplist_entries(state: dict):
    """Yield every entry dict under ``data[*].bangMenu[*].list`` of the state."""
    for group in state.get("data", []) or []:
        if not isinstance(group, dict):
            continue
        for menu in group.get("bangMenu", []) or []:
            if not isinstance(menu, dict):
                continue
            for entry in menu.get("list", []) or []:
                if isinstance(entry, dict):
                    yield entry


def parse_toplist_html(html: str) -> list[PlaylistCandidate]:
    """Collect toplist candidates from the rank-list page markup.

    The evaluated Nuxt state is used when ``extract_nuxt_state`` provides a
    non-empty dict. Otherwise the raw ``window.__NUXT__`` script text is
    parsed by ``extract_kuwo_bang_menu_items``. Entries without an id are
    skipped.

    Returns:
        Candidates with ``pool_kind="toplist"``, in source order.
    """
    state = extract_nuxt_state(html)
    if isinstance(state, dict) and state:
        entries = _iter_state_toplist_entries(state)
    else:
        # Fallback path: no usable state, so parse the script text directly.
        script_match = NUXT_SCRIPT_RE.search(html)
        entries = extract_kuwo_bang_menu_items(
            script_match.group(1) if script_match else ""
        )

    items: list[PlaylistCandidate] = []
    for entry in entries:
        candidate = _build_toplist_candidate(entry)
        if candidate is not None:
            items.append(candidate)
    return items
class KuwoCollector(BaseCollector):
    """Collector that fetches Kuwo pages and parses them into candidates."""

    def collect_playlist_square(self, page: int = 1, page_size: int = 30) -> list[PlaylistCandidate]:
        """Fetch one playlist-square page and parse its playlist links.

        *page* and *page_size* are clamped to a minimum of 1 and sent as the
        ``pn`` and ``rn`` query parameters.
        """
        query = {
            "pn": str(max(page, 1)),
            "rn": str(max(page_size, 1)),
        }
        page_html = self.get(PLAYLIST_SQUARE_URL, params=query).text
        return parse_playlist_square_html(page_html)

    def collect_toplist(self) -> list[PlaylistCandidate]:
        """Fetch the rank-list page and parse its toplist entries."""
        page_html = self.get(TOPLIST_URL).text
        return parse_toplist_html(page_html)