"""Parsers and collector for NetEase (music.163.com) playlist sources."""

from __future__ import annotations

import math
import re
from decimal import Decimal

from bs4 import BeautifulSoup

from ..models import PlaylistCandidate
from .base import BaseCollector

PLAYLIST_SQUARE_URL = "https://music.163.com/discover/playlist"
TOPLIST_API_URL = "https://music.163.com/api/toplist/detail"

# Chinese magnitude suffixes that may follow an abbreviated count.
_COUNT_UNIT_MULTIPLIERS = {
    "万": 10_000,
    "亿": 100_000_000,
}

_WHITESPACE_RE = re.compile(r"\s+")
_ABBREVIATED_COUNT_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)([万亿])")
# Captures only the id value, stopping at the next query parameter or fragment.
_PLAYLIST_ID_RE = re.compile(r"/playlist\?id=([^&#]*)")


def _parse_play_count(value: object) -> int | None:
    """Convert a raw play-count value into an integer.

    Accepts real numbers, plain digit strings (whitespace and thousands
    commas are ignored), and abbreviated strings carrying a ``万`` or ``亿``
    suffix such as ``"1.5万"``.

    Returns:
        The count as an ``int``, or ``None`` when *value* is ``None``, empty,
        a ``bool``, a non-finite float, or text in an unrecognised format.
    """
    if value is None or value == "":
        return None
    # bool is a subclass of int; True/False are never meaningful counts.
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        # int() raises on NaN / infinity, so reject those explicitly.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return int(value)

    text = _WHITESPACE_RE.sub("", str(value)).replace(",", "")
    if not text:
        return None
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse;
    # e.g. "²" passes isdigit() but makes int() raise ValueError.
    if text.isdecimal():
        return int(text)

    match = _ABBREVIATED_COUNT_RE.search(text)
    if match is None:
        return None
    multiplier = _COUNT_UNIT_MULTIPLIERS[match.group(2)]
    # Decimal keeps the scaling exact; binary floats can land just below the
    # intended integer and then be truncated one too low by int().
    return int(Decimal(match.group(1)) * multiplier)


def parse_playlist_square_html(html: str) -> list[PlaylistCandidate]:
    """Extract playlist candidates from playlist-square HTML.

    Every ``a.msk`` anchor whose ``href`` contains ``/playlist?id=`` yields
    one candidate. Repeated playlist ids are emitted only once, in document
    order. The cover image and the ``.nb`` play-count node are looked up
    inside the anchor's parent element.
    """
    soup = BeautifulSoup(html, "lxml")
    items: list[PlaylistCandidate] = []
    seen: set[str] = set()

    for anchor in soup.select("a.msk[href*='/playlist?id=']"):
        id_match = _PLAYLIST_ID_RE.search(anchor.get("href", ""))
        remote_id = id_match.group(1).strip() if id_match else ""
        if not remote_id or remote_id in seen:
            continue
        seen.add(remote_id)

        cover_node = anchor.parent if anchor.parent is not None else anchor
        # Search only this tile's container: a document-wide backwards search
        # would pick up the previous tile's image when this one has none.
        cover_img = cover_node.find("img")
        play_count_node = cover_node.select_one(".nb")

        items.append(
            PlaylistCandidate(
                platform="netease",
                pool_kind="playlist_square",
                remote_id=remote_id,
                name=anchor.get("title") or remote_id,
                url=f"https://music.163.com/#/playlist?id={remote_id}",
                cover_url=cover_img.get("src") if cover_img is not None else None,
                play_count=_parse_play_count(
                    play_count_node.get_text(" ", strip=True)
                    if play_count_node is not None
                    else None
                ),
            )
        )
    return items


def parse_toplist_payload(payload: dict) -> list[PlaylistCandidate]:
    """Build playlist candidates from a toplist JSON payload.

    Reads the entries under ``payload["list"]``. Entries that are not dicts,
    or whose ``id`` is missing, null, or blank, are skipped.
    """
    items: list[PlaylistCandidate] = []
    for entry in payload.get("list", []) or []:
        if not isinstance(entry, dict):
            continue
        raw_id = entry.get("id")
        # An explicit null id must not be stringified into the id "None".
        if raw_id is None:
            continue
        remote_id = str(raw_id).strip()
        if not remote_id:
            continue

        items.append(
            PlaylistCandidate(
                platform="netease",
                pool_kind="toplist",
                remote_id=remote_id,
                name=entry.get("name") or remote_id,
                url=f"https://music.163.com/#/playlist?id={remote_id}",
                cover_url=entry.get("coverImgUrl"),
                parse_strategy="netease_toplist",
                # `or` means a missing *or zero* playCount falls back to
                # subscribedCount.
                play_count=_parse_play_count(
                    entry.get("playCount") or entry.get("subscribedCount")
                ),
                metadata={"update_frequency": entry.get("updateFrequency")},
            )
        )
    return items


class NeteaseCollector(BaseCollector):
    """Collector that fetches NetEase playlist sources and parses them."""

    def collect_playlist_square(
        self,
        category: str = "全部",
        order: str = "hot",
        page: int = 1,
        page_size: int = 35,
        offset: int | None = None,
    ) -> list[PlaylistCandidate]:
        """Fetch one playlist-square page and parse it into candidates.

        Args:
            category: Sent as the ``cat`` query parameter.
            order: Sent as the ``order`` query parameter.
            page: 1-based page number; used only when *offset* is ``None``.
            page_size: Items per page; used only to derive the offset.
            offset: Explicit item offset. When ``None`` it is computed as
                ``max(page - 1, 0) * max(page_size, 1)``.
        """
        # NOTE(review): page_size is never sent to the server (no `limit`
        # param), so it only affects the computed offset — confirm intended.
        if offset is None:
            offset = max(page - 1, 0) * max(page_size, 1)
        response = self.get(
            PLAYLIST_SQUARE_URL,
            params={"cat": category, "order": order, "offset": offset},
        )
        return parse_playlist_square_html(response.text)

    def collect_toplist(self) -> list[PlaylistCandidate]:
        """Fetch the toplist detail endpoint and parse its JSON body."""
        response = self.get(TOPLIST_API_URL)
        return parse_toplist_payload(response.json())