114 lines
3.7 KiB
Python
114 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
from ..models import PlaylistCandidate
|
|
from .base import BaseCollector
|
|
|
|
|
|
# Page requested by NeteaseCollector.collect_playlist_square (with cat/order/
# offset query parameters); its HTML is fed to parse_playlist_square_html.
PLAYLIST_SQUARE_URL = "https://music.163.com/discover/playlist"
|
|
# Endpoint requested by NeteaseCollector.collect_toplist; its JSON body is fed
# to parse_toplist_payload.
TOPLIST_API_URL = "https://music.163.com/api/toplist/detail"
|
|
|
|
_COUNT_UNIT_MULTIPLIERS = {
|
|
"万": 10_000,
|
|
"亿": 100_000_000,
|
|
}
|
|
|
|
|
|
def _parse_play_count(value: object) -> int | None:
|
|
if value in (None, ""):
|
|
return None
|
|
if isinstance(value, bool):
|
|
return None
|
|
if isinstance(value, (int, float)):
|
|
return int(value)
|
|
text = re.sub(r"\s+", "", str(value)).replace(",", "")
|
|
if not text:
|
|
return None
|
|
if text.isdigit():
|
|
return int(text)
|
|
match = re.search(r"([0-9]+(?:\.[0-9]+)?)([万亿])", text)
|
|
if not match:
|
|
return None
|
|
number_value = float(match.group(1))
|
|
multiplier = _COUNT_UNIT_MULTIPLIERS.get(match.group(2))
|
|
if multiplier is None:
|
|
return None
|
|
return int(number_value * multiplier)
|
|
|
|
|
|
def parse_playlist_square_html(html: str) -> list[PlaylistCandidate]:
    """Extract playlist candidates from playlist-square HTML.

    Every ``a.msk`` anchor whose ``href`` contains ``/playlist?id=`` yields
    one candidate. The anchor's parent element is treated as the playlist's
    cover container: the play count is read from its ``.nb`` descendant and
    the cover image from its first ``img`` descendant.

    Args:
        html: Markup of the playlist-square page.

    Returns:
        Candidates in document order, at most one per playlist id.
    """
    soup = BeautifulSoup(html, "lxml")
    items: list[PlaylistCandidate] = []
    seen: set[str] = set()
    for anchor in soup.select("a.msk[href*='/playlist?id=']"):
        href = anchor.get("href", "")
        # Capture only the value of the ``id`` parameter. Splitting on "id="
        # would keep trailing parameters ("123&from=x") or pick up a later
        # parameter whose name merely ends in "id" ("...&uid=5").
        id_match = re.search(r"[?&]id=([^&#]*)", href)
        remote_id = id_match.group(1).strip() if id_match else ""
        if not remote_id or remote_id in seen:
            continue
        seen.add(remote_id)

        cover_node = anchor.parent if anchor.parent else anchor
        play_count_node = cover_node.select_one(".nb")
        # Look for the image inside the same container as the play count, so
        # a card without an image cannot inherit one from earlier markup.
        cover_image = cover_node.find("img")
        raw_play_count = (
            play_count_node.get_text(" ", strip=True) if play_count_node else None
        )
        items.append(
            PlaylistCandidate(
                platform="netease",
                pool_kind="playlist_square",
                remote_id=remote_id,
                name=anchor.get("title") or remote_id,
                url=f"https://music.163.com/#/playlist?id={remote_id}",
                cover_url=cover_image.get("src") if cover_image else None,
                play_count=_parse_play_count(raw_play_count),
            )
        )
    return items
|
|
|
|
|
|
def parse_toplist_payload(payload: dict) -> list[PlaylistCandidate]:
    """Build playlist candidates from a toplist JSON payload.

    Args:
        payload: Decoded JSON object; the chart entries are read from its
            ``"list"`` key, which may be missing or ``None``.

    Returns:
        One candidate per entry that is a mapping with a non-blank ``id``.
        Entries that are not dicts, or whose id is missing, ``None`` or
        blank, are skipped.
    """
    items: list[PlaylistCandidate] = []
    for entry in payload.get("list") or []:
        if not isinstance(entry, dict):
            continue
        raw_id = entry.get("id")
        # A JSON null id must not be stringified into the literal "None".
        remote_id = "" if raw_id is None else str(raw_id).strip()
        if not remote_id:
            continue
        items.append(
            PlaylistCandidate(
                platform="netease",
                pool_kind="toplist",
                remote_id=remote_id,
                name=entry.get("name") or remote_id,
                url=f"https://music.163.com/#/playlist?id={remote_id}",
                cover_url=entry.get("coverImgUrl"),
                parse_strategy="netease_toplist",
                # ``or`` makes a missing, null or zero playCount fall back to
                # subscribedCount.
                play_count=_parse_play_count(
                    entry.get("playCount") or entry.get("subscribedCount")
                ),
                metadata={"update_frequency": entry.get("updateFrequency")},
            )
        )
    return items
|
|
|
|
|
|
class NeteaseCollector(BaseCollector):
    """Collector that fetches playlist candidates from music.163.com."""

    def collect_playlist_square(
        self,
        category: str = "全部",
        order: str = "hot",
        page: int = 1,
        page_size: int = 35,
        offset: int | None = None,
    ) -> list[PlaylistCandidate]:
        """Fetch one playlist-square page and parse it into candidates.

        Args:
            category: Sent as the ``cat`` query parameter.
            order: Sent as the ``order`` query parameter.
            page: 1-based page number; only used when ``offset`` is ``None``.
            page_size: Items per page; only used to derive the offset and not
                sent with the request.
            offset: Explicit item offset. When ``None`` it is computed as
                ``(page - 1) * page_size``, with the page index clamped to at
                least 0 and the page size to at least 1.

        Returns:
            The candidates parsed from the response text.
        """
        if offset is None:
            page_index = max(page - 1, 0)
            per_page = max(page_size, 1)
            offset = page_index * per_page
        query = {"cat": category, "order": order, "offset": offset}
        # self.get is not defined in this class; presumably inherited from
        # BaseCollector.
        page_response = self.get(PLAYLIST_SQUARE_URL, params=query)
        return parse_playlist_square_html(page_response.text)

    def collect_toplist(self) -> list[PlaylistCandidate]:
        """Fetch the toplist endpoint and parse its JSON into candidates."""
        payload = self.get(TOPLIST_API_URL).json()
        return parse_toplist_payload(payload)
|