266 lines
9.8 KiB
Python
266 lines
9.8 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
|
|
from .runtime import sanitize_path_component
|
|
|
|
|
|
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Names of the artifacts kept inside each playlist directory.
PLAYLIST_META_FILENAME = ".playlist_meta.json"
PLAYLIST_YAML_FILENAME = "playlist.yaml"
PLAYLIST_COVERS_DIRNAME = "covers"

# Largest cover image that will be written to disk (10 MiB).
MAX_COVER_BYTES = 10 * 1024 ** 2

# Text consisting solely of these characters is a candidate for being written
# as a plain (unquoted) YAML scalar.
_YAML_SAFE_TEXT_RE = re.compile(r"^[A-Za-z0-9_./%+\- :]+$")

# Substrings that force quoting even when the character check above passes.
_SPECIAL_YAML_TOKENS = (
    ": ",
    "#",
    "[",
    "]",
    "{",
    "}",
    ",",
    "&",
    "*",
    "!",
    "|",
    ">",
    "'",
    '"',
    "@",
    "`",
)

# Recognised image extension at the end of a URL path (case-insensitive).
_COVER_EXT_RE = re.compile(r"\.(jpg|jpeg|png|webp|gif|bmp)$", re.IGNORECASE)

# Maps a normalised Content-Type value to the file extension used on disk.
_CONTENT_TYPE_TO_EXT = {
    "image/jpeg": ".jpg",
    "image/jpg": ".jpg",
    "image/png": ".png",
    "image/webp": ".webp",
    "image/gif": ".gif",
    "image/bmp": ".bmp",
}
|
|
|
|
|
|
# Plain scalars that YAML loaders resolve to null or a boolean rather than text
# (covers both the YAML 1.1 and 1.2 spellings; compared case-insensitively).
_YAML_RESERVED_WORDS = frozenset(
    {"~", "null", "true", "false", "yes", "no", "on", "off", "y", "n"}
)

# Text that starts like a number (optional sign, optional leading dot) or that
# is an infinity/NaN literal. Loaders may resolve such text to an int, float,
# sexagesimal value or timestamp, so it must be quoted to stay a string.
_YAML_NUMERIC_LIKE_RE = re.compile(r"[-+]?(?:\.?[0-9]|\.(?:inf|nan)\Z)", re.IGNORECASE)


def _needs_yaml_quotes(text: str) -> bool:
    """Return True when *text* would not round-trip as a plain YAML scalar."""
    if text != text.strip():
        # Plain scalars lose surrounding whitespace, and a trailing newline
        # would break the line-oriented output.
        return True
    if text.lower() in _YAML_RESERVED_WORDS:
        return True
    if _YAML_NUMERIC_LIKE_RE.match(text):
        return True
    # Indicator characters in positions where they start or end a YAML node.
    return text == "-" or text.startswith(("- ", "%", ":")) or text.endswith(":")


def yaml_scalar(value: Any) -> str:
    """Render *value* as a single-line YAML scalar.

    * ``None`` and the empty string become ``null``.
    * Booleans become ``true`` / ``false``.
    * Integers are written in decimal; floats with an integral value are
      written without a fractional part; infinities and NaN use the YAML
      spellings ``.inf`` / ``-.inf`` / ``.nan``.
    * Anything else is converted with ``str()``. The text is written unquoted
      only when it is made of safe characters and cannot be mistaken for
      another YAML type or structure; otherwise it is written as a
      double-quoted string produced by ``json.dumps``.

    Strings such as ``"true"``, ``"00123"``, ``"12:30"`` or ``"foo:"`` are
    therefore quoted so that a YAML loader returns the same string that was
    passed in.
    """
    if value is None or (isinstance(value, str) and value == ""):
        return "null"
    if isinstance(value, bool):
        # Checked before int because bool is a subclass of int.
        return "true" if value else "false"
    if isinstance(value, int):
        # No float() round trip: arbitrarily large ints must not overflow.
        return str(int(value))
    if isinstance(value, float):
        if value != value:  # NaN is the only value not equal to itself
            return ".nan"
        if value == float("inf"):
            return ".inf"
        if value == float("-inf"):
            return "-.inf"
        return str(int(value)) if value.is_integer() else str(value)

    text = str(value)
    if _needs_yaml_quotes(text):
        return json.dumps(text, ensure_ascii=False)
    if _YAML_SAFE_TEXT_RE.match(text) and not any(token in text for token in _SPECIAL_YAML_TOKENS):
        return text
    return json.dumps(text, ensure_ascii=False)
|
|
|
|
|
|
def build_playlist_dir_name(playlist_name: str | None, playlist_id: int) -> str:
    """Return the directory name for a playlist, ``<sanitized name>_<id>``.

    The name (or an empty string when it is missing) is passed through
    ``sanitize_path_component`` with ``playlist-<id>`` as its second argument
    (presumably the fallback used when nothing usable remains -- verify
    against that helper). The numeric id is always appended as a suffix.
    """
    raw_name = str(playlist_name or "")
    fallback = f"playlist-{int(playlist_id)}"
    component = sanitize_path_component(raw_name, fallback)
    return f"{component}_{int(playlist_id)}"
|
|
|
|
|
|
def build_playlist_meta_payload(playlist: dict[str, Any]) -> dict[str, Any]:
    """Build the dictionary stored in a playlist's metadata file.

    ``playlist_id`` is coerced to ``int`` (``0`` when missing or falsy); the
    remaining fields are coerced to ``str`` (``""`` when missing or falsy).
    """

    def as_text(key: str) -> str:
        return str(playlist.get(key) or "")

    payload: dict[str, Any] = {"playlist_id": int(playlist.get("id") or 0)}
    for key in ("platform", "remote_playlist_id", "name"):
        payload[key] = as_text(key)
    return payload
|
|
|
|
|
|
def read_playlist_meta(playlist_dir: Path) -> dict[str, Any] | None:
    """Load the metadata file of *playlist_dir*, if it holds a JSON object.

    Returns ``None`` when the file does not exist, cannot be read or parsed,
    or when its top-level JSON value is not an object.
    """
    meta_path = playlist_dir / PLAYLIST_META_FILENAME
    if meta_path.exists():
        try:
            raw_text = meta_path.read_text(encoding="utf-8")
            decoded = json.loads(raw_text)
        except Exception:
            # Best effort: an unreadable or malformed file counts as absent.
            return None
        if isinstance(decoded, dict):
            return decoded
    return None
|
|
|
|
|
|
def locate_playlist_dir(playlists_root: Path, playlist: dict[str, Any]) -> Path | None:
|
|
if not playlists_root.exists():
|
|
return None
|
|
playlist_id = int(playlist.get("id") or 0)
|
|
if playlist_id <= 0:
|
|
return None
|
|
preferred = playlists_root / build_playlist_dir_name(str(playlist.get("name") or ""), playlist_id)
|
|
if preferred.exists():
|
|
return preferred
|
|
|
|
suffix = f"_{playlist_id}"
|
|
candidates: list[Path] = []
|
|
for child in playlists_root.iterdir():
|
|
if not child.is_dir():
|
|
continue
|
|
if child.name.endswith(suffix):
|
|
candidates.append(child)
|
|
continue
|
|
meta_payload = read_playlist_meta(child)
|
|
if int(meta_payload.get("playlist_id") or 0) == playlist_id if meta_payload else False:
|
|
candidates.append(child)
|
|
|
|
if not candidates:
|
|
return None
|
|
candidates.sort(key=lambda path: path.stat().st_mtime, reverse=True)
|
|
return candidates[0]
|
|
|
|
|
|
def ensure_playlist_dir(playlists_root: Path, playlist: dict[str, Any]) -> Path:
    """Return the directory for *playlist*, creating it when none exists yet.

    An existing directory found by ``locate_playlist_dir`` is reused as-is.
    Otherwise a new directory named by ``build_playlist_dir_name`` is created
    under *playlists_root*, including any missing parents.
    """
    located = locate_playlist_dir(playlists_root, playlist)
    if located is None:
        numeric_id = int(playlist.get("id") or 0)
        dir_name = build_playlist_dir_name(str(playlist.get("name") or ""), numeric_id)
        located = playlists_root / dir_name
        located.mkdir(parents=True, exist_ok=True)
    return located
|
|
|
|
|
|
def _guess_cover_extension(url: str | None, content_type: str | None) -> str:
    """Pick a file extension (with leading dot) for a downloaded cover.

    A recognised image extension at the end of the URL path wins and is
    returned lower-cased. Otherwise the Content-Type value, stripped of any
    ``;`` parameters, is looked up in ``_CONTENT_TYPE_TO_EXT``. When neither
    yields a result, ``.jpg`` is returned.
    """
    url_path = str(urlparse(str(url or "")).path or "")
    found = _COVER_EXT_RE.search(url_path)
    if found is not None:
        return f".{str(found.group(1)).lower()}"

    mime_type = str(content_type or "").partition(";")[0].strip().lower()
    if mime_type in _CONTENT_TYPE_TO_EXT:
        return _CONTENT_TYPE_TO_EXT[mime_type]
    return ".jpg"
|
|
|
|
|
|
def download_cover_file(
|
|
*,
|
|
cover_url: str,
|
|
covers_dir: Path,
|
|
file_stem: str,
|
|
timeout: tuple[int, int] = (10, 20),
|
|
) -> str | None:
|
|
normalized_url = str(cover_url or "").strip()
|
|
if not normalized_url:
|
|
return None
|
|
try:
|
|
response = requests.get(normalized_url, timeout=timeout)
|
|
response.raise_for_status()
|
|
content = bytes(response.content or b"")
|
|
except Exception:
|
|
LOGGER.warning("Failed to download cover image: %s", normalized_url, exc_info=True)
|
|
return None
|
|
|
|
if not content:
|
|
return None
|
|
if len(content) > MAX_COVER_BYTES:
|
|
LOGGER.warning(
|
|
"Skipped oversized cover image (> %d bytes): %s",
|
|
MAX_COVER_BYTES,
|
|
normalized_url,
|
|
)
|
|
return None
|
|
|
|
extension = _guess_cover_extension(normalized_url, response.headers.get("Content-Type"))
|
|
normalized_stem = sanitize_path_component(file_stem, "cover")
|
|
filename = f"{normalized_stem}{extension}"
|
|
destination_path = covers_dir / filename
|
|
destination_path.write_bytes(content)
|
|
return f"{PLAYLIST_COVERS_DIRNAME}/{filename}"
|
|
|
|
|
|
# (YAML key, source dictionary key) pairs, in output order.
_YAML_PLAYLIST_FIELDS = (
    ("playlist_id", "id"),
    ("playlist_name", "name"),
    ("platform", "platform"),
    ("play_count", "play_count"),
    ("playlist_cover_url", "cover_url"),
    ("playlist_cover_file", "cover_file"),
)
_YAML_SONG_FIELDS = (
    ("local_song_id", "song_id"),
    ("platform_song_id", "remote_song_id"),
    ("platform", "platform"),
    ("name", "name"),
    ("singers", "singers"),
    ("album", "album"),
    ("ext", "ext"),
    ("file_size_bytes", "file_size_bytes"),
    ("cover_url", "cover_url"),
    ("cover_file", "cover_file"),
    ("local_file_path", "local_file_path"),
)
_YAML_LOCATION_FIELDS = (
    ("backend_name", "backend_name"),
    ("backend_type", "backend_type"),
    ("uploaded_url", "url"),
    ("container_name", "container_name"),
    ("locator", "locator"),
)


def _yaml_sequence_entry(source: dict[str, Any], fields: tuple, indent: int) -> list[str]:
    """Render *source* as one block-sequence entry indented by *indent* spaces.

    The first field carries the ``- `` marker; the remaining fields are
    aligned with the first key so that they belong to the same mapping.
    """
    marker_prefix = " " * indent + "- "
    aligned_prefix = " " * (indent + 2)
    return [
        (marker_prefix if position == 0 else aligned_prefix)
        + f"{yaml_key}: "
        + yaml_scalar(source.get(source_key))
        for position, (yaml_key, source_key) in enumerate(fields)
    ]


def serialize_playlist_yaml(playlist: dict[str, Any], items: list[dict[str, Any]]) -> str:
    """Serialize a playlist and its songs to a YAML document.

    The document starts with the playlist-level keys, followed by ``songs``:
    an empty flow sequence when *items* is empty, otherwise one mapping per
    song. Each song ends with ``uploaded_locations``, which is ``[]`` unless
    the song's value is a non-empty list, in which case every location becomes
    a nested mapping. All values are rendered with ``yaml_scalar``.

    Nested levels use two-space block indentation (songs at 2, their keys and
    ``uploaded_locations`` at 4, locations at 6, location keys at 8), so that
    continuation keys line up under the entry they belong to. The result
    always ends with a newline.
    """
    lines = [
        f"{yaml_key}: " + yaml_scalar(playlist.get(source_key))
        for yaml_key, source_key in _YAML_PLAYLIST_FIELDS
    ]
    if not items:
        lines.append("songs: []")
        return "\n".join(lines) + "\n"

    lines.append("songs:")
    for song in items:
        lines.extend(_yaml_sequence_entry(song, _YAML_SONG_FIELDS, indent=2))

        uploaded_locations = song.get("uploaded_locations")
        # Anything other than a list is treated as "no locations".
        locations = uploaded_locations if isinstance(uploaded_locations, list) else []
        if not locations:
            lines.append("    uploaded_locations: []")
            continue

        lines.append("    uploaded_locations:")
        for location in locations:
            lines.extend(
                _yaml_sequence_entry(dict(location or {}), _YAML_LOCATION_FIELDS, indent=6)
            )
    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def write_playlist_artifacts(
    *,
    playlist: dict[str, Any],
    songs: list[dict[str, Any]],
    playlists_root: Path,
) -> Path:
    """Write the on-disk artifacts for one playlist and return its directory.

    Steps:

    1. Ensure *playlists_root*, the playlist directory and its covers
       sub-directory exist.
    2. Download the playlist cover and each song cover via
       ``download_cover_file``.
    3. Write the YAML description (``PLAYLIST_YAML_FILENAME``) and the JSON
       metadata file (``PLAYLIST_META_FILENAME``), both as UTF-8.

    Each distinct cover URL is requested at most once per call: the outcome,
    success or failure, is remembered, so a broken URL shared by many songs
    does not trigger one request (and one timeout) per song.

    Args:
        playlist: Playlist record; ``id``, ``name``, ``platform``,
            ``play_count`` and ``cover_url`` are read.
        songs: Song records; each is copied, and the copy gets a normalised
            ``cover_url`` and a ``cover_file`` entry. ``None`` is treated as
            an empty list.
        playlists_root: Directory that contains all playlist directories.

    Returns:
        The directory the artifacts were written to.
    """
    playlists_root.mkdir(parents=True, exist_ok=True)
    playlist_dir = ensure_playlist_dir(playlists_root, playlist)
    covers_dir = playlist_dir / PLAYLIST_COVERS_DIRNAME
    covers_dir.mkdir(parents=True, exist_ok=True)

    # URL -> relative cover path, or None when that URL already failed.
    cover_file_cache: dict[str, str | None] = {}

    playlist_cover_url = str(playlist.get("cover_url") or "").strip()
    playlist_cover_file = None
    if playlist_cover_url:
        playlist_cover_file = download_cover_file(
            cover_url=playlist_cover_url,
            covers_dir=covers_dir,
            file_stem="playlist-cover",
        )
        cover_file_cache[playlist_cover_url] = playlist_cover_file

    normalized_songs: list[dict[str, Any]] = []
    for index, song in enumerate(songs or [], start=1):
        payload = dict(song)
        cover_url = str(payload.get("cover_url") or "").strip()
        cover_file = None
        if cover_url:
            if cover_url not in cover_file_cache:
                remote_song_id = sanitize_path_component(
                    str(payload.get("remote_song_id") or payload.get("song_id") or index),
                    str(index),
                )
                cover_file_cache[cover_url] = download_cover_file(
                    cover_url=cover_url,
                    covers_dir=covers_dir,
                    file_stem=f"song-{index}-{remote_song_id}",
                )
            cover_file = cover_file_cache[cover_url]
        payload["cover_url"] = cover_url or None
        payload["cover_file"] = cover_file
        normalized_songs.append(payload)

    playlist_payload = {
        "id": int(playlist.get("id") or 0),
        "name": str(playlist.get("name") or ""),
        "platform": str(playlist.get("platform") or ""),
        "play_count": playlist.get("play_count"),
        "cover_url": playlist_cover_url or None,
        "cover_file": playlist_cover_file,
    }
    (playlist_dir / PLAYLIST_YAML_FILENAME).write_text(
        serialize_playlist_yaml(playlist_payload, normalized_songs),
        encoding="utf-8",
    )
    (playlist_dir / PLAYLIST_META_FILENAME).write_text(
        json.dumps(build_playlist_meta_payload(playlist), ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return playlist_dir
|