from __future__ import annotations import json import logging import re from pathlib import Path from typing import Any from urllib.parse import urlparse import requests from .runtime import sanitize_path_component LOGGER = logging.getLogger(__name__) PLAYLIST_META_FILENAME = ".playlist_meta.json" PLAYLIST_YAML_FILENAME = "playlist.yaml" PLAYLIST_COVERS_DIRNAME = "covers" MAX_COVER_BYTES = 10 * 1024 * 1024 _YAML_SAFE_TEXT_RE = re.compile(r"^[A-Za-z0-9_./%+\- :]+$") _SPECIAL_YAML_TOKENS = (": ", "#", "[", "]", "{", "}", ",", "&", "*", "!", "|", ">", "'", '"', "@", "`") _COVER_EXT_RE = re.compile(r"\.(jpg|jpeg|png|webp|gif|bmp)$", re.IGNORECASE) _CONTENT_TYPE_TO_EXT = { "image/jpeg": ".jpg", "image/jpg": ".jpg", "image/png": ".png", "image/webp": ".webp", "image/gif": ".gif", "image/bmp": ".bmp", } def yaml_scalar(value: Any) -> str: if value in (None, ""): return "null" if isinstance(value, bool): return "true" if value else "false" if isinstance(value, (int, float)): return str(int(value)) if isinstance(value, bool) or float(value).is_integer() else str(value) text = str(value) if _YAML_SAFE_TEXT_RE.match(text) and not any(token in text for token in _SPECIAL_YAML_TOKENS): return text return json.dumps(text, ensure_ascii=False) def build_playlist_dir_name(playlist_name: str | None, playlist_id: int) -> str: safe_name = sanitize_path_component(str(playlist_name or ""), f"playlist-{int(playlist_id)}") return f"{safe_name}_{int(playlist_id)}" def build_playlist_meta_payload(playlist: dict[str, Any]) -> dict[str, Any]: return { "playlist_id": int(playlist.get("id") or 0), "platform": str(playlist.get("platform") or ""), "remote_playlist_id": str(playlist.get("remote_playlist_id") or ""), "name": str(playlist.get("name") or ""), } def read_playlist_meta(playlist_dir: Path) -> dict[str, Any] | None: meta_path = playlist_dir / PLAYLIST_META_FILENAME if not meta_path.exists(): return None try: payload = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: return None return payload if isinstance(payload, dict) else None def locate_playlist_dir(playlists_root: Path, playlist: dict[str, Any]) -> Path | None: if not playlists_root.exists(): return None playlist_id = int(playlist.get("id") or 0) if playlist_id <= 0: return None preferred = playlists_root / build_playlist_dir_name(str(playlist.get("name") or ""), playlist_id) if preferred.exists(): return preferred suffix = f"_{playlist_id}" candidates: list[Path] = [] for child in playlists_root.iterdir(): if not child.is_dir(): continue if child.name.endswith(suffix): candidates.append(child) continue meta_payload = read_playlist_meta(child) if int(meta_payload.get("playlist_id") or 0) == playlist_id if meta_payload else False: candidates.append(child) if not candidates: return None candidates.sort(key=lambda path: path.stat().st_mtime, reverse=True) return candidates[0] def ensure_playlist_dir(playlists_root: Path, playlist: dict[str, Any]) -> Path: existing = locate_playlist_dir(playlists_root, playlist) if existing is not None: return existing playlist_id = int(playlist.get("id") or 0) target = playlists_root / build_playlist_dir_name(str(playlist.get("name") or ""), playlist_id) target.mkdir(parents=True, exist_ok=True) return target def _guess_cover_extension(url: str | None, content_type: str | None) -> str: parsed = urlparse(str(url or "")) match = _COVER_EXT_RE.search(str(parsed.path or "")) if match: return "." + str(match.group(1)).lower() normalized_type = str(content_type or "").split(";", 1)[0].strip().lower() return _CONTENT_TYPE_TO_EXT.get(normalized_type, ".jpg") def download_cover_file( *, cover_url: str, covers_dir: Path, file_stem: str, timeout: tuple[int, int] = (10, 20), ) -> str | None: normalized_url = str(cover_url or "").strip() if not normalized_url: return None try: response = requests.get(normalized_url, timeout=timeout) response.raise_for_status() content = bytes(response.content or b"") except Exception: LOGGER.warning("Failed to download cover image: %s", normalized_url, exc_info=True) return None if not content: return None if len(content) > MAX_COVER_BYTES: LOGGER.warning( "Skipped oversized cover image (> %d bytes): %s", MAX_COVER_BYTES, normalized_url, ) return None extension = _guess_cover_extension(normalized_url, response.headers.get("Content-Type")) normalized_stem = sanitize_path_component(file_stem, "cover") filename = f"{normalized_stem}{extension}" destination_path = covers_dir / filename destination_path.write_bytes(content) return f"{PLAYLIST_COVERS_DIRNAME}/{filename}" def serialize_playlist_yaml(playlist: dict[str, Any], items: list[dict[str, Any]]) -> str: lines = [ "playlist_id: " + yaml_scalar(playlist.get("id")), "playlist_name: " + yaml_scalar(playlist.get("name")), "platform: " + yaml_scalar(playlist.get("platform")), "play_count: " + yaml_scalar(playlist.get("play_count")), "playlist_cover_url: " + yaml_scalar(playlist.get("cover_url")), "playlist_cover_file: " + yaml_scalar(playlist.get("cover_file")), ] if not items: lines.append("songs: []") return "\n".join(lines) + "\n" lines.append("songs:") for song in items: uploaded_locations = song.get("uploaded_locations") normalized_locations = ( list(uploaded_locations) if isinstance(uploaded_locations, list) else [] ) lines.append(" - local_song_id: " + yaml_scalar(song.get("song_id"))) lines.append(" platform_song_id: " + yaml_scalar(song.get("remote_song_id"))) lines.append(" platform: " + yaml_scalar(song.get("platform"))) lines.append(" name: " + yaml_scalar(song.get("name"))) lines.append(" singers: " + yaml_scalar(song.get("singers"))) lines.append(" album: " + yaml_scalar(song.get("album"))) lines.append(" ext: " + yaml_scalar(song.get("ext"))) lines.append(" file_size_bytes: " + yaml_scalar(song.get("file_size_bytes"))) lines.append(" cover_url: " + yaml_scalar(song.get("cover_url"))) lines.append(" cover_file: " + yaml_scalar(song.get("cover_file"))) lines.append(" local_file_path: " + yaml_scalar(song.get("local_file_path"))) if not normalized_locations: lines.append(" uploaded_locations: []") continue lines.append(" uploaded_locations:") for location in normalized_locations: payload = dict(location or {}) lines.append(" - backend_name: " + yaml_scalar(payload.get("backend_name"))) lines.append(" backend_type: " + yaml_scalar(payload.get("backend_type"))) lines.append(" uploaded_url: " + yaml_scalar(payload.get("url"))) lines.append(" container_name: " + yaml_scalar(payload.get("container_name"))) lines.append(" locator: " + yaml_scalar(payload.get("locator"))) return "\n".join(lines) + "\n" def write_playlist_artifacts( *, playlist: dict[str, Any], songs: list[dict[str, Any]], playlists_root: Path, ) -> Path: playlists_root.mkdir(parents=True, exist_ok=True) playlist_dir = ensure_playlist_dir(playlists_root, playlist) covers_dir = playlist_dir / PLAYLIST_COVERS_DIRNAME covers_dir.mkdir(parents=True, exist_ok=True) cover_file_cache: dict[str, str] = {} playlist_cover_url = str(playlist.get("cover_url") or "").strip() playlist_cover_file = None if playlist_cover_url: playlist_cover_file = download_cover_file( cover_url=playlist_cover_url, covers_dir=covers_dir, file_stem="playlist-cover", ) if playlist_cover_file: cover_file_cache[playlist_cover_url] = playlist_cover_file normalized_songs: list[dict[str, Any]] = [] for index, song in enumerate(songs, start=1): payload = dict(song) cover_url = str(payload.get("cover_url") or "").strip() cover_file = None if cover_url: cover_file = cover_file_cache.get(cover_url) if not cover_file: remote_song_id = sanitize_path_component( str(payload.get("remote_song_id") or payload.get("song_id") or index), str(index), ) cover_file = download_cover_file( cover_url=cover_url, covers_dir=covers_dir, file_stem=f"song-{index}-{remote_song_id}", ) if cover_file: cover_file_cache[cover_url] = cover_file payload["cover_url"] = cover_url or None payload["cover_file"] = cover_file normalized_songs.append(payload) playlist_payload = { "id": int(playlist.get("id") or 0), "name": str(playlist.get("name") or ""), "platform": str(playlist.get("platform") or ""), "play_count": playlist.get("play_count"), "cover_url": playlist_cover_url or None, "cover_file": playlist_cover_file, } (playlist_dir / PLAYLIST_YAML_FILENAME).write_text( serialize_playlist_yaml(playlist_payload, normalized_songs), encoding="utf-8", ) (playlist_dir / PLAYLIST_META_FILENAME).write_text( json.dumps(build_playlist_meta_payload(playlist), ensure_ascii=False, indent=2), encoding="utf-8", ) return playlist_dir