685 lines
27 KiB
Python
685 lines
27 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import shutil
|
|
import signal
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from .models import deserialize_song_info, normalize_source_name
|
|
from .repository import CatalogRepository
|
|
from .resolver import DEFAULT_DOWNLOAD_SOURCES, MultiSourceSongResolver, SOURCE_CLIENT_NAMES, normalize_audio_ext
|
|
from .resolver_stats import ResolverStatsRepository, default_resolver_stats_db_path
|
|
from .runtime import build_download_relative_dir
|
|
from musicdl.modules.utils.lyric import LyricSearchClient
|
|
from musicdl.modules.utils.misc import shortenpathsinsonginfos
|
|
from musicdl.modules.utils.songinfoutils import SongInfoUtils
|
|
|
|
# File extensions that make ``_detect_quality_label`` report "lossless".
LOSSLESS_EXTENSIONS = {
    "flac",
    "wav",
    "alac",
    "ape",
    "wv",
    "tta",
    "dsf",
    "dff",
}
# Default thread-pool size for CatalogDownloader (see ``worker_count``).
DEFAULT_DOWNLOAD_WORKERS = 10
# Default budget, in seconds, for one online lyric search.
DEFAULT_LYRIC_SEARCH_TIMEOUT_SECONDS = 20
|
|
|
|
|
|
class _LyricSearchTimeout(BaseException):
|
|
pass
|
|
|
|
|
|
@dataclass
class ResolvedDownloadPayload:
    """Everything ``download_resolved_song`` needs to fetch one catalog song."""

    # Catalog row, as a plain dict.
    row: dict[str, object]
    # Human-readable label used in progress events.
    display_text: str
    # Library root that was requested for this run.
    default_root: Path
    # Directory the file will actually be written under.
    target_root: Path
    # Identifier of the local backend registered for ``target_root``.
    backend_id: int
    # Expected file size in bytes, or None when unknown.
    expected_bytes: int | None
    # Song info object produced by the resolver.
    resolved_song_info: object
|
|
|
|
|
|
def _progress_percent(completed: int | None, total: int | None) -> int:
|
|
normalized_total = max(int(total or 0), 0)
|
|
normalized_completed = max(int(completed or 0), 0)
|
|
if normalized_total <= 0:
|
|
return 0
|
|
if normalized_completed >= normalized_total:
|
|
return 100
|
|
return int((normalized_completed * 100) / normalized_total)
|
|
|
|
|
|
def _format_progress_text(downloaded_bytes: int | None, total_bytes: int | None) -> str:
|
|
downloaded_value = max(int(downloaded_bytes or 0), 0)
|
|
total_value = max(int(total_bytes or 0), downloaded_value)
|
|
return f"{downloaded_value / 1024 / 1024:.2f}MB/{total_value / 1024 / 1024:.2f}MB"
|
|
|
|
|
|
class DownloadPlanner:
    """Builds the list of catalog songs that still have to be downloaded."""

    def __init__(self, repository: CatalogRepository):
        self.repository = repository

    def build_download_queue(
        self,
        sources: list[str] | None = None,
        limit: int | None = None,
        playlist_ids: list[int] | None = None,
    ) -> list[dict]:
        """Return pending songs that do not already have an active local file.

        The filters are forwarded unchanged to the repository query. Each
        returned item is a dict copy of the row with an extra integer
        ``song_id`` key mirroring ``id``.
        """
        pending_rows = self.repository.list_pending_download_songs(
            sources=sources,
            limit=limit,
            playlist_ids=playlist_ids,
        )
        return [
            {**dict(pending), "song_id": int(pending["id"])}
            for pending in pending_rows
            if not self.repository.song_has_active_local_file(int(pending["id"]))
        ]
|
|
|
|
|
|
class CatalogDownloader:
|
|
def __init__(
    self,
    repository: CatalogRepository,
    work_dir: str = "musicdl_outputs/catalogsync",
    worker_count: int = DEFAULT_DOWNLOAD_WORKERS,
):
    """Set up per-instance state and the multi-source resolver.

    Args:
        repository: Catalog repository; its ``db_path`` locates the resolver
            statistics database.
        work_dir: Working directory handed to every music client.
        worker_count: Thread-pool size; values below 1 are raised to 1.
    """
    self.repository = repository
    self.work_dir = work_dir
    self.worker_count = max(1, worker_count)

    # Client cache and the lock guarding its population.
    self._clients: dict[str, object] = {}
    self._client_lock = threading.Lock()

    # Library root currently in use and the lock guarding it.
    self._space_lock = threading.Lock()
    self._current_library_root: Path | None = None

    self._lyric_search_timeout_seconds = DEFAULT_LYRIC_SEARCH_TIMEOUT_SECONDS

    stats_db_path = default_resolver_stats_db_path(self.repository.db_path)
    self._resolver = MultiSourceSongResolver(
        # Lambdas look the methods up at call time rather than binding them now.
        client_factory=lambda platform: self.get_client(platform),
        request_overrides_factory=lambda timeout: self._request_overrides(timeout),
        resolver_stats_repo=ResolverStatsRepository(stats_db_path),
    )
|
|
|
|
@contextmanager
|
|
def _lyric_search_timeout_guard(self):
|
|
timeout_seconds = float(self._lyric_search_timeout_seconds or 0)
|
|
if timeout_seconds <= 0:
|
|
yield
|
|
return
|
|
if threading.current_thread() is not threading.main_thread():
|
|
yield
|
|
return
|
|
if not hasattr(signal, "SIGALRM") or not hasattr(signal, "setitimer"):
|
|
yield
|
|
return
|
|
|
|
def _handle_timeout(_signum, _frame):
|
|
raise _LyricSearchTimeout()
|
|
|
|
previous_handler = signal.getsignal(signal.SIGALRM)
|
|
signal.signal(signal.SIGALRM, _handle_timeout)
|
|
signal.setitimer(signal.ITIMER_REAL, timeout_seconds)
|
|
try:
|
|
yield
|
|
finally:
|
|
signal.setitimer(signal.ITIMER_REAL, 0)
|
|
signal.signal(signal.SIGALRM, previous_handler)
|
|
|
|
@staticmethod
|
|
def _request_overrides(timeout: tuple[int, int]) -> dict:
|
|
return {"timeout": timeout}
|
|
|
|
def get_client(self, platform: str):
    """Return the music client for ``platform``, building it on first use.

    Clients are cached under a key made of the calling thread's identifier
    and the normalized platform name, so each thread gets its own client per
    platform.
    """
    platform = normalize_source_name(platform)
    cache_key = f"{threading.get_ident()}:{platform}"
    if cache_key in self._clients:
        return self._clients[cache_key]

    with self._client_lock:
        # Re-check under the lock before building.
        if cache_key not in self._clients:
            from musicdl.modules import BuildMusicClient

            client_config = {
                "type": SOURCE_CLIENT_NAMES[platform],
                "disable_print": True,
                "maintain_session": False,
                "work_dir": self.work_dir,
                "search_size_per_source": 5,
                "search_size_per_page": 5,
                "strict_limit_search_size_per_page": True,
            }
            self._clients[cache_key] = BuildMusicClient(client_config)
    return self._clients[cache_key]
|
|
|
|
def ensure_space(self, root_path: str | Path, required_bytes: int | None) -> Path:
    """Return a library root with at least ``required_bytes`` free.

    A root remembered from an earlier call wins over ``root_path``. The
    directory is created if needed. When the size is unknown or not positive
    no free-space check is made. Otherwise, while the disk is too full, the
    user is prompted on stdin for another directory.

    Raises:
        RuntimeError: If the user answers the prompt with an empty line.
    """
    with self._space_lock:
        chosen_root = self._current_library_root or Path(root_path).resolve()
        chosen_root.mkdir(parents=True, exist_ok=True)

        must_check = required_bytes is not None and required_bytes > 0
        while must_check and shutil.disk_usage(chosen_root).free < required_bytes:
            answer = input("磁盘空间不足,请输入新的下载目录继续: ").strip()
            if not answer:
                raise RuntimeError("Disk space is insufficient and no new directory was provided")
            chosen_root = Path(answer).resolve()
            chosen_root.mkdir(parents=True, exist_ok=True)

        # Remember the root so later calls keep using it.
        self._current_library_root = chosen_root
        return chosen_root
|
|
|
|
def _initialize_library_root(self, root_path: str | Path) -> Path:
|
|
normalized_root = Path(root_path).resolve()
|
|
with self._space_lock:
|
|
if self._current_library_root is None:
|
|
self._current_library_root = normalized_root
|
|
return self._current_library_root
|
|
|
|
@staticmethod
|
|
def _normalize_singers(value: object) -> str | None:
|
|
if not isinstance(value, str):
|
|
return None
|
|
text = value.strip()
|
|
if not text or text.upper() == "NULL":
|
|
return None
|
|
return text
|
|
|
|
@staticmethod
def _detect_download_platform(song_info: object, fallback_platform: str) -> str:
    """Pick the platform to download from.

    Uses the normalized ``source`` attribute of ``song_info``; when that
    normalizes to ``"unknown"`` the normalized ``fallback_platform`` is used.
    """
    platform = normalize_source_name(getattr(song_info, "source", None))
    return normalize_source_name(fallback_platform) if platform == "unknown" else platform
|
|
|
|
def resolve_song_info_for_download(
    self,
    row: dict,
    song_info: object,
    download_sources: list[str] | None = None,
    progress_callback=None,
) -> object:
    """Ask the multi-source resolver for a downloadable version of ``song_info``.

    ``download_sources`` falls back to ``DEFAULT_DOWNLOAD_SOURCES`` when it is
    ``None`` or empty. The resolver's result is returned unchanged.
    """
    resolver_kwargs = {
        "row": row,
        "snapshot_song_info": song_info,
        "download_sources": download_sources or DEFAULT_DOWNLOAD_SOURCES,
        "progress_callback": progress_callback,
    }
    return self._resolver.resolve_song_info(**resolver_kwargs)
|
|
|
|
@staticmethod
|
|
def _detect_quality_label(song_info: object, actual_ext: str | None, fallback: str | None = None) -> str | None:
|
|
raw_data = getattr(song_info, "raw_data", None)
|
|
if isinstance(raw_data, dict):
|
|
quality = raw_data.get("quality")
|
|
if quality:
|
|
return str(quality)
|
|
normalized_ext = normalize_audio_ext(actual_ext or getattr(song_info, "ext", None))
|
|
if normalized_ext in LOSSLESS_EXTENSIONS:
|
|
return "lossless"
|
|
if normalized_ext:
|
|
return "standard"
|
|
return fallback
|
|
|
|
@staticmethod
|
|
def _build_display_text(row: dict) -> str:
|
|
display_name = str(row.get("name") or row.get("id") or "")
|
|
singers = str(row.get("singers") or "").strip()
|
|
return f"{display_name} / {singers}".strip(" /")
|
|
|
|
@staticmethod
|
|
def _normalize_lyric_text(value: object) -> str | None:
|
|
if not isinstance(value, str):
|
|
return None
|
|
text = value.replace("\r\n", "\n").strip()
|
|
if not text or text.upper() == "NULL":
|
|
return None
|
|
return text
|
|
|
|
def _resolve_lyrics_text(self, *, song_info: object | None, row: dict[str, object] | None = None) -> str | None:
|
|
lyric_text = self._normalize_lyric_text(getattr(song_info, "lyric", None))
|
|
if lyric_text:
|
|
return lyric_text
|
|
row = row or {}
|
|
title = self._normalize_lyric_text(getattr(song_info, "song_name", None)) or self._normalize_lyric_text(row.get("name"))
|
|
singers = self._normalize_singers(getattr(song_info, "singers", None)) or self._normalize_singers(row.get("singers"))
|
|
if not title or not singers:
|
|
return None
|
|
try:
|
|
with self._lyric_search_timeout_guard():
|
|
_lyric_result, lyric = LyricSearchClient.search(track_name=title, artist_name=singers)
|
|
except _LyricSearchTimeout:
|
|
return None
|
|
return self._normalize_lyric_text(lyric)
|
|
|
|
def _sync_lyrics_for_saved_song(
|
|
self,
|
|
*,
|
|
row: dict[str, object],
|
|
song_info: object | None,
|
|
saved_path: Path,
|
|
overwrite_lyrics: bool,
|
|
worker_callback=None,
|
|
display_text: str | None = None,
|
|
) -> str:
|
|
try:
|
|
lyric_text = self._resolve_lyrics_text(song_info=song_info, row=row)
|
|
if not lyric_text:
|
|
self._emit_worker_progress(
|
|
row,
|
|
worker_callback,
|
|
display_text=display_text,
|
|
last_progress_text="lyrics unavailable",
|
|
)
|
|
return "skipped"
|
|
if hasattr(song_info, "lyric"):
|
|
song_info.lyric = lyric_text
|
|
lrc_path = saved_path.with_suffix(".lrc")
|
|
if lrc_path.exists() and not overwrite_lyrics:
|
|
self._emit_worker_progress(
|
|
row,
|
|
worker_callback,
|
|
display_text=display_text,
|
|
last_progress_text="lyrics exists, skipped",
|
|
)
|
|
return "skipped"
|
|
saved = SongInfoUtils.savelrctofile(saved_path, lyric_text, overwrite=overwrite_lyrics)
|
|
if saved:
|
|
self._emit_worker_progress(
|
|
row,
|
|
worker_callback,
|
|
display_text=display_text,
|
|
last_progress_text="lyrics saved",
|
|
)
|
|
return "saved"
|
|
self._emit_worker_progress(
|
|
row,
|
|
worker_callback,
|
|
display_text=display_text,
|
|
last_progress_text="lyrics skipped",
|
|
)
|
|
return "skipped"
|
|
except Exception as exc:
|
|
self._emit_worker_progress(
|
|
row,
|
|
worker_callback,
|
|
display_text=display_text,
|
|
last_progress_text=f"lyrics failed: {type(exc).__name__}: {exc}",
|
|
)
|
|
return "failed"
|
|
|
|
@staticmethod
|
|
def _emit_worker_progress(
|
|
row: dict,
|
|
progress_callback,
|
|
*,
|
|
display_text: str | None,
|
|
downloaded_bytes: int | None = None,
|
|
total_bytes: int | None = None,
|
|
speed_bytes_per_sec: int | None = None,
|
|
progress_percent: int | None = None,
|
|
last_progress_text: str | None = None,
|
|
) -> None:
|
|
if progress_callback is None:
|
|
return
|
|
state: dict[str, object] = {
|
|
"current_song_id": int(row["id"]) if row.get("id") is not None else None,
|
|
"current_playlist_id": row.get("playlist_id"),
|
|
"current_display_text": display_text,
|
|
}
|
|
if downloaded_bytes is not None:
|
|
state["downloaded_bytes"] = int(downloaded_bytes)
|
|
if total_bytes is not None:
|
|
state["total_bytes"] = int(total_bytes)
|
|
if speed_bytes_per_sec is not None:
|
|
state["speed_bytes_per_sec"] = int(speed_bytes_per_sec)
|
|
if progress_percent is not None:
|
|
state["progress_percent"] = int(progress_percent)
|
|
if last_progress_text is not None:
|
|
state["last_progress_text"] = str(last_progress_text)
|
|
progress_callback(**state)
|
|
|
|
def _monitor_save_path(
|
|
self,
|
|
*,
|
|
save_path: Path,
|
|
expected_bytes: int | None,
|
|
progress_callback,
|
|
stop_event: threading.Event,
|
|
row: dict,
|
|
display_text: str | None,
|
|
interval_seconds: float = 0.02,
|
|
) -> None:
|
|
last_size = 0
|
|
last_change_at = time.monotonic()
|
|
while not stop_event.wait(interval_seconds):
|
|
if not save_path.exists():
|
|
continue
|
|
try:
|
|
current_size = int(save_path.stat().st_size)
|
|
except OSError:
|
|
continue
|
|
if current_size <= last_size:
|
|
continue
|
|
now = time.monotonic()
|
|
delta_bytes = current_size - last_size
|
|
delta_seconds = max(now - last_change_at, 1e-6)
|
|
total_bytes = int(expected_bytes or current_size)
|
|
self._emit_worker_progress(
|
|
row,
|
|
progress_callback,
|
|
display_text=display_text,
|
|
downloaded_bytes=current_size,
|
|
total_bytes=total_bytes,
|
|
speed_bytes_per_sec=int(delta_bytes / delta_seconds),
|
|
progress_percent=_progress_percent(current_size, total_bytes),
|
|
last_progress_text=_format_progress_text(current_size, total_bytes),
|
|
)
|
|
last_size = current_size
|
|
last_change_at = now
|
|
|
|
def resolve_song_row(
    self,
    row,
    library_root: str | Path,
    download_sources: list[str] | None = None,
    worker_callback=None,
) -> ResolvedDownloadPayload | None:
    """Resolve a snapshot into a downloadable payload and choose the active local target.

    Args:
        row: Catalog row (anything ``dict()`` accepts). ``metadata_json`` is
            read as JSON and its ``"snapshot"`` entry is deserialized.
        library_root: Default library directory.
        download_sources: Sources handed to the resolver.
        worker_callback: Optional progress callback, called with keyword state.

    Returns:
        The payload for ``download_resolved_song``, or ``None`` when the row
        has no usable snapshot or the resolver returns nothing.

    Raises:
        ValueError: If ``metadata_json`` is not valid JSON or the recorded
            file size cannot be converted to an integer.
    """
    row_dict = dict(row)
    default_root = self._initialize_library_root(library_root)
    self.repository.ensure_local_backend(default_root, name="default-local", is_default=True)
    display_text = self._build_display_text(row_dict)
    self._emit_worker_progress(row_dict, worker_callback, display_text=display_text)

    raw_metadata = row_dict.get("metadata_json")
    metadata = json.loads(raw_metadata) if raw_metadata else {}
    # Valid JSON need not be an object; treat anything else as "no snapshot"
    # instead of failing on ``.get``.
    song_info = deserialize_song_info(metadata.get("snapshot")) if isinstance(metadata, dict) else None
    if song_info is None:
        return None

    resolve_progress_callback = None
    if worker_callback is not None:

        def resolve_progress_callback(message):
            return self._emit_worker_progress(
                row_dict,
                worker_callback,
                display_text=display_text,
                last_progress_text=message,
            )

    resolved_song_info = self.resolve_song_info_for_download(
        row=row_dict,
        song_info=song_info,
        download_sources=download_sources,
        progress_callback=resolve_progress_callback,
    )
    if resolved_song_info is None:
        return None

    # Normalize the size once so the free-space check and the payload agree,
    # and so a numeric string does not break the comparison in ensure_space.
    raw_size = getattr(resolved_song_info, "file_size_bytes", None) or row_dict.get("file_size_bytes")
    expected_bytes = int(raw_size or 0) or None

    target_root = self.ensure_space(default_root, expected_bytes)
    is_default_root = target_root.resolve() == default_root
    backend_id = self.repository.ensure_local_backend(
        target_root,
        name="default-local" if is_default_root else None,
        is_default=is_default_root,
    )
    return ResolvedDownloadPayload(
        row=row_dict,
        display_text=display_text,
        default_root=default_root,
        target_root=target_root,
        backend_id=backend_id,
        expected_bytes=expected_bytes,
        resolved_song_info=resolved_song_info,
    )
|
|
|
|
def download_resolved_song(
    self,
    resolved_payload: ResolvedDownloadPayload,
    worker_callback=None,
    lyrics_enabled: bool = True,
    overwrite_lyrics: bool = False,
) -> bool:
    """Download one resolved song, record the local file and optionally sync lyrics.

    Steps, in order: choose the platform and client, create the target
    directory, start a background size monitor when a callback is given,
    run the client download, report the final size, record the file in the
    repository, then write lyrics when ``lyrics_enabled``.

    Returns:
        ``True`` when the client reported a downloaded song, ``False`` when
        it returned nothing.
    """
    row = resolved_payload.row
    song_info = resolved_payload.resolved_song_info
    display_text = resolved_payload.display_text

    download_platform = self._detect_download_platform(song_info, str(row["platform"]))
    client = self.get_client(download_platform)
    singers = self._normalize_singers(getattr(song_info, "singers", None)) or self._normalize_singers(
        row.get("singers")
    )
    relative_dir = build_download_relative_dir(platform=download_platform, singers=singers)
    target_dir = resolved_payload.target_root / relative_dir
    target_dir.mkdir(parents=True, exist_ok=True)
    song_info.work_dir = str(target_dir)
    # Clear any cached save path so it is derived from the new work_dir.
    if hasattr(song_info, "_save_path"):
        song_info._save_path = None

    watched_path: Path | None = None
    stop_signal: threading.Event | None = None
    watcher: threading.Thread | None = None

    self._emit_worker_progress(
        row,
        worker_callback,
        display_text=display_text,
        last_progress_text=f"starting download via {download_platform}",
    )

    if worker_callback is not None and hasattr(song_info, "save_path"):
        shortenpathsinsonginfos([song_info])
        watched_path = Path(song_info.save_path)
        stop_signal = threading.Event()
        watcher = threading.Thread(
            target=self._monitor_save_path,
            kwargs={
                "save_path": watched_path,
                "expected_bytes": resolved_payload.expected_bytes,
                "progress_callback": worker_callback,
                "stop_event": stop_signal,
                "row": row,
                "display_text": display_text,
            },
            daemon=True,
            name=f"download-monitor-{row.get('id')}",
        )
        watcher.start()

    def report_final_size() -> None:
        # One last update so the callback sees the finished size.
        if watched_path is None or not watched_path.exists():
            return
        try:
            final_size = int(watched_path.stat().st_size)
        except OSError:
            final_size = 0
        if final_size <= 0:
            return
        total_bytes = int(resolved_payload.expected_bytes or final_size)
        self._emit_worker_progress(
            row,
            worker_callback,
            display_text=display_text,
            downloaded_bytes=final_size,
            total_bytes=total_bytes,
            progress_percent=_progress_percent(final_size, total_bytes),
            last_progress_text=_format_progress_text(final_size, total_bytes),
        )

    try:
        downloaded = client.download(
            [song_info],
            num_threadings=1,
            request_overrides=self._request_overrides((10, 60)),
            auto_supplement_song=False,
        )
    except TypeError:
        # Retry without request_overrides when the first call raises TypeError.
        downloaded = client.download(
            [song_info],
            num_threadings=1,
            auto_supplement_song=False,
        )
    finally:
        if stop_signal is not None:
            stop_signal.set()
        if watcher is not None:
            watcher.join(timeout=1.0)
        report_final_size()

    if not downloaded:
        return False

    saved_song = downloaded[0]
    saved_path = Path(saved_song.save_path)
    relative_path = saved_path.relative_to(resolved_payload.target_root).as_posix()
    if saved_path.exists():
        actual_size = saved_path.stat().st_size
    else:
        actual_size = row.get("file_size_bytes")
    actual_ext = saved_path.suffix.lstrip(".") or row.get("ext")
    self.repository.record_local_file(
        song_id=int(row["id"]),
        backend_id=resolved_payload.backend_id,
        relative_path=relative_path,
        file_size_bytes=actual_size,
        ext=actual_ext,
        quality_label=self._detect_quality_label(song_info, actual_ext, fallback=row.get("quality_label")),
    )

    if lyrics_enabled:
        # Prefer the downloaded song object when it carries lyrics.
        if self._normalize_lyric_text(getattr(saved_song, "lyric", None)):
            lyrics_song_info = saved_song
        else:
            lyrics_song_info = song_info
        self._sync_lyrics_for_saved_song(
            row=row,
            song_info=lyrics_song_info,
            saved_path=saved_path,
            overwrite_lyrics=overwrite_lyrics,
            worker_callback=worker_callback,
            display_text=display_text,
        )
    return True
|
|
|
|
def download_song_row(
    self,
    row,
    library_root: str | Path,
    download_sources: list[str] | None = None,
    worker_callback=None,
    lyrics_enabled: bool = True,
    overwrite_lyrics: bool = False,
) -> bool:
    """Resolve and download a single catalog row.

    Returns ``False`` when the row cannot be resolved; otherwise returns
    whatever ``download_resolved_song`` returns.
    """
    payload = self.resolve_song_row(
        row=row,
        library_root=library_root,
        download_sources=download_sources,
        worker_callback=worker_callback,
    )
    return payload is not None and self.download_resolved_song(
        resolved_payload=payload,
        worker_callback=worker_callback,
        lyrics_enabled=lyrics_enabled,
        overwrite_lyrics=overwrite_lyrics,
    )
|
|
|
|
def download_pending(
    self,
    library_root: str | Path,
    sources: list[str] | None = None,
    limit: int | None = None,
    playlist_ids: list[int] | None = None,
    download_sources: list[str] | None = None,
    lyrics_enabled: bool = True,
    overwrite_lyrics: bool = False,
) -> int:
    """Download every pending song on a thread pool and count the successes.

    The queue comes from ``DownloadPlanner``. An exception raised by any
    worker is re-raised here when its result is collected.

    Returns:
        The number of rows for which ``download_song_row`` returned a truthy
        value.
    """
    queue = DownloadPlanner(self.repository).build_download_queue(
        sources=sources,
        limit=limit,
        playlist_ids=playlist_ids,
    )
    default_root = self._initialize_library_root(library_root)
    self.repository.ensure_local_backend(default_root, name="default-local", is_default=True)

    shared_kwargs = {
        "library_root": default_root,
        "download_sources": download_sources,
        "lyrics_enabled": lyrics_enabled,
        "overwrite_lyrics": overwrite_lyrics,
    }
    with ThreadPoolExecutor(max_workers=self.worker_count) as executor:
        futures = [executor.submit(self.download_song_row, row=row, **shared_kwargs) for row in queue]
        downloaded_count = sum(1 for finished in as_completed(futures) if finished.result())
    return downloaded_count
|
|
|
|
def sync_local_lyrics(
    self,
    sources: list[str] | None = None,
    playlist_ids: list[int] | None = None,
    limit: int | None = None,
    overwrite_lyrics: bool = False,
    progress_callback=None,
) -> dict[str, int]:
    """Write lyric files for songs that already exist locally.

    Rows are processed on a thread pool. ``progress_callback`` (if given) is
    called once before any work and once after each row, with the running
    totals passed as keyword arguments.

    Returns:
        Counters keyed ``total``, ``processed``, ``saved``, ``skipped`` and
        ``failed``.
    """
    rows = self.repository.list_local_songs_for_lyrics(
        sources=sources,
        playlist_ids=playlist_ids,
        limit=limit,
    )
    summary = {"total": len(rows), "processed": 0, "saved": 0, "skipped": 0, "failed": 0}
    status_messages = {
        "saved": "lyrics saved",
        "skipped": "lyrics skipped",
        "failed": "lyrics failed",
    }

    def emit_progress(
        *,
        row_dict: dict[str, object] | None = None,
        display_text: str | None = None,
        last_status: str | None = None,
        last_progress_text: str | None = None,
    ) -> None:
        if progress_callback is None:
            return
        # Start from a snapshot of the counters, then add the per-row fields.
        state: dict[str, object] = dict(summary)
        state["progress_percent"] = _progress_percent(summary["processed"], summary["total"])
        if row_dict is not None:
            row_id = row_dict.get("id")
            state["current_song_id"] = None if row_id is None else int(row_dict["id"])
            state["current_playlist_id"] = row_dict.get("playlist_id")
        optional_fields = (
            ("current_display_text", display_text),
            ("last_status", last_status),
            ("last_progress_text", last_progress_text),
        )
        for key, value in optional_fields:
            if value is not None:
                state[key] = value
        progress_callback(**state)

    def process_row(row) -> tuple[dict[str, object], str, str, str]:
        row_dict = dict(row)
        display_text = self._build_display_text(row_dict)

        def outcome(status: str, text: str):
            return row_dict, display_text, status, text

        try:
            local_file_path = row_dict.get("local_file_path")
            if not local_file_path:
                return outcome("failed", "missing local file path")
            saved_path = Path(str(local_file_path))
            if not saved_path.exists():
                return outcome("failed", "local file missing")

            if row_dict.get("metadata_json"):
                metadata = json.loads(row_dict["metadata_json"])
            else:
                metadata = {}
            if isinstance(metadata, dict):
                song_info = deserialize_song_info(metadata.get("snapshot"))
            else:
                song_info = None

            status = self._sync_lyrics_for_saved_song(
                row=row_dict,
                song_info=song_info,
                saved_path=saved_path,
                overwrite_lyrics=overwrite_lyrics,
                display_text=display_text,
            )
            # Anything outside the known statuses is counted as a failure.
            if status not in status_messages:
                status = "failed"
            return outcome(status, status_messages[status])
        except Exception as exc:
            return outcome("failed", f"lyrics failed: {type(exc).__name__}: {exc}")

    emit_progress()
    with ThreadPoolExecutor(max_workers=self.worker_count) as executor:
        futures = [executor.submit(process_row, row) for row in rows]
        for finished in as_completed(futures):
            row_dict, display_text, status, status_text = finished.result()
            summary["processed"] += 1
            summary[status] += 1
            emit_progress(
                row_dict=row_dict,
                display_text=display_text,
                last_status=status,
                last_progress_text=status_text,
            )
    return summary
|