from __future__ import annotations import logging import threading import time from collections import Counter from concurrent.futures import Future, ThreadPoolExecutor from pathlib import Path from queue import Queue from typing import Any from musicdl.catalogsync.catalog_export import run_catalog_export_command from musicdl.catalogsync.downloader import DownloadPlanner from musicdl.catalogsync.repository import CatalogRepository from musicdl.catalogsync.services import CatalogSyncService from musicdl.catalogsync.uploader import CatalogUploader from .jobdefs import DOWNLOAD_LANE, JOB_STAGE_SEQUENCES, job_lane_type from .executors import ( CollectStageExecutor, DownloadStageExecutor, SyncStageExecutor, UploadStageExecutor, ) from .models import JobStatus, StageStatus from .repository import OpsRepository DEFAULT_DOWNLOAD_WORKERS = 10 DEFAULT_SYNC_WORKERS = 4 logger = logging.getLogger(__name__) def _unique_preserve_order(values: list[str]) -> list[str]: normalized: list[str] = [] seen: set[str] = set() for value in values: item = str(value).strip() if not item or item in seen: continue normalized.append(item) seen.add(item) return normalized def _split_csv(value: Any) -> list[str]: if isinstance(value, list): return [str(item).strip() for item in value if str(item).strip()] if not value: return [] return [part.strip() for part in str(value).split(",") if part.strip()] def _int_value(value: Any, default: int) -> int: try: parsed = int(value) except (TypeError, ValueError): return default return parsed if parsed > 0 else default class OpsRunner: def __init__( self, repository: OpsRepository, sleep_seconds: float = 1.0, *, download_lane_concurrency: int = 1, general_lane_concurrency: int = 3, ): self.repository = repository self.sleep_seconds = max(float(sleep_seconds), 0.1) self.download_lane_concurrency = 1 self.general_lane_concurrency = max(int(general_lane_concurrency), 1) self._job_pool = ThreadPoolExecutor( max_workers=self.download_lane_concurrency + self.general_lane_concurrency ) self._futures: dict[int, Future[None]] = {} self._futures_lock = threading.Lock() self._playlist_export_lock = threading.Lock() self._catalog_export_lock = threading.Lock() self._exported_stage_playlists: set[tuple[int, int]] = set() self.db_path = Path(self.repository.db_path) self.catalog_repo = CatalogRepository(self.db_path) def recover_incomplete_jobs(self) -> None: for job in self.repository.list_recoverable_jobs(): self.repository.pause_job_for_recovery(job.id) for item in self.repository.list_running_items(job.id): self.repository.mark_item_interrupted( item.id, last_error="Recovery interrupted running item after runner restart.", ) self.repository.add_job_event( job.id, "recovery_requeued", "Recovered incomplete job and re-queued resumable work.", ) self.repository.resume_job(job.id) def apply_pending_commands(self) -> None: for command in self.repository.list_pending_commands(): command_type = str(command["command_type"]) job_id = int(command["job_run_id"]) command_id = int(command["id"]) target_item_id = command["target_item_id"] if command_type == "pause": self.repository.request_job_pause(job_id) elif command_type == "resume": self.repository.resume_job(job_id) elif command_type == "cancel": self.repository.cancel_job(job_id) elif command_type == "retry_item": if target_item_id is None: self.repository.add_job_event( job_id, "ignored_command", "retry_item command missing target_item_id.", details={"command_type": command_type, "command_id": command_id}, ) elif not self.repository.requeue_item( int(target_item_id), force=False, job_id=job_id ): self.repository.add_job_event( job_id, "retry_rejected", "retry_item command rejected.", item_id=int(target_item_id), details={"command_type": command_type, "command_id": command_id}, ) elif command_type == "force_retry_item": if target_item_id is None: self.repository.add_job_event( job_id, "ignored_command", "force_retry_item command missing target_item_id.", details={"command_type": command_type, "command_id": command_id}, ) elif not self.repository.requeue_item( int(target_item_id), force=True, job_id=job_id ): self.repository.add_job_event( job_id, "retry_rejected", "force_retry_item command rejected.", item_id=int(target_item_id), details={"command_type": command_type, "command_id": command_id}, ) else: self.repository.add_job_event( job_id, "ignored_command", "Unsupported command type.", details={"command_type": command_type, "command_id": command_id}, ) self.repository.mark_command_applied(command_id) def reconcile_pause_state(self, job_id: int) -> None: if self.repository.job_has_running_items(job_id): return self.repository.finalize_pause(job_id) def run_forever(self, stop_event=None) -> None: self.recover_incomplete_jobs() while stop_event is None or not stop_event.is_set(): worked = self.loop_once() if worked: continue if stop_event is not None: stop_event.wait(self.sleep_seconds) else: time.sleep(self.sleep_seconds) def loop_once(self) -> bool: had_commands = bool(self.repository.list_pending_commands()) self.apply_pending_commands() finished = self._reap_finished_jobs() started = self._start_eligible_jobs() return bool(had_commands or finished or started) def _reap_finished_jobs(self) -> int: finished_count = 0 finished_futures: list[tuple[int, Future[None]]] = [] with self._futures_lock: for job_id, future in list(self._futures.items()): if not future.done(): continue del self._futures[job_id] finished_futures.append((job_id, future)) for job_id, future in finished_futures: try: future.result() except Exception as exc: self.repository.add_job_event( job_id, "job_future_error", str(exc), ) job = self.repository.get_job(job_id) if job is not None and job.status not in { JobStatus.COMPLETED, JobStatus.COMPLETED_WITH_ERRORS, JobStatus.FAILED, JobStatus.CANCELED, JobStatus.PAUSED, }: self.repository.mark_job_finished( job_id, status=JobStatus.FAILED, last_error=str(exc), ) finished_count += 1 return finished_count def _submit_job(self, job_id: int) -> bool: with self._futures_lock: if job_id in self._futures: return False self._futures[job_id] = self._job_pool.submit(self._run_job, job_id) return True def _start_eligible_jobs(self) -> int: started_count = 0 active_jobs = self.repository.list_active_jobs() lane_counts = Counter(job_lane_type(job.job_type) for job in active_jobs) for active_job in active_jobs: if active_job.status == JobStatus.PAUSE_REQUESTED: self.reconcile_pause_state(active_job.id) continue if self._submit_job(active_job.id): started_count += 1 for queued_job in self.repository.list_queued_jobs(): lane_type = job_lane_type(queued_job.job_type) lane_limit = ( self.download_lane_concurrency if lane_type == DOWNLOAD_LANE else self.general_lane_concurrency ) if lane_counts[lane_type] >= lane_limit: continue claimed = self.repository.claim_job_if_queued(queued_job.id) if claimed is None: continue lane_counts[lane_type] += 1 if self._submit_job(claimed.id): started_count += 1 return started_count def _run_job(self, job_id: int) -> None: try: current_job = self.repository.get_job(job_id) if current_job is None: return if current_job.status == JobStatus.CANCELED: self.repository.finalize_canceled_job(job_id) return if current_job.status == JobStatus.PAUSE_REQUESTED: self.reconcile_pause_state(job_id) return if current_job.status == JobStatus.PAUSED: return if not self.repository.mark_job_running(job_id): current_job = self.repository.get_job(job_id) if current_job is not None: if current_job.status == JobStatus.CANCELED: self.repository.finalize_canceled_job(job_id) elif current_job.status == JobStatus.PAUSE_REQUESTED: self.reconcile_pause_state(job_id) return current_job = self.repository.get_job(job_id) if current_job is None: return self._ensure_job_stages(current_job) while True: current_job = self.repository.get_job(job_id) if current_job is None: return if current_job.status == JobStatus.CANCELED: self.repository.finalize_canceled_job(job_id) return if current_job.status == JobStatus.PAUSE_REQUESTED: self.reconcile_pause_state(job_id) return stage = self._next_runnable_stage(job_id) if stage is None: if self._job_is_finished(job_id): self._finalize_job(job_id) return stages = self.repository.list_job_stages(job_id) if any( stage_row.status in {StageStatus.PAUSED, StageStatus.PAUSE_REQUESTED} for stage_row in stages ): self.repository.pause_job_for_recovery(job_id) return raise RuntimeError("Job has no runnable stages but is not finished.") self._run_stage(current_job, stage) refreshed_job = self.repository.get_job(job_id) if refreshed_job is None: return if refreshed_job.status == JobStatus.CANCELED: self.repository.finalize_canceled_job(job_id) return if refreshed_job.status == JobStatus.PAUSE_REQUESTED: self.reconcile_pause_state(job_id) return if self._job_is_finished(job_id): self._finalize_job(job_id) return except Exception as exc: self.repository.add_job_event( job_id, "job_execution_error", str(exc), ) job = self.repository.get_job(job_id) if job is not None and job.status not in { JobStatus.COMPLETED, JobStatus.COMPLETED_WITH_ERRORS, JobStatus.FAILED, JobStatus.CANCELED, JobStatus.PAUSED, }: self.repository.mark_job_finished( job_id, status=JobStatus.FAILED, last_error=str(exc), ) def _ensure_job_stages(self, job) -> None: existing = self.repository.list_job_stages(job.id) if existing: return for seq_no, stage_type in enumerate( JOB_STAGE_SEQUENCES.get(str(job.job_type), []), start=1 ): self.repository.create_stage(job_run_id=job.id, stage_type=stage_type, seq_no=seq_no) def _next_runnable_stage(self, job_id: int): for stage in self.repository.list_job_stages(job_id): if stage.status in {StageStatus.PENDING, StageStatus.RUNNING}: return stage return None def _job_sources(self, job) -> list[str]: return _unique_preserve_order( list(job.sources or _split_csv(job.config_snapshot.get("SOURCES"))) ) def _job_download_sources(self, job) -> list[str]: return _unique_preserve_order( list( job.download_sources or _split_csv(job.config_snapshot.get("download_sources")) or _split_csv(job.config_snapshot.get("DOWNLOAD_SOURCES")) ) ) def _job_playlist_ids(self, job) -> list[int] | None: raw_value = job.playlist_scope.get("playlist_ids") if not isinstance(raw_value, list): return None playlist_ids = [] for item in raw_value: try: playlist_ids.append(int(item)) except (TypeError, ValueError): continue return playlist_ids or None def _resolve_library_root(self, job) -> Path: mapping = dict(job.config_snapshot or {}) library_dir = mapping.get("LIBRARY_DIR") or mapping.get("library_dir") if library_dir: return Path(str(library_dir)).resolve() try: backend = self.catalog_repo.get_backend(self.catalog_repo.get_default_backend_id()) except Exception: backend = None if backend and backend["base_path"]: return Path(str(backend["base_path"])).resolve() raise RuntimeError("No library root configured for download stage") def _resolve_playlists_root(self, job) -> Path | None: mapping = dict(job.config_snapshot or {}) root_dir = mapping.get("ROOT_DIR") or mapping.get("root_dir") if root_dir: path = Path(str(root_dir)).resolve() / "playlists" path.mkdir(parents=True, exist_ok=True) return path library_dir = mapping.get("LIBRARY_DIR") or mapping.get("library_dir") if library_dir: path = Path(str(library_dir)).resolve().parent / "playlists" path.mkdir(parents=True, exist_ok=True) return path library_root = self.catalog_repo.get_default_local_library_root() if library_root is None: return None path = library_root.parent / "playlists" path.mkdir(parents=True, exist_ok=True) return path def _mark_playlist_exported(self, stage_id: int, playlist_id: int) -> bool: key = (int(stage_id), int(playlist_id)) with self._playlist_export_lock: if key in self._exported_stage_playlists: return False self._exported_stage_playlists.add(key) return True def _forget_playlist_exported(self, stage_id: int, playlist_id: int) -> None: key = (int(stage_id), int(playlist_id)) with self._playlist_export_lock: self._exported_stage_playlists.discard(key) def _export_playlist_artifacts_for_playlist_if_ready(self, job, stage, playlist_id: int | None) -> bool: if str(stage.stage_type) != "download" or playlist_id is None: return False scoped_playlist_ids = self._job_playlist_ids(job) normalized_playlist_id = int(playlist_id) if not scoped_playlist_ids or normalized_playlist_id not in scoped_playlist_ids: return False if self.repository.playlist_has_open_items(stage.id, normalized_playlist_id): return False if not self._mark_playlist_exported(stage.id, normalized_playlist_id): return False playlists_root = self._resolve_playlists_root(job) if playlists_root is None: self.repository.add_job_event( job.id, "playlist_export_skipped", "Playlists root is not configured for scoped download export.", stage_id=stage.id, details={"playlist_id": normalized_playlist_id}, ) return False service = CatalogSyncService( repository=self.catalog_repo, playlists_root=playlists_root, ) try: folder_path = service.ensure_playlist_artifacts_for_playlist(normalized_playlist_id) except Exception as exc: self._forget_playlist_exported(stage.id, normalized_playlist_id) self.repository.add_job_event( job.id, "playlist_export_error", str(exc), stage_id=stage.id, details={"playlist_id": normalized_playlist_id}, ) return False if folder_path is None: self.repository.add_job_event( job.id, "playlist_export_skipped", "Playlist export row is unavailable.", stage_id=stage.id, details={"playlist_id": normalized_playlist_id}, ) return False self.repository.add_job_event( job.id, "playlist_export_ready", f"Exported playlist artifacts for playlist {normalized_playlist_id}.", stage_id=stage.id, details={"playlist_id": normalized_playlist_id, "playlist_dir": str(folder_path)}, ) return True def _refresh_ready_playlist_artifacts(self, job, stage) -> list[int]: if str(stage.stage_type) != "download": return [] playlist_ids = self._job_playlist_ids(job) if not playlist_ids: return [] exported_ids: list[int] = [] for playlist_id in playlist_ids: if self._export_playlist_artifacts_for_playlist_if_ready(job, stage, int(playlist_id)): exported_ids.append(int(playlist_id)) return exported_ids def _resolve_backend_name(self, job) -> str: value = ( job.config_snapshot.get("OBJECT_BACKEND_NAME") or job.config_snapshot.get("object_backend_name") or "" ) return str(value).strip() def _worker_count(self, job, stage_type: str) -> int: mapping = dict(job.config_snapshot or {}) if stage_type == "download": return _int_value(mapping.get("DOWNLOAD_WORKERS"), DEFAULT_DOWNLOAD_WORKERS) if stage_type == "sync": return _int_value(mapping.get("SYNC_WORKERS"), DEFAULT_SYNC_WORKERS) if stage_type == "upload": return _int_value(mapping.get("UPLOAD_WORKERS"), 4) return 1 def _download_stage_worker_split(self, total_workers: int) -> tuple[int, int]: normalized_total = max(int(total_workers or 0), 1) if normalized_total == 1: return 1, 0 if normalized_total == 2: return 1, 1 if normalized_total <= 5: download_workers = 1 else: download_workers = 2 resolver_workers = max(1, normalized_total - download_workers) return resolver_workers, download_workers def _materialize_stage_items(self, job, stage) -> None: refreshed_stage = self.repository.get_stage(stage.id) if refreshed_stage is None or refreshed_stage.total_items > 0: return playlist_ids = self._job_playlist_ids(job) if stage.stage_type == "collect": for source in self._job_sources(job): self.repository.create_item( job_stage_id=stage.id, item_type="collect_source", item_key=f"collect:{source}", payload={ "source": source, "include_playlist_square": True, "include_toplist": True, }, ) return if stage.stage_type == "sync": if playlist_ids: playlist_rows = self.catalog_repo.list_playlists_by_ids(playlist_ids) else: playlist_rows = self.catalog_repo.list_playlists(sources=self._job_sources(job)) for row in playlist_rows: playlist_id = int(row["id"]) self.repository.create_item( job_stage_id=stage.id, item_type="playlist_sync", item_key=f"playlist:{playlist_id}", playlist_id=playlist_id, payload={"playlist_row": dict(row)}, ) return if stage.stage_type == "download": planner = DownloadPlanner(self.catalog_repo) for row in planner.build_download_queue( sources=self._job_sources(job), playlist_ids=playlist_ids, ): song_id = int(row.get("song_id") or row["id"]) self.repository.create_item( job_stage_id=stage.id, item_type="song_download", item_key=f"song:{song_id}", song_id=song_id, playlist_id=row.get("playlist_id"), payload={"row": dict(row)}, ) return if stage.stage_type == "upload": backend_name = self._resolve_backend_name(job) if not backend_name: return uploader = CatalogUploader(self.catalog_repo) uploader.enqueue_missing_uploads( backend_name=backend_name, sources=self._job_sources(job) or None, playlist_ids=playlist_ids, ) backend = self.catalog_repo.get_backend_by_name(backend_name) if backend is None: return rows = self.catalog_repo.list_pending_upload_tasks(target_backend_id=int(backend["id"])) for row in rows: upload_task_id = int(row["id"]) self.repository.create_item( job_stage_id=stage.id, item_type="file_upload", item_key=f"upload:{upload_task_id}", file_location_id=row["source_location_id"], payload={ "upload_task_id": upload_task_id, "upload_row": dict(row), }, ) def _build_executor(self, job, stage): if stage.stage_type == "collect": return CollectStageExecutor(self.db_path, ops_repo=self.repository) if stage.stage_type == "sync": return SyncStageExecutor(self.db_path, ops_repo=self.repository) if stage.stage_type == "download": return DownloadStageExecutor( self.db_path, library_root=self._resolve_library_root(job), download_sources=self._job_download_sources(job), ops_repo=self.repository, ) if stage.stage_type == "upload": backend_name = self._resolve_backend_name(job) if not backend_name: raise RuntimeError("No object backend configured for upload stage") return UploadStageExecutor( self.db_path, backend_name=backend_name, ops_repo=self.repository, ) raise RuntimeError(f"Unsupported stage type: {stage.stage_type}") def _export_playlist_artifacts_for_job(self, job, stage) -> None: exported_ids = self._refresh_ready_playlist_artifacts(job, stage) playlist_ids = self._job_playlist_ids(job) or [] if str(stage.stage_type) != "download" or not playlist_ids: return try: self.repository.add_job_event( job.id, "playlist_exported", f"Refreshed playlist export folders for {len(exported_ids)} playlists.", stage_id=stage.id, details={"playlist_ids": exported_ids, "scoped_playlist_ids": playlist_ids}, ) except Exception: logger.warning( "Failed to persist playlist_exported event for job %s stage %s.", job.id, stage.id, exc_info=True, ) def _run_catalog_export_for_stage(self, job, stage) -> None: if str(stage.stage_type) != "download": return with self._catalog_export_lock: refreshed_job = self.repository.get_job(job.id) or job if refreshed_job.status in { JobStatus.CANCELED, JobStatus.PAUSE_REQUESTED, JobStatus.PAUSED, }: return self.repository.add_job_event( job.id, "catalog_export_started", "Started post-download catalog export command.", stage_id=stage.id, ) try: result = run_catalog_export_command(refreshed_job.config_snapshot) except Exception as exc: self.repository.add_job_event( job.id, "catalog_export_failed", f"Catalog export command raised an error: {exc}", stage_id=stage.id, details={"error": str(exc) or exc.__class__.__name__}, ) return details: dict[str, Any] = {} if result.command: details["command"] = result.command if result.workdir: details["workdir"] = result.workdir if result.returncode is not None: details["returncode"] = result.returncode if result.stdout: details["stdout"] = result.stdout if result.stderr: details["stderr"] = result.stderr normalized_status = str(result.status).strip().lower() if normalized_status == "succeeded": event_type = "catalog_export_succeeded" message = "Catalog export command completed successfully." elif normalized_status == "skipped": event_type = "catalog_export_skipped" message = "Catalog export command was skipped." else: event_type = "catalog_export_failed" message = "Catalog export command failed." self.repository.add_job_event( job.id, event_type, message, stage_id=stage.id, details=details or None, ) def _run_stage_with_single_pool(self, job, stage, executor, worker_count: int) -> None: def worker_loop(worker_index: int) -> None: worker_name = f"{stage.stage_type}-{worker_index + 1}" while True: active_job = self.repository.get_job(job.id) if active_job is None or active_job.status in { JobStatus.PAUSE_REQUESTED, JobStatus.CANCELED, }: return item = self.repository.claim_next_stage_item(stage.id, worker_name) if item is None: return try: executor.process_item(item.id, worker_name, already_claimed=True) self._export_playlist_artifacts_for_playlist_if_ready( job, stage, item.playlist_id, ) except Exception as exc: self.repository.add_job_event( job.id, "item_execution_error", str(exc), stage_id=stage.id, item_id=item.id, ) with ThreadPoolExecutor(max_workers=worker_count) as pool: futures = [pool.submit(worker_loop, index) for index in range(worker_count)] for future in futures: future.result() def _run_download_stage_pipeline(self, job, stage, executor, worker_count: int) -> None: resolver_workers, download_workers = self._download_stage_worker_split(worker_count) if download_workers == 0: self._run_stage_with_single_pool(job, stage, executor, worker_count) return ready_queue: Queue = Queue(maxsize=max(1, download_workers * 2)) stop_event = threading.Event() sentinel = object() def resolver_loop(worker_index: int) -> None: worker_name = f"resolve-{worker_index + 1}" while not stop_event.is_set(): active_job = self.repository.get_job(job.id) if active_job is None or active_job.status in { JobStatus.PAUSE_REQUESTED, JobStatus.CANCELED, }: stop_event.set() return item = self.repository.claim_next_stage_item(stage.id, worker_name) if item is None: return try: executor.process_resolve_item( item.id, worker_name, ready_queue=ready_queue, already_claimed=True, ) self._export_playlist_artifacts_for_playlist_if_ready( job, stage, item.playlist_id, ) except Exception as exc: self.repository.add_job_event( job.id, "item_execution_error", str(exc), stage_id=stage.id, item_id=item.id, ) def download_loop(worker_index: int) -> None: worker_name = f"download-{worker_index + 1}" while True: task = ready_queue.get() if task is sentinel: return try: executor.process_download_task(task, worker_name) self._export_playlist_artifacts_for_playlist_if_ready( job, stage, getattr(task, "playlist_id", None), ) except Exception as exc: self.repository.add_job_event( job.id, "item_execution_error", str(exc), stage_id=stage.id, item_id=getattr(task, "item_id", None), ) with ThreadPoolExecutor(max_workers=resolver_workers + download_workers) as pool: resolver_futures = [pool.submit(resolver_loop, index) for index in range(resolver_workers)] download_futures = [pool.submit(download_loop, index) for index in range(download_workers)] for future in resolver_futures: future.result() for _ in range(download_workers): ready_queue.put(sentinel) for future in download_futures: future.result() def _run_stage(self, job, stage) -> None: if stage.status == StageStatus.PENDING: self.repository.mark_stage_running(stage.id) self.repository.add_job_event( job.id, "stage_started", f"Started stage {stage.stage_type}.", stage_id=stage.id, ) self._materialize_stage_items(job, stage) refreshed_stage = self.repository.get_stage(stage.id) if refreshed_stage is None: return if refreshed_stage.total_items == 0: self.repository.mark_stage_finished(stage.id, status=StageStatus.COMPLETED) final_stage = self.repository.get_stage(stage.id) if final_stage is not None: self._export_playlist_artifacts_for_job(job, final_stage) self._run_catalog_export_for_stage(job, final_stage) return executor = self._build_executor(job, refreshed_stage) worker_count = self._worker_count(job, refreshed_stage.stage_type) if refreshed_stage.stage_type == "download": self._run_download_stage_pipeline(job, refreshed_stage, executor, worker_count) else: self._run_stage_with_single_pool(job, refreshed_stage, executor, worker_count) current_job = self.repository.get_job(job.id) if current_job is not None: if current_job.status == JobStatus.CANCELED: self.repository.finalize_canceled_job(job.id) return if current_job.status == JobStatus.PAUSE_REQUESTED: self.reconcile_pause_state(job.id) return current_stage = self.repository.get_stage(stage.id) if current_stage is None: return if self.repository.stage_has_open_items(stage.id): return if current_stage.failed_items > 0: self.repository.mark_stage_finished( stage.id, status=StageStatus.FAILED, last_error="One or more stage items failed.", ) else: self.repository.mark_stage_finished(stage.id, status=StageStatus.COMPLETED) final_stage = self.repository.get_stage(stage.id) if final_stage is not None: self._export_playlist_artifacts_for_job(job, final_stage) self._run_catalog_export_for_stage(job, final_stage) def _job_is_finished(self, job_id: int) -> bool: stages = self.repository.list_job_stages(job_id) if not stages: return True return all( stage.status in {StageStatus.COMPLETED, StageStatus.FAILED, StageStatus.SKIPPED} for stage in stages ) def _finalize_job(self, job_id: int) -> None: stages = self.repository.list_job_stages(job_id) if not stages: self.repository.mark_job_finished(job_id, status=JobStatus.COMPLETED) return has_errors = any( stage.status == StageStatus.FAILED or stage.failed_items > 0 for stage in stages ) self.repository.mark_job_finished( job_id, status=JobStatus.COMPLETED_WITH_ERRORS if has_errors else JobStatus.COMPLETED, last_error="One or more stage items failed." if has_errors else None, )