from __future__ import annotations import json import threading from dataclasses import dataclass from pathlib import Path from typing import Callable from musicdl.catalogsync.downloader import CatalogDownloader from musicdl.catalogsync.repository import CatalogRepository from musicdl.catalogsync.services import CatalogSyncService from musicdl.catalogsync.uploader import CatalogUploader from .repository import OpsRepository NON_MUSIC_RESOURCE_REASON = "非音乐资源(有声榜条目)" NON_MUSIC_RESOURCE_CODE = "NON_MUSIC_RESOURCE" @dataclass class ResolvedStageDownloadTask: item_id: int playlist_id: int | None row: dict[str, object] resolved_payload: object def _format_error(exc: Exception) -> str: return f"{type(exc).__name__}: {exc}" class _TransitionUpdateError(RuntimeError): pass def _ensure_transition_applied(applied: bool, *, item_id: int, action: str) -> None: if applied: return raise _TransitionUpdateError( f"CAS transition failed for item {item_id}: {action} returned False" ) def _mark_failed_or_raise(ops_repo: OpsRepository, *, item_id: int, error_message: str, cause: Exception) -> None: if ops_repo.mark_item_failed(item_id=item_id, error_message=error_message): return raise RuntimeError( f"CAS transition failed for item {item_id}: mark_item_failed returned False while handling error: {error_message}" ) from cause def _mark_non_music_resource_skipped_or_raise(ops_repo: OpsRepository, *, item_id: int) -> None: _ensure_transition_applied( ops_repo.mark_item_skipped( item_id=item_id, reason_message=NON_MUSIC_RESOURCE_REASON, reason_code=NON_MUSIC_RESOURCE_CODE, ), item_id=item_id, action="mark_item_skipped", ) def _is_non_music_resource_download_row(row: dict[str, object] | None) -> bool: row = row or {} remote_song_id = str(row.get("remote_song_id") or "").strip().lower() if remote_song_id.startswith("qqtop_"): return True metadata_json = row.get("metadata_json") if not metadata_json: return False try: metadata = json.loads(str(metadata_json)) except Exception: return False if not isinstance(metadata, dict): return False snapshot = metadata.get("snapshot") if not isinstance(snapshot, dict): return False raw_data = snapshot.get("raw_data") if not isinstance(raw_data, dict): return False search = raw_data.get("search") if not isinstance(search, dict): return False return bool(search.get("qq_toplist_fallback")) class CollectStageExecutor: def __init__( self, db_path: str | Path, service: CatalogSyncService | None = None, ops_repo: OpsRepository | None = None, ): self.db_path = Path(db_path) self.ops_repo = ops_repo or OpsRepository(self.db_path) self.catalog_repo = CatalogRepository(self.db_path) self.service = service or CatalogSyncService(repository=self.catalog_repo) def process_item(self, item_id: int, worker_name: str, *, already_claimed: bool = False) -> None: if not already_claimed: self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) try: item = self.ops_repo.get_item(item_id) if item is None: raise RuntimeError(f"Unknown item: {item_id}") source = str(item.payload.get("source") or "").strip() if not source: raise RuntimeError(f"Collect item {item_id} is missing source") display_text = f"collect:{source}" self.ops_repo.update_worker_state( worker_name=worker_name, current_job_item_id=item_id, status="running", current_display_text=display_text, last_progress_text="starting playlist collection", ) counts = self.service.collect_playlists( sources=[source], include_playlist_square=bool(item.payload.get("include_playlist_square", True)), include_toplist=bool(item.payload.get("include_toplist", True)), progress_callback=lambda event_type, payload: self.ops_repo.update_worker_state( worker_name=worker_name, current_job_item_id=item_id, status="running", current_display_text=display_text, last_progress_text=self._format_progress_text(event_type, payload), ), ) _ensure_transition_applied( self.ops_repo.mark_item_succeeded(item_id=item_id, result_payload={"counts": counts}), item_id=item_id, action="mark_item_succeeded", ) except Exception as exc: failure_message = _format_error(exc) _mark_failed_or_raise( self.ops_repo, item_id=item_id, error_message=failure_message, cause=exc, ) if isinstance(exc, _TransitionUpdateError): raise @staticmethod def _format_progress_text(event_type: str, payload: dict[str, object]) -> str: if event_type == "playlist_square_page": page = int(payload.get("page") or 0) total = int(payload.get("total") or 0) new_count = int(payload.get("new_count") or 0) if payload.get("duplicate_page"): return f"page {page}: duplicate page detected, stopping at {total}" return f"page {page}: +{new_count}, total {total}" if event_type == "toplist_collected": return f"toplist: {int(payload.get('count') or 0)}" if event_type == "source_finished": counts = payload.get("counts") if isinstance(payload.get("counts"), dict) else {} playlist_square = int(counts.get("playlist_square") or 0) toplist = int(counts.get("toplist") or 0) return f"done: square {playlist_square}, toplist {toplist}" return str(event_type).replace("_", " ") class DownloadStageExecutor: def __init__( self, db_path: str | Path, library_root: str | Path, download_sources: list[str] | None = None, downloader: CatalogDownloader | None = None, ops_repo: OpsRepository | None = None, ): self.db_path = Path(db_path) self.library_root = Path(library_root) self.download_sources = list(download_sources or []) self.ops_repo = ops_repo or OpsRepository(self.db_path) self.catalog_repo = CatalogRepository(self.db_path) self.downloader = downloader or CatalogDownloader(repository=self.catalog_repo) def process_resolve_item( self, item_id: int, worker_name: str, *, ready_queue, already_claimed: bool = False, ) -> None: if not already_claimed: self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) row = self.ops_repo.build_download_row(item_id=item_id) song_id = int(row.get("id") or row.get("song_id") or 0) if song_id > 0 and self.catalog_repo.song_has_active_local_file(song_id): self.ops_repo.update_worker_state( worker_name=worker_name, current_job_item_id=item_id, status="running", current_song_id=song_id, current_playlist_id=row.get("playlist_id"), current_display_text=str(row.get("name") or row.get("id") or song_id), last_progress_text="already downloaded", ) _ensure_transition_applied( self.ops_repo.mark_item_succeeded( item_id=item_id, result_payload={"already_downloaded": True}, ), item_id=item_id, action="mark_item_succeeded", ) return resolved_payload = self.downloader.resolve_song_row( row=row, library_root=self.library_root, download_sources=self.download_sources, worker_callback=lambda **state: self.ops_repo.update_worker_state( worker_name=worker_name, current_job_item_id=item_id, status="running", **state, ), ) if resolved_payload is None: if _is_non_music_resource_download_row(row): _mark_non_music_resource_skipped_or_raise(self.ops_repo, item_id=item_id) return _ensure_transition_applied( self.ops_repo.mark_item_failed( item_id=item_id, error_message="resolve returned no downloadable song", ), item_id=item_id, action="mark_item_failed", ) return ready_queue.put( ResolvedStageDownloadTask( item_id=item_id, playlist_id=row.get("playlist_id"), row=row, resolved_payload=resolved_payload, ) ) def process_download_task(self, task: ResolvedStageDownloadTask, worker_name: str) -> None: try: succeeded = self.downloader.download_resolved_song( resolved_payload=task.resolved_payload, worker_callback=lambda **state: self.ops_repo.update_worker_state( worker_name=worker_name, current_job_item_id=task.item_id, status="running", **state, ), ) if succeeded: _ensure_transition_applied( self.ops_repo.mark_item_succeeded(item_id=task.item_id), item_id=task.item_id, action="mark_item_succeeded", ) return if _is_non_music_resource_download_row(task.row): _mark_non_music_resource_skipped_or_raise(self.ops_repo, item_id=task.item_id) return _ensure_transition_applied( self.ops_repo.mark_item_failed( item_id=task.item_id, error_message="download returned no file", ), item_id=task.item_id, action="mark_item_failed", ) except Exception as exc: if _is_non_music_resource_download_row(task.row): _mark_non_music_resource_skipped_or_raise(self.ops_repo, item_id=task.item_id) if isinstance(exc, _TransitionUpdateError): raise return failure_message = _format_error(exc) _mark_failed_or_raise( self.ops_repo, item_id=task.item_id, error_message=failure_message, cause=exc, ) if isinstance(exc, _TransitionUpdateError): raise def process_item(self, item_id: int, worker_name: str, *, already_claimed: bool = False) -> None: if not already_claimed: self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) row: dict[str, object] | None = None try: row = self.ops_repo.build_download_row(item_id=item_id) song_id = int(row.get("id") or row.get("song_id") or 0) if song_id > 0 and self.catalog_repo.song_has_active_local_file(song_id): self.ops_repo.update_worker_state( worker_name=worker_name, current_job_item_id=item_id, status="running", current_song_id=song_id, current_playlist_id=row.get("playlist_id"), current_display_text=str(row.get("name") or row.get("id") or song_id), last_progress_text="already downloaded", ) _ensure_transition_applied( self.ops_repo.mark_item_succeeded( item_id=item_id, result_payload={"already_downloaded": True}, ), item_id=item_id, action="mark_item_succeeded", ) return succeeded = self.downloader.download_song_row( row=row, library_root=self.library_root, download_sources=self.download_sources, worker_callback=lambda **state: self.ops_repo.update_worker_state( worker_name=worker_name, current_job_item_id=item_id, status="running", **state, ), ) if succeeded: _ensure_transition_applied( self.ops_repo.mark_item_succeeded(item_id=item_id), item_id=item_id, action="mark_item_succeeded", ) return if _is_non_music_resource_download_row(row): _mark_non_music_resource_skipped_or_raise(self.ops_repo, item_id=item_id) return _ensure_transition_applied( self.ops_repo.mark_item_failed( item_id=item_id, error_message="download returned no file", ), item_id=item_id, action="mark_item_failed", ) except Exception as exc: if _is_non_music_resource_download_row(row): _mark_non_music_resource_skipped_or_raise(self.ops_repo, item_id=item_id) if isinstance(exc, _TransitionUpdateError): raise return failure_message = _format_error(exc) _mark_failed_or_raise( self.ops_repo, item_id=item_id, error_message=failure_message, cause=exc, ) if isinstance(exc, _TransitionUpdateError): raise class SyncStageExecutor: def __init__( self, db_path: str | Path, service: CatalogSyncService | None = None, service_factory: Callable[[], CatalogSyncService] | None = None, ops_repo: OpsRepository | None = None, ): self.db_path = Path(db_path) self.ops_repo = ops_repo or OpsRepository(self.db_path) self.catalog_repo = CatalogRepository(self.db_path) if service_factory is not None: self._service_factory = service_factory elif service is not None: self._service_factory = lambda: service else: self._service_factory = lambda: CatalogSyncService(repository=self.catalog_repo) self._service_local = threading.local() def _get_service(self) -> CatalogSyncService: service = getattr(self._service_local, "service", None) if service is None: service = self._service_factory() self._service_local.service = service return service def process_item(self, item_id: int, worker_name: str, *, already_claimed: bool = False) -> None: if not already_claimed: self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) try: playlist_row = self.ops_repo.get_playlist_row_for_item(item_id=item_id) linked_count = int(self._get_service().sync_playlist_row(playlist_row)) _ensure_transition_applied( self.ops_repo.mark_item_succeeded( item_id=item_id, result_payload={"linked_count": linked_count}, ), item_id=item_id, action="mark_item_succeeded", ) except Exception as exc: failure_message = _format_error(exc) _mark_failed_or_raise( self.ops_repo, item_id=item_id, error_message=failure_message, cause=exc, ) if isinstance(exc, _TransitionUpdateError): raise class UploadStageExecutor: def __init__( self, db_path: str | Path, backend_name: str, uploader: CatalogUploader | None = None, ops_repo: OpsRepository | None = None, ): self.db_path = Path(db_path) self.backend_name = str(backend_name) self.ops_repo = ops_repo or OpsRepository(self.db_path) self.catalog_repo = CatalogRepository(self.db_path) self.uploader = uploader or CatalogUploader(repository=self.catalog_repo) def process_item(self, item_id: int, worker_name: str, *, already_claimed: bool = False) -> None: if not already_claimed: self.ops_repo.claim_item(item_id=item_id, worker_name=worker_name) try: upload_row = self.ops_repo.get_upload_row_for_item(item_id=item_id) result = str( self.uploader.process_upload_task_row( task_row=upload_row, backend_name=self.backend_name, ) ) if result == "succeeded": _ensure_transition_applied( self.ops_repo.mark_item_succeeded(item_id=item_id), item_id=item_id, action="mark_item_succeeded", ) else: _ensure_transition_applied( self.ops_repo.mark_item_failed( item_id=item_id, error_message=f"upload result: {result}", ), item_id=item_id, action="mark_item_failed", ) except Exception as exc: failure_message = _format_error(exc) _mark_failed_or_raise( self.ops_repo, item_id=item_id, error_message=failure_message, cause=exc, ) if isinstance(exc, _TransitionUpdateError): raise