from __future__ import annotations

# Catalog download orchestration: plans pending downloads, resolves each
# catalog row to a downloadable song, saves it under a local library root,
# records the file in the repository, and optionally writes .lrc lyrics.

import json
import shutil
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path

from .models import deserialize_song_info, normalize_source_name
from .repository import CatalogRepository
from .resolver import (
    DEFAULT_DOWNLOAD_SOURCES,
    MultiSourceSongResolver,
    SOURCE_CLIENT_NAMES,
    normalize_audio_ext,
)
from .resolver_stats import ResolverStatsRepository, default_resolver_stats_db_path
from .runtime import build_download_relative_dir
from musicdl.modules.utils.lyric import LyricSearchClient
from musicdl.modules.utils.misc import shortenpathsinsonginfos
from musicdl.modules.utils.songinfoutils import SongInfoUtils

# File extensions reported with the "lossless" quality label.
LOSSLESS_EXTENSIONS = {"flac", "wav", "alac", "ape", "wv", "tta", "dsf", "dff"}
# Default size of the thread pools used for downloads and lyric syncing.
DEFAULT_DOWNLOAD_WORKERS = 10
# Default wall-clock budget for a single online lyric search.
DEFAULT_LYRIC_SEARCH_TIMEOUT_SECONDS = 20


class _LyricSearchTimeout(BaseException):
    """Raised by the SIGALRM handler when a lyric search exceeds its budget.

    NOTE(review): derives from ``BaseException`` rather than ``Exception``,
    presumably so that ``except Exception`` handlers inside the lyric client
    cannot swallow it -- confirm.
    """


@dataclass
class ResolvedDownloadPayload:
    """Everything ``download_resolved_song`` needs for one resolved row."""

    # Catalog row converted to a plain dict.
    row: dict[str, object]
    # Human-readable "name / singers" label used in progress reports.
    display_text: str
    # Library root the downloader was initialised with.
    default_root: Path
    # Root actually chosen for this download (may differ when space ran out).
    target_root: Path
    # Repository backend id registered for ``target_root``.
    backend_id: int
    # Expected file size in bytes, or None when unknown.
    expected_bytes: int | None
    # Song info object returned by the resolver.
    resolved_song_info: object


def _progress_percent(completed: int | None, total: int | None) -> int:
    """Return the completion percentage as an integer in ``0..100``.

    ``None`` and negative inputs are treated as 0. A non-positive total
    yields 0; a completed value at or above the total yields 100.
    """
    normalized_total = max(int(total or 0), 0)
    normalized_completed = max(int(completed or 0), 0)
    if normalized_total <= 0:
        return 0
    if normalized_completed >= normalized_total:
        return 100
    # Integer floor division keeps the result exact for arbitrarily large
    # byte counts (float division could round up to 100 prematurely).
    return (normalized_completed * 100) // normalized_total


def _format_progress_text(downloaded_bytes: int | None, total_bytes: int | None) -> str:
    """Format ``downloaded/total`` in megabytes with two decimals.

    The total is never shown smaller than the downloaded amount.
    """
    downloaded_value = max(int(downloaded_bytes or 0), 0)
    total_value = max(int(total_bytes or 0), downloaded_value)
    return f"{downloaded_value / 1024 / 1024:.2f}MB/{total_value / 1024 / 1024:.2f}MB"


class DownloadPlanner:
    """Builds the list of catalog songs that still need a local file."""

    def __init__(self, repository: CatalogRepository):
        self.repository = repository

    def build_download_queue(
        self,
        sources: list[str] | None = None,
        limit: int | None = None,
        playlist_ids: list[int] | None = None,
    ) -> list[dict]:
        """Return pending songs that have no active local file.

        Each item is a dict copy of the repository row with an extra
        ``song_id`` key holding ``int(row["id"])``. ``limit`` is forwarded to
        the repository query, i.e. it is applied before rows that already
        have a local file are dropped.
        """
        rows = self.repository.list_pending_download_songs(
            sources=sources,
            limit=limit,
            playlist_ids=playlist_ids,
        )
        queue = []
        for row in rows:
            if self.repository.song_has_active_local_file(int(row["id"])):
                continue
            item = dict(row)
            item["song_id"] = int(row["id"])
            queue.append(item)
        return queue


class CatalogDownloader:
    """Resolves, downloads and records catalog songs, plus lyric syncing."""

    def __init__(
        self,
        repository: CatalogRepository,
        work_dir: str = "musicdl_outputs/catalogsync",
        worker_count: int = DEFAULT_DOWNLOAD_WORKERS,
    ):
        """Store configuration and build the multi-source resolver.

        ``worker_count`` is clamped to at least 1.
        """
        self.repository = repository
        self.work_dir = work_dir
        self.worker_count = max(1, worker_count)
        # Clients are cached per "<thread id>:<platform>" key (see get_client).
        self._clients: dict[str, object] = {}
        self._client_lock = threading.Lock()
        # Guards _current_library_root and the disk-space check/prompt.
        self._space_lock = threading.Lock()
        self._current_library_root: Path | None = None
        self._lyric_search_timeout_seconds = DEFAULT_LYRIC_SEARCH_TIMEOUT_SECONDS
        resolver_stats_repo = ResolverStatsRepository(default_resolver_stats_db_path(self.repository.db_path))
        # Lambdas (not bound methods) so the attributes are looked up at call time.
        self._resolver = MultiSourceSongResolver(
            client_factory=lambda platform: self.get_client(platform),
            request_overrides_factory=lambda timeout: self._request_overrides(timeout),
            resolver_stats_repo=resolver_stats_repo,
        )

    @contextmanager
    def _lyric_search_timeout_guard(self):
        """Bound the wrapped block with a SIGALRM-based timer.

        When armed, ``_LyricSearchTimeout`` is raised inside the block once
        ``self._lyric_search_timeout_seconds`` elapse. The guard is a no-op
        (it simply yields) when the timeout is not positive, when the caller
        is not the main thread, or when the platform's ``signal`` module has
        no ``SIGALRM``/``setitimer``.
        """
        timeout_seconds = float(self._lyric_search_timeout_seconds or 0)
        if timeout_seconds <= 0:
            yield
            return
        if threading.current_thread() is not threading.main_thread():
            yield
            return
        if not hasattr(signal, "SIGALRM") or not hasattr(signal, "setitimer"):
            yield
            return

        def _handle_timeout(_signum, _frame):
            raise _LyricSearchTimeout()

        previous_handler = signal.getsignal(signal.SIGALRM)
        signal.signal(signal.SIGALRM, _handle_timeout)
        signal.setitimer(signal.ITIMER_REAL, timeout_seconds)
        try:
            yield
        finally:
            # Disarm the timer first, then restore whatever handler was installed.
            signal.setitimer(signal.ITIMER_REAL, 0)
            signal.signal(signal.SIGALRM, previous_handler)

    @staticmethod
    def _request_overrides(timeout: tuple[int, int]) -> dict:
        """Wrap ``timeout`` in the request-overrides dict passed to clients."""
        return {"timeout": timeout}

    def get_client(self, platform: str):
        """Return the music client cached for this thread and platform.

        The client is built on first use via ``BuildMusicClient``; the cache
        key combines ``threading.get_ident()`` with the normalized platform.

        Raises:
            KeyError: if the normalized platform is not in ``SOURCE_CLIENT_NAMES``.
        """
        platform = normalize_source_name(platform)
        client_key = f"{threading.get_ident()}:{platform}"
        if client_key not in self._clients:
            with self._client_lock:
                # Re-check under the lock before building.
                if client_key not in self._clients:
                    from musicdl.modules import BuildMusicClient

                    self._clients[client_key] = BuildMusicClient(
                        {
                            "type": SOURCE_CLIENT_NAMES[platform],
                            "disable_print": True,
                            "maintain_session": False,
                            "work_dir": self.work_dir,
                            "search_size_per_source": 5,
                            "search_size_per_page": 5,
                            "strict_limit_search_size_per_page": True,
                        }
                    )
        return self._clients[client_key]

    def ensure_space(self, root_path: str | Path, required_bytes: int | None) -> Path:
        """Return a library root with at least ``required_bytes`` free.

        The previously selected root (``self._current_library_root``) takes
        precedence over ``root_path``. While free space is insufficient the
        user is prompted on stdin for another directory; the prompt runs
        while ``_space_lock`` is held. The chosen root is created if needed
        and remembered for later calls.

        Raises:
            RuntimeError: if the user answers the prompt with an empty line.
        """
        with self._space_lock:
            root = self._current_library_root or Path(root_path).resolve()
            root.mkdir(parents=True, exist_ok=True)
            if required_bytes is None or required_bytes <= 0:
                self._current_library_root = root
                return root
            while shutil.disk_usage(root).free < required_bytes:
                new_root = input("磁盘空间不足,请输入新的下载目录继续: ").strip()
                if not new_root:
                    raise RuntimeError("Disk space is insufficient and no new directory was provided")
                root = Path(new_root).resolve()
                root.mkdir(parents=True, exist_ok=True)
            self._current_library_root = root
            return root

    def _initialize_library_root(self, root_path: str | Path) -> Path:
        """Set the current library root if unset and return the current one."""
        normalized_root = Path(root_path).resolve()
        with self._space_lock:
            if self._current_library_root is None:
                self._current_library_root = normalized_root
            return self._current_library_root

    @staticmethod
    def _normalize_singers(value: object) -> str | None:
        """Return the stripped singers string, or None if absent.

        Non-strings, blank strings and the literal ``NULL`` (any case) count
        as absent.
        """
        if not isinstance(value, str):
            return None
        text = value.strip()
        if not text or text.upper() == "NULL":
            return None
        return text

    @staticmethod
    def _detect_download_platform(song_info: object, fallback_platform: str) -> str:
        """Return the song's normalized ``source``, or the fallback if unknown."""
        detected_platform = normalize_source_name(getattr(song_info, "source", None))
        if detected_platform == "unknown":
            return normalize_source_name(fallback_platform)
        return detected_platform

    def resolve_song_info_for_download(
        self,
        row: dict,
        song_info: object,
        download_sources: list[str] | None = None,
        progress_callback=None,
    ) -> object:
        """Delegate to the multi-source resolver.

        A missing or empty ``download_sources`` falls back to
        ``DEFAULT_DOWNLOAD_SOURCES``.
        """
        return self._resolver.resolve_song_info(
            row=row,
            snapshot_song_info=song_info,
            download_sources=download_sources or DEFAULT_DOWNLOAD_SOURCES,
            progress_callback=progress_callback,
        )

    @staticmethod
    def _detect_quality_label(song_info: object, actual_ext: str | None, fallback: str | None = None) -> str | None:
        """Pick a quality label for a saved file.

        Order of preference: a truthy ``raw_data["quality"]`` on the song
        info, then ``"lossless"``/``"standard"`` derived from the normalized
        extension, then ``fallback``.
        """
        raw_data = getattr(song_info, "raw_data", None)
        if isinstance(raw_data, dict):
            quality = raw_data.get("quality")
            if quality:
                return str(quality)
        normalized_ext = normalize_audio_ext(actual_ext or getattr(song_info, "ext", None))
        if normalized_ext in LOSSLESS_EXTENSIONS:
            return "lossless"
        if normalized_ext:
            return "standard"
        return fallback

    @staticmethod
    def _build_display_text(row: dict) -> str:
        """Build the ``"name / singers"`` label, trimming stray ``" /"`` at the ends."""
        display_name = str(row.get("name") or row.get("id") or "")
        singers = str(row.get("singers") or "").strip()
        return f"{display_name} / {singers}".strip(" /")

    @staticmethod
    def _normalize_lyric_text(value: object) -> str | None:
        """Return stripped text with CRLF converted to LF, or None if absent.

        Non-strings, blank strings and the literal ``NULL`` (any case) count
        as absent.
        """
        if not isinstance(value, str):
            return None
        text = value.replace("\r\n", "\n").strip()
        if not text or text.upper() == "NULL":
            return None
        return text

    def _resolve_lyrics_text(self, *, song_info: object | None, row: dict[str, object] | None = None) -> str | None:
        """Return lyric text for a song, searching online when not embedded.

        Uses ``song_info.lyric`` when present. Otherwise a title and singers
        are taken from ``song_info`` or ``row`` and passed to
        ``LyricSearchClient.search``. Returns None when title or singers are
        missing, when the search times out, or when the result is empty.
        """
        lyric_text = self._normalize_lyric_text(getattr(song_info, "lyric", None))
        if lyric_text:
            return lyric_text
        row = row or {}
        title = self._normalize_lyric_text(getattr(song_info, "song_name", None)) or self._normalize_lyric_text(
            row.get("name")
        )
        singers = self._normalize_singers(getattr(song_info, "singers", None)) or self._normalize_singers(
            row.get("singers")
        )
        if not title or not singers:
            return None
        try:
            with self._lyric_search_timeout_guard():
                _lyric_result, lyric = LyricSearchClient.search(track_name=title, artist_name=singers)
        except _LyricSearchTimeout:
            return None
        return self._normalize_lyric_text(lyric)

    def _sync_lyrics_for_saved_song(
        self,
        *,
        row: dict[str, object],
        song_info: object | None,
        saved_path: Path,
        overwrite_lyrics: bool,
        worker_callback=None,
        display_text: str | None = None,
    ) -> str:
        """Write a ``.lrc`` next to ``saved_path`` and report the outcome.

        Returns ``"saved"``, ``"skipped"`` or ``"failed"``. Lyrics are a
        best-effort extra: any ``Exception`` is reported through the progress
        callback and turned into ``"failed"`` instead of propagating.
        """
        try:
            lyric_text = self._resolve_lyrics_text(song_info=song_info, row=row)
            if not lyric_text:
                self._emit_worker_progress(
                    row,
                    worker_callback,
                    display_text=display_text,
                    last_progress_text="lyrics unavailable",
                )
                return "skipped"
            if hasattr(song_info, "lyric"):
                song_info.lyric = lyric_text
            lrc_path = saved_path.with_suffix(".lrc")
            if lrc_path.exists() and not overwrite_lyrics:
                self._emit_worker_progress(
                    row,
                    worker_callback,
                    display_text=display_text,
                    last_progress_text="lyrics exists, skipped",
                )
                return "skipped"
            saved = SongInfoUtils.savelrctofile(saved_path, lyric_text, overwrite=overwrite_lyrics)
            if saved:
                self._emit_worker_progress(
                    row,
                    worker_callback,
                    display_text=display_text,
                    last_progress_text="lyrics saved",
                )
                return "saved"
            self._emit_worker_progress(
                row,
                worker_callback,
                display_text=display_text,
                last_progress_text="lyrics skipped",
            )
            return "skipped"
        except Exception as exc:
            self._emit_worker_progress(
                row,
                worker_callback,
                display_text=display_text,
                last_progress_text=f"lyrics failed: {type(exc).__name__}: {exc}",
            )
            return "failed"

    @staticmethod
    def _emit_worker_progress(
        row: dict,
        progress_callback,
        *,
        display_text: str | None,
        downloaded_bytes: int | None = None,
        total_bytes: int | None = None,
        speed_bytes_per_sec: int | None = None,
        progress_percent: int | None = None,
        last_progress_text: str | None = None,
    ) -> None:
        """Call ``progress_callback(**state)`` for the given row.

        The state always carries ``current_song_id``, ``current_playlist_id``
        and ``current_display_text``; the optional metrics are included only
        when not None. Does nothing when ``progress_callback`` is None.
        """
        if progress_callback is None:
            return
        state: dict[str, object] = {
            "current_song_id": int(row["id"]) if row.get("id") is not None else None,
            "current_playlist_id": row.get("playlist_id"),
            "current_display_text": display_text,
        }
        if downloaded_bytes is not None:
            state["downloaded_bytes"] = int(downloaded_bytes)
        if total_bytes is not None:
            state["total_bytes"] = int(total_bytes)
        if speed_bytes_per_sec is not None:
            state["speed_bytes_per_sec"] = int(speed_bytes_per_sec)
        if progress_percent is not None:
            state["progress_percent"] = int(progress_percent)
        if last_progress_text is not None:
            state["last_progress_text"] = str(last_progress_text)
        progress_callback(**state)

    def _monitor_save_path(
        self,
        *,
        save_path: Path,
        expected_bytes: int | None,
        progress_callback,
        stop_event: threading.Event,
        row: dict,
        display_text: str | None,
        interval_seconds: float = 0.02,
    ) -> None:
        """Poll ``save_path`` and report size growth until ``stop_event`` is set.

        A progress event is emitted only when the file has grown since the
        last report; speed is the growth divided by the time since that
        report. When ``expected_bytes`` is falsy the current size is used as
        the total.
        """
        last_size = 0
        last_change_at = time.monotonic()
        while not stop_event.wait(interval_seconds):
            if not save_path.exists():
                continue
            try:
                current_size = int(save_path.stat().st_size)
            except OSError:
                # The file may vanish or be replaced between exists() and stat().
                continue
            if current_size <= last_size:
                continue
            now = time.monotonic()
            delta_bytes = current_size - last_size
            # Floor the interval to avoid dividing by zero.
            delta_seconds = max(now - last_change_at, 1e-6)
            total_bytes = int(expected_bytes or current_size)
            self._emit_worker_progress(
                row,
                progress_callback,
                display_text=display_text,
                downloaded_bytes=current_size,
                total_bytes=total_bytes,
                speed_bytes_per_sec=int(delta_bytes / delta_seconds),
                progress_percent=_progress_percent(current_size, total_bytes),
                last_progress_text=_format_progress_text(current_size, total_bytes),
            )
            last_size = current_size
            last_change_at = now

    def resolve_song_row(
        self,
        row,
        library_root: str | Path,
        download_sources: list[str] | None = None,
        worker_callback=None,
    ) -> ResolvedDownloadPayload | None:
        """Resolve a snapshot into a downloadable payload and choose the active local target.

        Returns None when the row's ``metadata_json`` does not decode to a
        JSON object, when it carries no usable snapshot, or when the resolver
        finds nothing to download. Otherwise ensures disk space (possibly
        switching roots), registers the target root as a local backend and
        returns the payload.
        """
        row_dict = dict(row)
        default_root = self._initialize_library_root(library_root)
        self.repository.ensure_local_backend(default_root, name="default-local", is_default=True)
        display_text = self._build_display_text(row_dict)
        self._emit_worker_progress(
            row_dict,
            worker_callback,
            display_text=display_text,
        )
        raw_metadata = row_dict.get("metadata_json")
        metadata = json.loads(raw_metadata) if raw_metadata else {}
        # Same guard as sync_local_lyrics: valid JSON that is not an object
        # (e.g. a list) has no snapshot and must not crash the worker.
        if not isinstance(metadata, dict):
            return None
        song_info = deserialize_song_info(metadata.get("snapshot"))
        if song_info is None:
            return None

        resolve_progress_callback = None
        if worker_callback is not None:

            def resolve_progress_callback(message):
                return self._emit_worker_progress(
                    row_dict,
                    worker_callback,
                    display_text=display_text,
                    last_progress_text=message,
                )

        resolved_song_info = self.resolve_song_info_for_download(
            row=row_dict,
            song_info=song_info,
            download_sources=download_sources,
            progress_callback=resolve_progress_callback,
        )
        if resolved_song_info is None:
            return None
        target_root = self.ensure_space(
            default_root,
            getattr(resolved_song_info, "file_size_bytes", None) or row_dict.get("file_size_bytes"),
        )
        is_default_root = target_root.resolve() == default_root
        backend_id = self.repository.ensure_local_backend(
            target_root,
            name="default-local" if is_default_root else None,
            is_default=is_default_root,
        )
        # A size of 0 is treated as unknown (None).
        expected_bytes = int(
            getattr(resolved_song_info, "file_size_bytes", None) or row_dict.get("file_size_bytes") or 0
        ) or None
        return ResolvedDownloadPayload(
            row=row_dict,
            display_text=display_text,
            default_root=default_root,
            target_root=target_root,
            backend_id=backend_id,
            expected_bytes=expected_bytes,
            resolved_song_info=resolved_song_info,
        )

    def download_resolved_song(
        self,
        resolved_payload: ResolvedDownloadPayload,
        worker_callback=None,
        lyrics_enabled: bool = True,
        overwrite_lyrics: bool = False,
    ) -> bool:
        """Download a resolved song, record the file and optionally sync lyrics.

        Returns True when the client reports at least one downloaded song,
        False otherwise. While the client downloads, a daemon thread polls
        the target file to feed ``worker_callback`` (only when a callback is
        given and the song info exposes ``save_path``).
        """
        row = resolved_payload.row
        song_info = resolved_payload.resolved_song_info
        download_platform = self._detect_download_platform(song_info, str(row["platform"]))
        client = self.get_client(download_platform)
        singers = self._normalize_singers(getattr(song_info, "singers", None)) or self._normalize_singers(
            row.get("singers")
        )
        relative_dir = build_download_relative_dir(platform=download_platform, singers=singers)
        target_dir = resolved_payload.target_root / relative_dir
        target_dir.mkdir(parents=True, exist_ok=True)
        song_info.work_dir = str(target_dir)
        if hasattr(song_info, "_save_path"):
            # NOTE(review): presumably clears a cached path so save_path is
            # recomputed from the new work_dir -- confirm.
            song_info._save_path = None

        save_path: Path | None = None
        monitor_stop: threading.Event | None = None
        monitor_thread: threading.Thread | None = None
        self._emit_worker_progress(
            row,
            worker_callback,
            display_text=resolved_payload.display_text,
            last_progress_text=f"starting download via {download_platform}",
        )
        if worker_callback is not None and hasattr(song_info, "save_path"):
            shortenpathsinsonginfos([song_info])
            save_path = Path(song_info.save_path)
            monitor_stop = threading.Event()
            monitor_thread = threading.Thread(
                target=self._monitor_save_path,
                kwargs={
                    "save_path": save_path,
                    "expected_bytes": resolved_payload.expected_bytes,
                    "progress_callback": worker_callback,
                    "stop_event": monitor_stop,
                    "row": row,
                    "display_text": resolved_payload.display_text,
                },
                daemon=True,
                name=f"download-monitor-{row.get('id')}",
            )
            monitor_thread.start()

        try:
            downloaded = client.download(
                [song_info],
                num_threadings=1,
                request_overrides=self._request_overrides((10, 60)),
                auto_supplement_song=False,
            )
        except TypeError:
            # Retry without request_overrides for clients whose download()
            # does not accept that keyword.
            downloaded = client.download(
                [song_info],
                num_threadings=1,
                auto_supplement_song=False,
            )
        finally:
            if monitor_stop is not None:
                monitor_stop.set()
            if monitor_thread is not None:
                monitor_thread.join(timeout=1.0)

        # Emit one last event with the final on-disk size.
        if save_path is not None and save_path.exists():
            try:
                final_size = int(save_path.stat().st_size)
            except OSError:
                final_size = 0
            if final_size > 0:
                total_bytes = int(resolved_payload.expected_bytes or final_size)
                self._emit_worker_progress(
                    row,
                    worker_callback,
                    display_text=resolved_payload.display_text,
                    downloaded_bytes=final_size,
                    total_bytes=total_bytes,
                    progress_percent=_progress_percent(final_size, total_bytes),
                    last_progress_text=_format_progress_text(final_size, total_bytes),
                )

        if not downloaded:
            return False
        saved_song = downloaded[0]
        saved_path = Path(saved_song.save_path)
        relative_path = saved_path.relative_to(resolved_payload.target_root).as_posix()
        actual_size = saved_path.stat().st_size if saved_path.exists() else row.get("file_size_bytes")
        actual_ext = saved_path.suffix.lstrip(".") or row.get("ext")
        self.repository.record_local_file(
            song_id=int(row["id"]),
            backend_id=resolved_payload.backend_id,
            relative_path=relative_path,
            file_size_bytes=actual_size,
            ext=actual_ext,
            quality_label=self._detect_quality_label(song_info, actual_ext, fallback=row.get("quality_label")),
        )
        if lyrics_enabled:
            # Prefer the downloaded song's own lyric when it carries one.
            saved_song_lyric = self._normalize_lyric_text(getattr(saved_song, "lyric", None))
            lyrics_song_info = saved_song if saved_song_lyric else song_info
            self._sync_lyrics_for_saved_song(
                row=row,
                song_info=lyrics_song_info,
                saved_path=saved_path,
                overwrite_lyrics=overwrite_lyrics,
                worker_callback=worker_callback,
                display_text=resolved_payload.display_text,
            )
        return True

    def download_song_row(
        self,
        row,
        library_root: str | Path,
        download_sources: list[str] | None = None,
        worker_callback=None,
        lyrics_enabled: bool = True,
        overwrite_lyrics: bool = False,
    ) -> bool:
        """Resolve and download one catalog row; False if it cannot be resolved."""
        resolved_payload = self.resolve_song_row(
            row=row,
            library_root=library_root,
            download_sources=download_sources,
            worker_callback=worker_callback,
        )
        if resolved_payload is None:
            return False
        return self.download_resolved_song(
            resolved_payload=resolved_payload,
            worker_callback=worker_callback,
            lyrics_enabled=lyrics_enabled,
            overwrite_lyrics=overwrite_lyrics,
        )

    def download_pending(
        self,
        library_root: str | Path,
        sources: list[str] | None = None,
        limit: int | None = None,
        playlist_ids: list[int] | None = None,
        download_sources: list[str] | None = None,
        lyrics_enabled: bool = True,
        overwrite_lyrics: bool = False,
    ) -> int:
        """Download every pending song on a thread pool; return the success count.

        An exception raised by any worker propagates from ``future.result()``
        once the pool has shut down.
        """
        planner = DownloadPlanner(self.repository)
        queue = planner.build_download_queue(sources=sources, limit=limit, playlist_ids=playlist_ids)
        default_root = self._initialize_library_root(library_root)
        self.repository.ensure_local_backend(default_root, name="default-local", is_default=True)
        downloaded_count = 0
        with ThreadPoolExecutor(max_workers=self.worker_count) as executor:
            futures = [
                executor.submit(
                    self.download_song_row,
                    row=row,
                    library_root=default_root,
                    download_sources=download_sources,
                    lyrics_enabled=lyrics_enabled,
                    overwrite_lyrics=overwrite_lyrics,
                )
                for row in queue
            ]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    downloaded_count += 1
        return downloaded_count

    def sync_local_lyrics(
        self,
        sources: list[str] | None = None,
        playlist_ids: list[int] | None = None,
        limit: int | None = None,
        overwrite_lyrics: bool = False,
        progress_callback=None,
    ) -> dict[str, int]:
        """Write ``.lrc`` files for songs that already have a local file.

        Rows are processed on a thread pool. Returns a summary dict with the
        keys ``total``, ``processed``, ``saved``, ``skipped`` and ``failed``.
        ``progress_callback`` (if given) receives the running summary as
        keyword arguments once up front and after every processed row.
        """
        rows = self.repository.list_local_songs_for_lyrics(
            sources=sources,
            playlist_ids=playlist_ids,
            limit=limit,
        )
        summary = {"total": len(rows), "processed": 0, "saved": 0, "skipped": 0, "failed": 0}

        def emit_progress(
            *,
            row_dict: dict[str, object] | None = None,
            display_text: str | None = None,
            last_status: str | None = None,
            last_progress_text: str | None = None,
        ) -> None:
            """Send the current summary (plus optional row details) to the callback."""
            if progress_callback is None:
                return
            state: dict[str, object] = {
                "total": summary["total"],
                "processed": summary["processed"],
                "saved": summary["saved"],
                "skipped": summary["skipped"],
                "failed": summary["failed"],
                "progress_percent": _progress_percent(summary["processed"], summary["total"]),
            }
            if row_dict is not None:
                state["current_song_id"] = int(row_dict["id"]) if row_dict.get("id") is not None else None
                state["current_playlist_id"] = row_dict.get("playlist_id")
            if display_text is not None:
                state["current_display_text"] = display_text
            if last_status is not None:
                state["last_status"] = last_status
            if last_progress_text is not None:
                state["last_progress_text"] = last_progress_text
            progress_callback(**state)

        def process_row(row) -> tuple[dict[str, object], str, str, str]:
            """Sync lyrics for one row; never raises ``Exception``.

            Returns ``(row_dict, display_text, status, status_text)`` where
            status is one of ``saved``/``skipped``/``failed``.
            """
            row_dict = dict(row)
            display_text = self._build_display_text(row_dict)
            try:
                local_file_path = row_dict.get("local_file_path")
                if not local_file_path:
                    return row_dict, display_text, "failed", "missing local file path"
                saved_path = Path(str(local_file_path))
                if not saved_path.exists():
                    return row_dict, display_text, "failed", "local file missing"
                metadata = json.loads(row_dict["metadata_json"]) if row_dict.get("metadata_json") else {}
                song_info = (
                    deserialize_song_info(metadata.get("snapshot")) if isinstance(metadata, dict) else None
                )
                status = self._sync_lyrics_for_saved_song(
                    row=row_dict,
                    song_info=song_info,
                    saved_path=saved_path,
                    overwrite_lyrics=overwrite_lyrics,
                    display_text=display_text,
                )
                # Any unexpected status value is counted as a failure.
                normalized_status = status if status in {"saved", "skipped", "failed"} else "failed"
                status_text = {
                    "saved": "lyrics saved",
                    "skipped": "lyrics skipped",
                    "failed": "lyrics failed",
                }[normalized_status]
                return row_dict, display_text, normalized_status, status_text
            except Exception as exc:
                return row_dict, display_text, "failed", f"lyrics failed: {type(exc).__name__}: {exc}"

        emit_progress()
        with ThreadPoolExecutor(max_workers=self.worker_count) as executor:
            futures = [executor.submit(process_row, row) for row in rows]
            # The summary is only mutated here, on the calling thread.
            for future in as_completed(futures):
                row_dict, display_text, status, status_text = future.result()
                summary["processed"] += 1
                summary[status] += 1
                emit_progress(
                    row_dict=row_dict,
                    display_text=display_text,
                    last_status=status,
                    last_progress_text=status_text,
                )
        return summary