2446 lines
89 KiB
Python
2446 lines
89 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from musicdl.catalogsync.db import connect_database
|
|
|
|
from .jobdefs import DOWNLOAD_LANE, display_name, job_lane_type, primary_stage_type
|
|
from .models import ItemStatus, JobItem, JobRun, JobStatus, JobStage, StageStatus
|
|
|
|
|
|
def _json_dumps(data: dict[str, Any] | None) -> str | None:
|
|
if data is None:
|
|
return None
|
|
return json.dumps(data, ensure_ascii=False)
|
|
|
|
|
|
def _json_loads(data: str | None) -> dict[str, Any]:
|
|
if not data:
|
|
return {}
|
|
decoded = json.loads(data)
|
|
if isinstance(decoded, dict):
|
|
return decoded
|
|
return {}
|
|
|
|
|
|
def _encode_sources(values: list[str] | None) -> str | None:
|
|
if not values:
|
|
return None
|
|
normalized = [value.strip() for value in values if value and value.strip()]
|
|
if not normalized:
|
|
return None
|
|
return ",".join(normalized)
|
|
|
|
|
|
def _decode_sources(value: str | None) -> list[str]:
|
|
if not value:
|
|
return []
|
|
return [item.strip() for item in str(value).split(",") if item.strip()]
|
|
|
|
|
|
def _progress_percent(completed: int, total: int) -> int:
|
|
normalized_total = max(int(total or 0), 0)
|
|
normalized_completed = max(int(completed or 0), 0)
|
|
if normalized_total <= 0:
|
|
return 0
|
|
if normalized_completed >= normalized_total:
|
|
return 100
|
|
return int((normalized_completed * 100) / normalized_total)
|
|
|
|
|
|
def _format_speed_text(speed_bytes_per_sec: int) -> str:
|
|
speed_value = int(speed_bytes_per_sec or 0)
|
|
if speed_value <= 0:
|
|
return ""
|
|
if speed_value >= 1024 * 1024:
|
|
return f"{speed_value / (1024 * 1024):.1f} MB/s"
|
|
if speed_value >= 1024:
|
|
return f"{speed_value / 1024:.1f} KB/s"
|
|
return f"{speed_value} B/s"
|
|
|
|
|
|
def _scope_summary(playlist_scope: dict[str, Any] | None) -> str:
|
|
playlist_ids = (playlist_scope or {}).get("playlist_ids")
|
|
if not isinstance(playlist_ids, list):
|
|
return "all playlists"
|
|
count = len([value for value in playlist_ids if value is not None])
|
|
if count <= 0:
|
|
return "all playlists"
|
|
suffix = "playlist" if count == 1 else "playlists"
|
|
return f"{count} {suffix}"
|
|
|
|
|
|
def _completed_stage_items(stage: JobStage | None) -> int:
|
|
if stage is None:
|
|
return 0
|
|
total_items = max(int(stage.total_items or 0), 0)
|
|
pending_items = max(int(stage.pending_items or 0), 0)
|
|
running_items = max(int(stage.running_items or 0), 0)
|
|
return max(total_items - pending_items - running_items, 0)
|
|
|
|
|
|
def _primary_progress_percent(stage: JobStage | None, worker_progress_values: list[float]) -> int:
|
|
if stage is not None and int(stage.total_items or 0) > 0:
|
|
return _progress_percent(_completed_stage_items(stage), int(stage.total_items or 0))
|
|
if not worker_progress_values:
|
|
return 0
|
|
return int(max(worker_progress_values))
|
|
|
|
|
|
def _primary_progress_text(
|
|
stage: JobStage | None,
|
|
worker_progress_texts: list[str],
|
|
) -> str:
|
|
for text in worker_progress_texts:
|
|
normalized = str(text or "").strip()
|
|
if normalized:
|
|
return normalized
|
|
if stage is None:
|
|
return ""
|
|
completed_items = _completed_stage_items(stage)
|
|
total_items = max(int(stage.total_items or 0), 0)
|
|
noun = "songs" if str(stage.stage_type) == "download" else "items"
|
|
return f"{completed_items} / {total_items} {noun}"
|
|
|
|
|
|
_STAGE_COUNTER_BY_STATUS: dict[ItemStatus, str] = {
|
|
ItemStatus.PENDING: "pending_items",
|
|
ItemStatus.RUNNING: "running_items",
|
|
ItemStatus.SUCCEEDED: "success_items",
|
|
ItemStatus.FAILED: "failed_items",
|
|
ItemStatus.SKIPPED: "skipped_items",
|
|
}
|
|
|
|
|
|
class OpsRepository:
|
|
def __init__(self, db_path: str | Path):
|
|
self.db_path = Path(db_path)
|
|
|
|
def _connect(self) -> sqlite3.Connection:
|
|
return connect_database(self.db_path)
|
|
|
|
@contextmanager
|
|
def _connection(self):
|
|
conn = self._connect()
|
|
try:
|
|
yield conn
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def _fetchone(self, query: str, params: tuple[Any, ...] = ()) -> sqlite3.Row | None:
|
|
with self._connection() as conn:
|
|
return conn.execute(query, params).fetchone()
|
|
|
|
def _fetchall(self, query: str, params: tuple[Any, ...] = ()) -> list[sqlite3.Row]:
|
|
with self._connection() as conn:
|
|
return conn.execute(query, params).fetchall()
|
|
|
|
def _row_to_job(self, row: sqlite3.Row) -> JobRun:
|
|
return JobRun(
|
|
id=int(row["id"]),
|
|
job_type=str(row["job_type"]),
|
|
status=JobStatus(str(row["status"])),
|
|
priority=int(row["priority"]),
|
|
requested_by=row["requested_by"],
|
|
config_snapshot=_json_loads(row["config_snapshot_json"]),
|
|
sources=_decode_sources(row["sources"]),
|
|
download_sources=_decode_sources(row["download_sources"]),
|
|
playlist_scope=_json_loads(row["playlist_scope_json"]),
|
|
created_at=row["created_at"],
|
|
started_at=row["started_at"],
|
|
ended_at=row["ended_at"],
|
|
last_error=row["last_error"],
|
|
resume_token=row["resume_token"],
|
|
)
|
|
|
|
def _row_to_stage(self, row: sqlite3.Row) -> JobStage:
|
|
return JobStage(
|
|
id=int(row["id"]),
|
|
job_run_id=int(row["job_run_id"]),
|
|
stage_type=str(row["stage_type"]),
|
|
seq_no=int(row["seq_no"]),
|
|
status=StageStatus(str(row["status"])),
|
|
total_items=int(row["total_items"]),
|
|
pending_items=int(row["pending_items"]),
|
|
running_items=int(row["running_items"]),
|
|
success_items=int(row["success_items"]),
|
|
failed_items=int(row["failed_items"]),
|
|
skipped_items=int(row["skipped_items"]),
|
|
started_at=row["started_at"],
|
|
ended_at=row["ended_at"],
|
|
last_error=row["last_error"],
|
|
)
|
|
|
|
def _row_to_item(self, row: sqlite3.Row) -> JobItem:
|
|
return JobItem(
|
|
id=int(row["id"]),
|
|
job_stage_id=int(row["job_stage_id"]),
|
|
item_type=str(row["item_type"]),
|
|
item_key=str(row["item_key"]),
|
|
playlist_pool_id=row["playlist_pool_id"],
|
|
playlist_id=row["playlist_id"],
|
|
song_id=row["song_id"],
|
|
file_location_id=row["file_location_id"],
|
|
status=ItemStatus(str(row["status"])),
|
|
attempt_count=int(row["attempt_count"]),
|
|
max_attempts=int(row["max_attempts"]),
|
|
worker_id=row["worker_id"],
|
|
started_at=row["started_at"],
|
|
ended_at=row["ended_at"],
|
|
last_error=row["last_error"],
|
|
last_error_code=row["last_error_code"],
|
|
payload=_json_loads(row["payload_json"]),
|
|
)
|
|
|
|
def create_job(
|
|
self,
|
|
job_type: str,
|
|
config_snapshot: dict[str, Any],
|
|
requested_by: str | None = None,
|
|
sources: list[str] | None = None,
|
|
download_sources: list[str] | None = None,
|
|
playlist_scope: dict[str, Any] | None = None,
|
|
status: JobStatus = JobStatus.QUEUED,
|
|
) -> int:
|
|
if config_snapshot is None:
|
|
raise ValueError("config_snapshot is required")
|
|
with self._connection() as conn:
|
|
return int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_runs (
|
|
job_type,
|
|
status,
|
|
requested_by,
|
|
config_snapshot_json,
|
|
sources,
|
|
download_sources,
|
|
playlist_scope_json
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
job_type,
|
|
status.value,
|
|
requested_by,
|
|
_json_dumps(config_snapshot),
|
|
_encode_sources(sources),
|
|
_encode_sources(download_sources),
|
|
_json_dumps(playlist_scope),
|
|
),
|
|
).lastrowid
|
|
)
|
|
|
|
def get_job(self, job_id: int) -> JobRun | None:
|
|
row = self._fetchone("SELECT * FROM job_runs WHERE id = ?", (job_id,))
|
|
if row is None:
|
|
return None
|
|
return self._row_to_job(row)
|
|
|
|
def _normalize_playlist_scope_ids(self, playlist_scope: dict[str, Any] | None) -> list[int]:
|
|
raw_values = (playlist_scope or {}).get("playlist_ids")
|
|
if not isinstance(raw_values, list):
|
|
return []
|
|
normalized_ids: list[int] = []
|
|
seen: set[int] = set()
|
|
for value in raw_values:
|
|
playlist_id = self._coerce_int(value)
|
|
if playlist_id is None or playlist_id <= 0 or playlist_id in seen:
|
|
continue
|
|
normalized_ids.append(playlist_id)
|
|
seen.add(playlist_id)
|
|
return normalized_ids
|
|
|
|
def create_stage(
|
|
self,
|
|
job_run_id: int,
|
|
stage_type: str,
|
|
seq_no: int,
|
|
status: StageStatus = StageStatus.PENDING,
|
|
) -> int:
|
|
with self._connection() as conn:
|
|
return int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_stages (job_run_id, stage_type, seq_no, status)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(job_run_id, stage_type, seq_no, status.value),
|
|
).lastrowid
|
|
)
|
|
|
|
def get_stage(self, stage_id: int) -> JobStage | None:
|
|
row = self._fetchone("SELECT * FROM job_stages WHERE id = ?", (stage_id,))
|
|
if row is None:
|
|
return None
|
|
return self._row_to_stage(row)
|
|
|
|
def create_item(
|
|
self,
|
|
job_stage_id: int,
|
|
item_type: str,
|
|
item_key: str,
|
|
playlist_pool_id: int | None = None,
|
|
playlist_id: int | None = None,
|
|
song_id: int | None = None,
|
|
file_location_id: int | None = None,
|
|
payload: dict[str, Any] | None = None,
|
|
max_attempts: int = 3,
|
|
status: ItemStatus = ItemStatus.PENDING,
|
|
) -> int:
|
|
with self._connection() as conn:
|
|
item_id = int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_items (
|
|
job_stage_id,
|
|
item_type,
|
|
item_key,
|
|
playlist_pool_id,
|
|
playlist_id,
|
|
song_id,
|
|
file_location_id,
|
|
status,
|
|
max_attempts,
|
|
payload_json
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
job_stage_id,
|
|
item_type,
|
|
item_key,
|
|
playlist_pool_id,
|
|
playlist_id,
|
|
song_id,
|
|
file_location_id,
|
|
status.value,
|
|
max_attempts,
|
|
_json_dumps(payload),
|
|
),
|
|
).lastrowid
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET
|
|
total_items = total_items + 1,
|
|
pending_items = pending_items + CASE WHEN ? = 'pending' THEN 1 ELSE 0 END,
|
|
running_items = running_items + CASE WHEN ? = 'running' THEN 1 ELSE 0 END,
|
|
success_items = success_items + CASE WHEN ? = 'succeeded' THEN 1 ELSE 0 END,
|
|
failed_items = failed_items + CASE WHEN ? = 'failed' THEN 1 ELSE 0 END,
|
|
skipped_items = skipped_items + CASE WHEN ? = 'skipped' THEN 1 ELSE 0 END
|
|
WHERE id = ?
|
|
""",
|
|
(
|
|
status.value,
|
|
status.value,
|
|
status.value,
|
|
status.value,
|
|
status.value,
|
|
job_stage_id,
|
|
),
|
|
)
|
|
return item_id
|
|
|
|
def get_item(self, item_id: int) -> JobItem | None:
|
|
row = self._fetchone("SELECT * FROM job_items WHERE id = ?", (item_id,))
|
|
if row is None:
|
|
return None
|
|
return self._row_to_item(row)
|
|
|
|
def _adjust_stage_item_counters(
|
|
self,
|
|
conn: sqlite3.Connection,
|
|
*,
|
|
stage_id: int,
|
|
from_status: ItemStatus | None,
|
|
to_status: ItemStatus | None,
|
|
) -> None:
|
|
if from_status == to_status:
|
|
return
|
|
|
|
from_counter = _STAGE_COUNTER_BY_STATUS.get(from_status) if from_status else None
|
|
to_counter = _STAGE_COUNTER_BY_STATUS.get(to_status) if to_status else None
|
|
|
|
if from_counter:
|
|
conn.execute(
|
|
f"""
|
|
UPDATE job_stages
|
|
SET {from_counter} = CASE
|
|
WHEN {from_counter} > 0 THEN {from_counter} - 1
|
|
ELSE 0
|
|
END
|
|
WHERE id = ?
|
|
""",
|
|
(stage_id,),
|
|
)
|
|
if to_counter:
|
|
conn.execute(
|
|
f"""
|
|
UPDATE job_stages
|
|
SET {to_counter} = {to_counter} + 1
|
|
WHERE id = ?
|
|
""",
|
|
(stage_id,),
|
|
)
|
|
|
|
def _recalculate_stage_counters(self, conn: sqlite3.Connection, stage_id: int) -> None:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT
|
|
COUNT(*) AS total_items,
|
|
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) AS pending_items,
|
|
SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running_items,
|
|
SUM(CASE WHEN status = 'succeeded' THEN 1 ELSE 0 END) AS success_items,
|
|
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed_items,
|
|
SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) AS skipped_items
|
|
FROM job_items
|
|
WHERE job_stage_id = ?
|
|
""",
|
|
(stage_id,),
|
|
).fetchone()
|
|
if row is None:
|
|
return
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET
|
|
total_items = ?,
|
|
pending_items = ?,
|
|
running_items = ?,
|
|
success_items = ?,
|
|
failed_items = ?,
|
|
skipped_items = ?
|
|
WHERE id = ?
|
|
""",
|
|
(
|
|
int(row["total_items"] or 0),
|
|
int(row["pending_items"] or 0),
|
|
int(row["running_items"] or 0),
|
|
int(row["success_items"] or 0),
|
|
int(row["failed_items"] or 0),
|
|
int(row["skipped_items"] or 0),
|
|
int(stage_id),
|
|
),
|
|
)
|
|
|
|
def list_recoverable_jobs(self) -> list[JobRun]:
|
|
rows = self._fetchall(
|
|
"""
|
|
SELECT *
|
|
FROM job_runs
|
|
WHERE status IN (?, ?)
|
|
ORDER BY priority ASC, created_at ASC, id ASC
|
|
""",
|
|
(JobStatus.RUNNING.value, JobStatus.PAUSE_REQUESTED.value),
|
|
)
|
|
return [self._row_to_job(row) for row in rows]
|
|
|
|
def pause_job_for_recovery(self, job_id: int) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET status = ?, ended_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(JobStatus.PAUSED.value, job_id),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET status = ?, ended_at = CURRENT_TIMESTAMP
|
|
WHERE job_run_id = ? AND status IN (?, ?)
|
|
""",
|
|
(
|
|
StageStatus.PAUSED.value,
|
|
job_id,
|
|
StageStatus.RUNNING.value,
|
|
StageStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
|
|
def list_running_items(self, job_id: int) -> list[JobItem]:
|
|
rows = self._fetchall(
|
|
"""
|
|
SELECT i.*
|
|
FROM job_items AS i
|
|
JOIN job_stages AS s ON s.id = i.job_stage_id
|
|
WHERE s.job_run_id = ? AND i.status = ?
|
|
ORDER BY i.id ASC
|
|
""",
|
|
(job_id, ItemStatus.RUNNING.value),
|
|
)
|
|
return [self._row_to_item(row) for row in rows]
|
|
|
|
def mark_item_interrupted(self, item_id: int, last_error: str | None = None) -> bool:
|
|
with self._connection() as conn:
|
|
item_row = conn.execute(
|
|
"SELECT job_stage_id, status, worker_id FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if item_row is None:
|
|
return False
|
|
from_status = ItemStatus(str(item_row["status"]))
|
|
if from_status is not ItemStatus.RUNNING:
|
|
return False
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = NULL,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = ?,
|
|
last_error_code = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(ItemStatus.INTERRUPTED.value, last_error, item_id, from_status.value),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
return False
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=int(item_row["job_stage_id"]),
|
|
from_status=from_status,
|
|
to_status=ItemStatus.INTERRUPTED,
|
|
)
|
|
worker_id = item_row["worker_id"]
|
|
if worker_id is not None:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_workers
|
|
SET
|
|
status = 'idle',
|
|
current_job_item_id = NULL,
|
|
current_song_id = NULL,
|
|
current_playlist_id = NULL,
|
|
current_display_text = NULL,
|
|
heartbeat_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(int(worker_id),),
|
|
)
|
|
return True
|
|
|
|
def add_job_event(
|
|
self,
|
|
job_id: int,
|
|
event_type: str,
|
|
message: str | None = None,
|
|
*,
|
|
stage_id: int | None = None,
|
|
item_id: int | None = None,
|
|
worker_id: int | None = None,
|
|
details: dict[str, Any] | None = None,
|
|
) -> int:
|
|
with self._connection() as conn:
|
|
return int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_events (
|
|
job_run_id,
|
|
job_stage_id,
|
|
job_item_id,
|
|
worker_id,
|
|
event_type,
|
|
message,
|
|
details_json
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
job_id,
|
|
stage_id,
|
|
item_id,
|
|
worker_id,
|
|
event_type,
|
|
message,
|
|
_json_dumps(details),
|
|
),
|
|
).lastrowid
|
|
)
|
|
|
|
def create_command(
|
|
self,
|
|
job_run_id: int,
|
|
command_type: str,
|
|
target_item_id: int | None = None,
|
|
payload: dict[str, Any] | None = None,
|
|
) -> int:
|
|
with self._connection() as conn:
|
|
return int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_commands (
|
|
job_run_id,
|
|
command_type,
|
|
target_item_id,
|
|
payload_json
|
|
)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(job_run_id, command_type, target_item_id, _json_dumps(payload)),
|
|
).lastrowid
|
|
)
|
|
|
|
def list_pending_commands(self) -> list[dict[str, Any]]:
|
|
rows = self._fetchall(
|
|
"""
|
|
SELECT *
|
|
FROM job_commands
|
|
WHERE status = 'pending'
|
|
ORDER BY id ASC
|
|
"""
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
def request_job_pause(self, job_id: int) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET status = ?
|
|
WHERE id = ? AND status IN (?, ?, ?)
|
|
""",
|
|
(
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
job_id,
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.QUEUED.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET status = ?
|
|
WHERE job_run_id = ? AND status = ?
|
|
""",
|
|
(
|
|
StageStatus.PAUSE_REQUESTED.value,
|
|
job_id,
|
|
StageStatus.RUNNING.value,
|
|
),
|
|
)
|
|
|
|
def resume_job(self, job_id: int) -> None:
|
|
with self._connection() as conn:
|
|
interrupted_counts = conn.execute(
|
|
"""
|
|
SELECT job_stage_id, COUNT(*) AS count_value
|
|
FROM job_items AS i
|
|
JOIN job_stages AS s ON s.id = i.job_stage_id
|
|
WHERE s.job_run_id = ? AND i.status = ?
|
|
GROUP BY job_stage_id
|
|
""",
|
|
(job_id, ItemStatus.INTERRUPTED.value),
|
|
).fetchall()
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET status = ?, ended_at = NULL
|
|
WHERE id = ? AND status IN (?, ?)
|
|
""",
|
|
(
|
|
JobStatus.QUEUED.value,
|
|
job_id,
|
|
JobStatus.PAUSED.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET status = ?, ended_at = NULL
|
|
WHERE job_run_id = ? AND status IN (?, ?)
|
|
""",
|
|
(
|
|
StageStatus.PENDING.value,
|
|
job_id,
|
|
StageStatus.PAUSED.value,
|
|
StageStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = NULL,
|
|
started_at = NULL,
|
|
ended_at = NULL,
|
|
last_error = NULL,
|
|
last_error_code = NULL
|
|
WHERE job_stage_id IN (
|
|
SELECT id FROM job_stages WHERE job_run_id = ?
|
|
) AND status = ?
|
|
""",
|
|
(
|
|
ItemStatus.PENDING.value,
|
|
job_id,
|
|
ItemStatus.INTERRUPTED.value,
|
|
),
|
|
)
|
|
for row in interrupted_counts:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET pending_items = pending_items + ?
|
|
WHERE id = ?
|
|
""",
|
|
(int(row["count_value"] or 0), int(row["job_stage_id"])),
|
|
)
|
|
|
|
def cancel_job(self, job_id: int) -> bool:
|
|
with self._connection() as conn:
|
|
job_row = conn.execute(
|
|
"SELECT id, status FROM job_runs WHERE id = ?",
|
|
(job_id,),
|
|
).fetchone()
|
|
if job_row is None:
|
|
return False
|
|
if str(job_row["status"]) not in {
|
|
JobStatus.QUEUED.value,
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
JobStatus.PAUSED.value,
|
|
}:
|
|
return False
|
|
|
|
stage_rows = conn.execute(
|
|
"SELECT id, status FROM job_stages WHERE job_run_id = ?",
|
|
(job_id,),
|
|
).fetchall()
|
|
stage_ids = [int(row["id"]) for row in stage_rows]
|
|
|
|
if stage_ids:
|
|
placeholders = ", ".join("?" for _ in stage_ids)
|
|
conn.execute(
|
|
f"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = NULL,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = ?,
|
|
last_error_code = NULL
|
|
WHERE job_stage_id IN ({placeholders}) AND status = ?
|
|
""",
|
|
(
|
|
ItemStatus.CANCELED.value,
|
|
"Job canceled by operator.",
|
|
*stage_ids,
|
|
ItemStatus.PENDING.value,
|
|
),
|
|
)
|
|
for stage_id in stage_ids:
|
|
self._recalculate_stage_counters(conn, stage_id)
|
|
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET
|
|
status = ?,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = COALESCE(last_error, ?)
|
|
WHERE job_run_id = ? AND status IN (?, ?, ?)
|
|
""",
|
|
(
|
|
StageStatus.SKIPPED.value,
|
|
"Job canceled by operator.",
|
|
job_id,
|
|
StageStatus.PENDING.value,
|
|
StageStatus.PAUSE_REQUESTED.value,
|
|
StageStatus.PAUSED.value,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET
|
|
status = ?,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = COALESCE(last_error, ?)
|
|
WHERE job_run_id = ? AND status = ? AND running_items = 0
|
|
""",
|
|
(
|
|
StageStatus.SKIPPED.value,
|
|
"Job canceled by operator.",
|
|
job_id,
|
|
StageStatus.RUNNING.value,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_workers
|
|
SET
|
|
status = 'idle',
|
|
current_job_item_id = NULL,
|
|
current_song_id = NULL,
|
|
current_playlist_id = NULL,
|
|
current_display_text = NULL,
|
|
heartbeat_at = CURRENT_TIMESTAMP
|
|
WHERE job_run_id = ?
|
|
""",
|
|
(job_id,),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET
|
|
status = ?,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = COALESCE(last_error, ?)
|
|
WHERE id = ?
|
|
""",
|
|
(
|
|
JobStatus.CANCELED.value,
|
|
"Job canceled by operator.",
|
|
job_id,
|
|
),
|
|
)
|
|
return True
|
|
|
|
def finalize_canceled_job(self, job_id: int) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET
|
|
status = ?,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = COALESCE(last_error, ?)
|
|
WHERE job_run_id = ? AND status = ? AND running_items = 0
|
|
""",
|
|
(
|
|
StageStatus.SKIPPED.value,
|
|
"Job canceled by operator.",
|
|
job_id,
|
|
StageStatus.RUNNING.value,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET
|
|
status = ?,
|
|
ended_at = COALESCE(ended_at, CURRENT_TIMESTAMP),
|
|
last_error = COALESCE(last_error, ?)
|
|
WHERE id = ?
|
|
""",
|
|
(
|
|
JobStatus.CANCELED.value,
|
|
"Job canceled by operator.",
|
|
job_id,
|
|
),
|
|
)
|
|
|
|
def requeue_item(self, item_id: int, force: bool, job_id: int | None = None) -> bool:
|
|
with self._connection() as conn:
|
|
query = """
|
|
SELECT
|
|
i.job_stage_id,
|
|
i.status,
|
|
i.attempt_count,
|
|
i.max_attempts,
|
|
s.job_run_id,
|
|
s.status AS stage_status,
|
|
j.status AS job_status
|
|
FROM job_items AS i
|
|
JOIN job_stages AS s ON s.id = i.job_stage_id
|
|
JOIN job_runs AS j ON j.id = s.job_run_id
|
|
WHERE i.id = ?
|
|
"""
|
|
params: list[Any] = [item_id]
|
|
if job_id is not None:
|
|
query += " AND s.job_run_id = ?"
|
|
params.append(job_id)
|
|
row = conn.execute(query, tuple(params)).fetchone()
|
|
if row is None:
|
|
return False
|
|
|
|
status = ItemStatus(str(row["status"]))
|
|
if status not in {ItemStatus.FAILED, ItemStatus.INTERRUPTED, ItemStatus.CANCELED}:
|
|
return False
|
|
|
|
is_exhausted = int(row["attempt_count"]) >= int(row["max_attempts"])
|
|
if is_exhausted and not force:
|
|
return False
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = NULL,
|
|
started_at = NULL,
|
|
ended_at = NULL,
|
|
last_error = NULL,
|
|
last_error_code = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(ItemStatus.PENDING.value, item_id, status.value),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
return False
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=int(row["job_stage_id"]),
|
|
from_status=status,
|
|
to_status=ItemStatus.PENDING,
|
|
)
|
|
if str(row["stage_status"]) in {
|
|
StageStatus.COMPLETED.value,
|
|
StageStatus.FAILED.value,
|
|
StageStatus.PAUSED.value,
|
|
StageStatus.SKIPPED.value,
|
|
}:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET status = ?, ended_at = NULL, last_error = NULL
|
|
WHERE id = ?
|
|
""",
|
|
(StageStatus.PENDING.value, int(row["job_stage_id"])),
|
|
)
|
|
if str(row["job_status"]) in {
|
|
JobStatus.COMPLETED.value,
|
|
JobStatus.COMPLETED_WITH_ERRORS.value,
|
|
JobStatus.FAILED.value,
|
|
JobStatus.PAUSED.value,
|
|
JobStatus.CANCELED.value,
|
|
}:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET status = ?, ended_at = NULL, last_error = NULL
|
|
WHERE id = ?
|
|
""",
|
|
(JobStatus.QUEUED.value, int(row["job_run_id"])),
|
|
)
|
|
return True
|
|
|
|
def mark_command_applied(self, command_id: int) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_commands
|
|
SET status = 'applied', applied_at = COALESCE(applied_at, CURRENT_TIMESTAMP)
|
|
WHERE id = ?
|
|
""",
|
|
(command_id,),
|
|
)
|
|
|
|
def job_has_running_items(self, job_id: int) -> bool:
|
|
row = self._fetchone(
|
|
"""
|
|
SELECT COUNT(1) AS cnt
|
|
FROM job_items AS i
|
|
JOIN job_stages AS s ON s.id = i.job_stage_id
|
|
WHERE s.job_run_id = ? AND i.status = ?
|
|
""",
|
|
(job_id, ItemStatus.RUNNING.value),
|
|
)
|
|
return bool(row and int(row["cnt"]) > 0)
|
|
|
|
def finalize_pause(self, job_id: int) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET status = ?, ended_at = CURRENT_TIMESTAMP
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(
|
|
JobStatus.PAUSED.value,
|
|
job_id,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET status = ?, ended_at = CURRENT_TIMESTAMP
|
|
WHERE job_run_id = ? AND status = ?
|
|
""",
|
|
(
|
|
StageStatus.PAUSED.value,
|
|
job_id,
|
|
StageStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
|
|
def claim_next_runnable_job(self) -> JobRun | None:
|
|
row = self._fetchone(
|
|
"""
|
|
SELECT *
|
|
FROM job_runs
|
|
WHERE status = ?
|
|
ORDER BY priority ASC, created_at ASC, id ASC
|
|
LIMIT 1
|
|
""",
|
|
(JobStatus.QUEUED.value,),
|
|
)
|
|
if row is None:
|
|
return None
|
|
return self._row_to_job(row)
|
|
|
|
def claim_and_mark_next_runnable_job(self) -> JobRun | None:
|
|
with self._connection() as conn:
|
|
while True:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT *
|
|
FROM job_runs
|
|
WHERE status = ?
|
|
ORDER BY priority ASC, created_at ASC, id ASC
|
|
LIMIT 1
|
|
""",
|
|
(JobStatus.QUEUED.value,),
|
|
).fetchone()
|
|
if row is None:
|
|
return None
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET
|
|
status = ?,
|
|
started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
|
|
ended_at = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(JobStatus.RUNNING.value, int(row["id"]), JobStatus.QUEUED.value),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
continue
|
|
|
|
updated = conn.execute(
|
|
"SELECT * FROM job_runs WHERE id = ?",
|
|
(int(row["id"]),),
|
|
).fetchone()
|
|
if updated is None:
|
|
return None
|
|
return self._row_to_job(updated)
|
|
|
|
def mark_job_running(self, job_id: int) -> bool:
|
|
with self._connection() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET
|
|
status = ?,
|
|
started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
|
|
ended_at = NULL
|
|
WHERE id = ?
|
|
AND status IN (?, ?)
|
|
""",
|
|
(
|
|
JobStatus.RUNNING.value,
|
|
job_id,
|
|
JobStatus.QUEUED.value,
|
|
JobStatus.RUNNING.value,
|
|
),
|
|
)
|
|
return cursor.rowcount == 1
|
|
|
|
def mark_stage_running(self, stage_id: int) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET
|
|
status = ?,
|
|
started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
|
|
ended_at = NULL
|
|
WHERE id = ?
|
|
""",
|
|
(StageStatus.RUNNING.value, stage_id),
|
|
)
|
|
|
|
def get_active_job(self) -> JobRun | None:
|
|
row = self._fetchone(
|
|
"""
|
|
SELECT *
|
|
FROM job_runs
|
|
WHERE status IN (?, ?)
|
|
ORDER BY
|
|
CASE status
|
|
WHEN ? THEN 0
|
|
WHEN ? THEN 1
|
|
ELSE 2
|
|
END,
|
|
COALESCE(started_at, created_at) ASC,
|
|
id ASC
|
|
LIMIT 1
|
|
""",
|
|
(
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
if row is None:
|
|
return None
|
|
return self._row_to_job(row)
|
|
|
|
def list_active_jobs(self) -> list[JobRun]:
|
|
rows = self._fetchall(
|
|
"""
|
|
SELECT *
|
|
FROM job_runs
|
|
WHERE status IN (?, ?)
|
|
ORDER BY
|
|
CASE status
|
|
WHEN ? THEN 0
|
|
WHEN ? THEN 1
|
|
ELSE 2
|
|
END,
|
|
priority ASC,
|
|
COALESCE(started_at, created_at) ASC,
|
|
id ASC
|
|
""",
|
|
(
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
),
|
|
)
|
|
return [self._row_to_job(row) for row in rows]
|
|
|
|
def list_queued_jobs(self, limit: int | None = None) -> list[JobRun]:
|
|
query = [
|
|
"""
|
|
SELECT *
|
|
FROM job_runs
|
|
WHERE status = ?
|
|
ORDER BY priority ASC, created_at ASC, id ASC
|
|
"""
|
|
]
|
|
params: list[Any] = [JobStatus.QUEUED.value]
|
|
if limit is not None:
|
|
query.append("LIMIT ?")
|
|
params.append(int(limit))
|
|
rows = self._fetchall("\n".join(query), tuple(params))
|
|
return [self._row_to_job(row) for row in rows]
|
|
|
|
def claim_job_if_queued(self, job_id: int) -> JobRun | None:
|
|
with self._connection() as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET
|
|
status = ?,
|
|
started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
|
|
ended_at = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(JobStatus.RUNNING.value, int(job_id), JobStatus.QUEUED.value),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
return None
|
|
row = conn.execute("SELECT * FROM job_runs WHERE id = ?", (int(job_id),)).fetchone()
|
|
if row is None:
|
|
return None
|
|
return self._row_to_job(row)
|
|
|
|
def list_job_stages(self, job_id: int) -> list[JobStage]:
|
|
rows = self._fetchall(
|
|
"""
|
|
SELECT *
|
|
FROM job_stages
|
|
WHERE job_run_id = ?
|
|
ORDER BY seq_no ASC, id ASC
|
|
""",
|
|
(job_id,),
|
|
)
|
|
return [self._row_to_stage(row) for row in rows]
|
|
|
|
def mark_stage_finished(
|
|
self,
|
|
stage_id: int,
|
|
*,
|
|
status: StageStatus,
|
|
last_error: str | None = None,
|
|
) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_stages
|
|
SET
|
|
status = ?,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = ?
|
|
WHERE id = ?
|
|
""",
|
|
(status.value, last_error, stage_id),
|
|
)
|
|
|
|
def mark_job_finished(
|
|
self,
|
|
job_id: int,
|
|
*,
|
|
status: JobStatus,
|
|
last_error: str | None = None,
|
|
) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_runs
|
|
SET
|
|
status = ?,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = ?
|
|
WHERE id = ?
|
|
""",
|
|
(status.value, last_error, job_id),
|
|
)
|
|
|
|
def claim_next_stage_item(self, stage_id: int, worker_name: str) -> JobItem | None:
|
|
normalized_worker = str(worker_name).strip() or "worker"
|
|
conn = self._connect()
|
|
try:
|
|
conn.execute("BEGIN IMMEDIATE")
|
|
row = conn.execute(
|
|
"""
|
|
SELECT i.*, s.job_run_id
|
|
FROM job_items AS i
|
|
JOIN job_stages AS s ON s.id = i.job_stage_id
|
|
WHERE i.job_stage_id = ? AND i.status = ?
|
|
ORDER BY i.id ASC
|
|
LIMIT 1
|
|
""",
|
|
(stage_id, ItemStatus.PENDING.value),
|
|
).fetchone()
|
|
if row is None:
|
|
conn.commit()
|
|
return None
|
|
|
|
worker_row = conn.execute(
|
|
"""
|
|
SELECT id
|
|
FROM job_workers
|
|
WHERE worker_name = ? AND job_stage_id = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
""",
|
|
(normalized_worker, stage_id),
|
|
).fetchone()
|
|
if worker_row is None:
|
|
worker_id = int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_workers (
|
|
job_run_id,
|
|
job_stage_id,
|
|
worker_name,
|
|
status,
|
|
current_job_item_id,
|
|
heartbeat_at
|
|
)
|
|
VALUES (?, ?, ?, 'running', ?, CURRENT_TIMESTAMP)
|
|
""",
|
|
(
|
|
int(row["job_run_id"]),
|
|
stage_id,
|
|
normalized_worker,
|
|
int(row["id"]),
|
|
),
|
|
).lastrowid
|
|
)
|
|
else:
|
|
worker_id = int(worker_row["id"])
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_workers
|
|
SET
|
|
status = 'running',
|
|
current_job_item_id = ?,
|
|
current_song_id = NULL,
|
|
current_playlist_id = NULL,
|
|
current_display_text = NULL,
|
|
last_progress_text = NULL,
|
|
downloaded_bytes = 0,
|
|
total_bytes = 0,
|
|
speed_bytes_per_sec = 0,
|
|
progress_percent = 0,
|
|
heartbeat_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(int(row["id"]), worker_id),
|
|
)
|
|
|
|
updated = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = ?,
|
|
attempt_count = attempt_count + 1,
|
|
started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
|
|
ended_at = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(
|
|
ItemStatus.RUNNING.value,
|
|
worker_id,
|
|
int(row["id"]),
|
|
ItemStatus.PENDING.value,
|
|
),
|
|
)
|
|
if updated.rowcount != 1:
|
|
conn.rollback()
|
|
return None
|
|
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=stage_id,
|
|
from_status=ItemStatus.PENDING,
|
|
to_status=ItemStatus.RUNNING,
|
|
)
|
|
claimed = conn.execute(
|
|
"SELECT * FROM job_items WHERE id = ?",
|
|
(int(row["id"]),),
|
|
).fetchone()
|
|
conn.commit()
|
|
if claimed is None:
|
|
return None
|
|
return self._row_to_item(claimed)
|
|
finally:
|
|
conn.close()
|
|
|
|
def mark_item_running(self, item_id: int, worker_id: int | None) -> bool:
|
|
with self._connection() as conn:
|
|
row = conn.execute(
|
|
"SELECT job_stage_id, status FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if row is None:
|
|
return False
|
|
from_status = ItemStatus(str(row["status"]))
|
|
if from_status is not ItemStatus.PENDING:
|
|
return False
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = ?,
|
|
attempt_count = attempt_count + 1,
|
|
started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
|
|
ended_at = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(ItemStatus.RUNNING.value, worker_id, item_id, from_status.value),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
return False
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=int(row["job_stage_id"]),
|
|
from_status=from_status,
|
|
to_status=ItemStatus.RUNNING,
|
|
)
|
|
return True
|
|
|
|
def claim_item(self, item_id: int, worker_name: str) -> JobItem:
|
|
normalized_worker = str(worker_name).strip() or "worker"
|
|
with self._connection() as conn:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT i.*, s.job_run_id
|
|
FROM job_items AS i
|
|
JOIN job_stages AS s ON s.id = i.job_stage_id
|
|
WHERE i.id = ?
|
|
""",
|
|
(item_id,),
|
|
).fetchone()
|
|
if row is None:
|
|
raise RuntimeError(f"Unknown item: {item_id}")
|
|
|
|
from_status = ItemStatus(str(row["status"]))
|
|
if from_status is not ItemStatus.PENDING:
|
|
raise RuntimeError(
|
|
f"Item {item_id} is not claimable: expected {ItemStatus.PENDING.value}, got {from_status.value}"
|
|
)
|
|
|
|
worker_row = conn.execute(
|
|
"""
|
|
SELECT id
|
|
FROM job_workers
|
|
WHERE worker_name = ? AND job_stage_id = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
""",
|
|
(normalized_worker, int(row["job_stage_id"])),
|
|
).fetchone()
|
|
if worker_row is None:
|
|
worker_id = int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_workers (
|
|
job_run_id,
|
|
job_stage_id,
|
|
worker_name,
|
|
status,
|
|
current_job_item_id,
|
|
heartbeat_at
|
|
)
|
|
VALUES (?, ?, ?, 'running', ?, CURRENT_TIMESTAMP)
|
|
""",
|
|
(int(row["job_run_id"]), int(row["job_stage_id"]), normalized_worker, item_id),
|
|
).lastrowid
|
|
)
|
|
else:
|
|
worker_id = int(worker_row["id"])
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_workers
|
|
SET
|
|
status = 'running',
|
|
current_job_item_id = ?,
|
|
current_song_id = NULL,
|
|
current_playlist_id = NULL,
|
|
current_display_text = NULL,
|
|
last_progress_text = NULL,
|
|
downloaded_bytes = 0,
|
|
total_bytes = 0,
|
|
speed_bytes_per_sec = 0,
|
|
progress_percent = 0,
|
|
heartbeat_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(item_id, worker_id),
|
|
)
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = ?,
|
|
attempt_count = attempt_count + 1,
|
|
started_at = COALESCE(started_at, CURRENT_TIMESTAMP),
|
|
ended_at = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(ItemStatus.RUNNING.value, worker_id, item_id, from_status.value),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
raise RuntimeError(f"Failed to claim item {item_id}")
|
|
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=int(row["job_stage_id"]),
|
|
from_status=from_status,
|
|
to_status=ItemStatus.RUNNING,
|
|
)
|
|
claimed = conn.execute("SELECT * FROM job_items WHERE id = ?", (item_id,)).fetchone()
|
|
if claimed is None:
|
|
raise RuntimeError(f"Failed to load claimed item: {item_id}")
|
|
return self._row_to_item(claimed)
|
|
|
|
@staticmethod
|
|
def _coerce_int(value: Any) -> int | None:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, bool):
|
|
return None
|
|
if isinstance(value, int):
|
|
return value
|
|
text = str(value).strip()
|
|
if not text:
|
|
return None
|
|
if text.isdigit() or (text.startswith("-") and text[1:].isdigit()):
|
|
try:
|
|
return int(text)
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
def build_download_row(self, item_id: int) -> dict[str, Any]:
|
|
with self._connection() as conn:
|
|
item_row = conn.execute(
|
|
"SELECT id, song_id, playlist_id, payload_json FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if item_row is None:
|
|
raise RuntimeError(f"Unknown item: {item_id}")
|
|
|
|
payload = _json_loads(item_row["payload_json"])
|
|
payload_row = payload.get("row")
|
|
row = dict(payload_row) if isinstance(payload_row, dict) else {}
|
|
|
|
song_id = next(
|
|
(
|
|
candidate
|
|
for candidate in (
|
|
self._coerce_int(item_row["song_id"]),
|
|
self._coerce_int(row.get("id")),
|
|
self._coerce_int(row.get("song_id")),
|
|
self._coerce_int(payload.get("song_id")),
|
|
)
|
|
if candidate is not None
|
|
),
|
|
None,
|
|
)
|
|
if song_id is None:
|
|
raise RuntimeError(f"Item {item_id} does not have song context")
|
|
|
|
song_row = conn.execute("SELECT * FROM songs WHERE id = ?", (song_id,)).fetchone()
|
|
merged: dict[str, Any] = dict(song_row) if song_row is not None else {}
|
|
merged.update(row)
|
|
merged.setdefault("id", song_id)
|
|
merged.setdefault("song_id", song_id)
|
|
if item_row["playlist_id"] is not None:
|
|
merged.setdefault("playlist_id", int(item_row["playlist_id"]))
|
|
|
|
missing = [key for key in ("id", "platform") if merged.get(key) in (None, "")]
|
|
if missing:
|
|
raise RuntimeError(
|
|
f"Download row for item {item_id} is incomplete; missing: {', '.join(missing)}"
|
|
)
|
|
return merged
|
|
|
|
def get_playlist_row_for_item(self, item_id: int) -> dict[str, Any]:
|
|
with self._connection() as conn:
|
|
item_row = conn.execute(
|
|
"SELECT id, playlist_id, payload_json FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if item_row is None:
|
|
raise RuntimeError(f"Unknown item: {item_id}")
|
|
|
|
payload = _json_loads(item_row["payload_json"])
|
|
payload_row = payload.get("playlist_row")
|
|
row = dict(payload_row) if isinstance(payload_row, dict) else {}
|
|
|
|
playlist_id = next(
|
|
(
|
|
candidate
|
|
for candidate in (
|
|
self._coerce_int(item_row["playlist_id"]),
|
|
self._coerce_int(row.get("id")),
|
|
self._coerce_int(row.get("playlist_id")),
|
|
self._coerce_int(payload.get("playlist_id")),
|
|
)
|
|
if candidate is not None
|
|
),
|
|
None,
|
|
)
|
|
if playlist_id is None:
|
|
raise RuntimeError(f"Item {item_id} does not have playlist context")
|
|
|
|
playlist_row = conn.execute("SELECT * FROM playlists WHERE id = ?", (playlist_id,)).fetchone()
|
|
merged: dict[str, Any] = dict(playlist_row) if playlist_row is not None else {}
|
|
merged.update(row)
|
|
merged.setdefault("id", playlist_id)
|
|
merged.setdefault("playlist_id", playlist_id)
|
|
|
|
missing = [key for key in ("id",) if merged.get(key) in (None, "")]
|
|
if missing:
|
|
raise RuntimeError(
|
|
f"Playlist row for item {item_id} is incomplete; missing: {', '.join(missing)}"
|
|
)
|
|
return merged
|
|
|
|
def get_upload_row_for_item(self, item_id: int) -> dict[str, Any]:
|
|
with self._connection() as conn:
|
|
item_row = conn.execute(
|
|
"SELECT id, item_key, file_location_id, payload_json FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if item_row is None:
|
|
raise RuntimeError(f"Unknown item: {item_id}")
|
|
|
|
payload = _json_loads(item_row["payload_json"])
|
|
payload_row = payload.get("upload_row")
|
|
row = dict(payload_row) if isinstance(payload_row, dict) else {}
|
|
|
|
upload_task_candidates: list[int] = []
|
|
for value in (
|
|
payload.get("upload_task_id"),
|
|
payload.get("task_id"),
|
|
payload.get("id"),
|
|
row.get("upload_task_id"),
|
|
row.get("task_id"),
|
|
row.get("id"),
|
|
):
|
|
normalized = self._coerce_int(value)
|
|
if normalized is not None and normalized not in upload_task_candidates:
|
|
upload_task_candidates.append(normalized)
|
|
suffix = str(item_row["item_key"] or "").rsplit(":", 1)[-1].strip()
|
|
suffix_id = self._coerce_int(suffix)
|
|
if suffix_id is not None and suffix_id not in upload_task_candidates:
|
|
upload_task_candidates.append(suffix_id)
|
|
|
|
source_location_candidates: list[int] = []
|
|
for value in (
|
|
item_row["file_location_id"],
|
|
payload.get("file_location_id"),
|
|
payload.get("source_location_id"),
|
|
row.get("file_location_id"),
|
|
row.get("source_location_id"),
|
|
):
|
|
normalized = self._coerce_int(value)
|
|
if normalized is not None and normalized not in source_location_candidates:
|
|
source_location_candidates.append(normalized)
|
|
|
|
query = """
|
|
SELECT ut.*, fl.absolute_path, fl.locator AS source_locator, fa.song_id
|
|
FROM upload_tasks AS ut
|
|
JOIN file_locations AS fl ON fl.id = ut.source_location_id
|
|
JOIN file_assets AS fa ON fa.id = ut.file_asset_id
|
|
"""
|
|
task_row: sqlite3.Row | None = None
|
|
for upload_task_id in upload_task_candidates:
|
|
task_row = conn.execute(query + " WHERE ut.id = ?", (upload_task_id,)).fetchone()
|
|
if task_row is not None:
|
|
break
|
|
if task_row is None:
|
|
for source_location_id in source_location_candidates:
|
|
task_row = conn.execute(
|
|
query + " WHERE ut.source_location_id = ? ORDER BY ut.id DESC LIMIT 1",
|
|
(source_location_id,),
|
|
).fetchone()
|
|
if task_row is not None:
|
|
break
|
|
|
|
merged: dict[str, Any] = dict(task_row) if task_row is not None else {}
|
|
merged.update(row)
|
|
if merged.get("id") is None:
|
|
first_id = upload_task_candidates[0] if upload_task_candidates else None
|
|
if first_id is not None:
|
|
merged["id"] = first_id
|
|
if merged.get("source_location_id") is None and source_location_candidates:
|
|
merged["source_location_id"] = source_location_candidates[0]
|
|
if task_row is None and not merged:
|
|
raise RuntimeError(f"Upload task row is unavailable for item {item_id}")
|
|
|
|
for optional_key in ("absolute_path", "target_container_name", "source_locator", "song_id"):
|
|
merged.setdefault(optional_key, None)
|
|
|
|
for optional_key in ("file_asset_id", "source_location_id", "target_backend_id"):
|
|
merged.setdefault(optional_key, None)
|
|
|
|
missing = [key for key in ("id", "target_locator") if merged.get(key) in (None, "")]
|
|
if missing:
|
|
raise RuntimeError(
|
|
f"Upload row for item {item_id} is incomplete; missing: {', '.join(missing)}"
|
|
)
|
|
return merged
|
|
|
|
def mark_item_succeeded(
|
|
self,
|
|
item_id: int,
|
|
result_payload: dict[str, Any] | None = None,
|
|
) -> bool:
|
|
with self._connection() as conn:
|
|
row = conn.execute(
|
|
"SELECT job_stage_id, status, worker_id, payload_json FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if row is None:
|
|
return False
|
|
from_status = ItemStatus(str(row["status"]))
|
|
if from_status is not ItemStatus.RUNNING:
|
|
return False
|
|
|
|
payload = _json_loads(row["payload_json"])
|
|
if result_payload:
|
|
payload.update(result_payload)
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
payload_json = ?,
|
|
worker_id = NULL,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = NULL,
|
|
last_error_code = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(
|
|
ItemStatus.SUCCEEDED.value,
|
|
_json_dumps(payload),
|
|
item_id,
|
|
from_status.value,
|
|
),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
return False
|
|
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=int(row["job_stage_id"]),
|
|
from_status=from_status,
|
|
to_status=ItemStatus.SUCCEEDED,
|
|
)
|
|
worker_id = row["worker_id"]
|
|
if worker_id is not None:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_workers
|
|
SET
|
|
status = 'idle',
|
|
current_job_item_id = NULL,
|
|
current_song_id = NULL,
|
|
current_playlist_id = NULL,
|
|
current_display_text = NULL,
|
|
processed_count = processed_count + 1,
|
|
heartbeat_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(int(worker_id),),
|
|
)
|
|
return True
|
|
|
|
def mark_item_failed(self, item_id: int, error_message: str) -> bool:
|
|
with self._connection() as conn:
|
|
row = conn.execute(
|
|
"SELECT job_stage_id, status, worker_id FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if row is None:
|
|
return False
|
|
from_status = ItemStatus(str(row["status"]))
|
|
if from_status is not ItemStatus.RUNNING:
|
|
return False
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = NULL,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = ?,
|
|
last_error_code = NULL
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(
|
|
ItemStatus.FAILED.value,
|
|
str(error_message),
|
|
item_id,
|
|
from_status.value,
|
|
),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
return False
|
|
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=int(row["job_stage_id"]),
|
|
from_status=from_status,
|
|
to_status=ItemStatus.FAILED,
|
|
)
|
|
worker_id = row["worker_id"]
|
|
if worker_id is not None:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_workers
|
|
SET
|
|
status = 'idle',
|
|
current_job_item_id = NULL,
|
|
current_song_id = NULL,
|
|
current_playlist_id = NULL,
|
|
current_display_text = NULL,
|
|
error_count = error_count + 1,
|
|
heartbeat_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(int(worker_id),),
|
|
)
|
|
return True
|
|
|
|
def mark_item_skipped(
|
|
self,
|
|
item_id: int,
|
|
*,
|
|
reason_message: str | None = None,
|
|
reason_code: str | None = None,
|
|
) -> bool:
|
|
with self._connection() as conn:
|
|
row = conn.execute(
|
|
"SELECT job_stage_id, status, worker_id FROM job_items WHERE id = ?",
|
|
(item_id,),
|
|
).fetchone()
|
|
if row is None:
|
|
return False
|
|
from_status = ItemStatus(str(row["status"]))
|
|
if from_status is not ItemStatus.RUNNING:
|
|
return False
|
|
|
|
cursor = conn.execute(
|
|
"""
|
|
UPDATE job_items
|
|
SET
|
|
status = ?,
|
|
worker_id = NULL,
|
|
ended_at = CURRENT_TIMESTAMP,
|
|
last_error = ?,
|
|
last_error_code = ?
|
|
WHERE id = ? AND status = ?
|
|
""",
|
|
(
|
|
ItemStatus.SKIPPED.value,
|
|
str(reason_message or ""),
|
|
str(reason_code or ""),
|
|
item_id,
|
|
from_status.value,
|
|
),
|
|
)
|
|
if cursor.rowcount != 1:
|
|
return False
|
|
|
|
self._adjust_stage_item_counters(
|
|
conn,
|
|
stage_id=int(row["job_stage_id"]),
|
|
from_status=from_status,
|
|
to_status=ItemStatus.SKIPPED,
|
|
)
|
|
worker_id = row["worker_id"]
|
|
if worker_id is not None:
|
|
conn.execute(
|
|
"""
|
|
UPDATE job_workers
|
|
SET
|
|
status = 'idle',
|
|
current_job_item_id = NULL,
|
|
current_song_id = NULL,
|
|
current_playlist_id = NULL,
|
|
current_display_text = NULL,
|
|
processed_count = processed_count + 1,
|
|
heartbeat_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(int(worker_id),),
|
|
)
|
|
return True
|
|
|
|
def update_worker_state(self, worker_name: str, **state: Any) -> None:
|
|
normalized_worker = str(worker_name).strip() or "worker"
|
|
with self._connection() as conn:
|
|
row = None
|
|
current_item_id = self._coerce_int(state.get("current_job_item_id"))
|
|
if current_item_id is not None:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT id
|
|
FROM job_workers
|
|
WHERE worker_name = ? AND current_job_item_id = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
""",
|
|
(normalized_worker, current_item_id),
|
|
).fetchone()
|
|
if row is None:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT jw.id
|
|
FROM job_workers AS jw
|
|
JOIN job_items AS ji ON ji.job_stage_id = jw.job_stage_id
|
|
WHERE jw.worker_name = ? AND ji.id = ?
|
|
ORDER BY
|
|
CASE
|
|
WHEN jw.current_job_item_id = ? THEN 0
|
|
ELSE 1
|
|
END,
|
|
jw.id DESC
|
|
LIMIT 1
|
|
""",
|
|
(normalized_worker, current_item_id, current_item_id),
|
|
).fetchone()
|
|
if row is None:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT id
|
|
FROM job_workers
|
|
WHERE worker_name = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
""",
|
|
(normalized_worker,),
|
|
).fetchone()
|
|
if row is None:
|
|
worker_id = int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO job_workers (worker_name, status, heartbeat_at)
|
|
VALUES (?, ?, CURRENT_TIMESTAMP)
|
|
""",
|
|
(normalized_worker, str(state.get("status") or "running")),
|
|
).lastrowid
|
|
)
|
|
else:
|
|
worker_id = int(row["id"])
|
|
|
|
updates = ["heartbeat_at = CURRENT_TIMESTAMP"]
|
|
params: list[Any] = []
|
|
for field_name in (
|
|
"status",
|
|
"current_job_item_id",
|
|
"current_song_id",
|
|
"current_playlist_id",
|
|
"current_display_text",
|
|
"last_progress_text",
|
|
"downloaded_bytes",
|
|
"total_bytes",
|
|
"speed_bytes_per_sec",
|
|
"progress_percent",
|
|
):
|
|
if field_name in state:
|
|
updates.append(f"{field_name} = ?")
|
|
params.append(state[field_name])
|
|
|
|
processed_increment = int(state.get("processed_increment") or 0)
|
|
error_increment = int(state.get("error_increment") or 0)
|
|
if processed_increment:
|
|
updates.append("processed_count = processed_count + ?")
|
|
params.append(processed_increment)
|
|
if error_increment:
|
|
updates.append("error_count = error_count + ?")
|
|
params.append(error_increment)
|
|
|
|
params.append(worker_id)
|
|
conn.execute(
|
|
f"UPDATE job_workers SET {', '.join(updates)} WHERE id = ?",
|
|
tuple(params),
|
|
)
|
|
|
|
def list_task_center_rows(self, limit: int = 50) -> list[dict[str, Any]]:
|
|
queued_positions: dict[int, int] = {}
|
|
lane_queue_counts: dict[str, int] = {}
|
|
for job in self.list_queued_jobs():
|
|
lane_type = job_lane_type(job.job_type)
|
|
lane_queue_counts[lane_type] = lane_queue_counts.get(lane_type, 0) + 1
|
|
queued_positions[int(job.id)] = lane_queue_counts[lane_type]
|
|
|
|
rows = self._fetchall(
|
|
"""
|
|
SELECT *
|
|
FROM job_runs
|
|
WHERE status IN (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ORDER BY
|
|
CASE status
|
|
WHEN ? THEN 0
|
|
WHEN ? THEN 1
|
|
WHEN ? THEN 2
|
|
WHEN ? THEN 3
|
|
WHEN ? THEN 4
|
|
WHEN ? THEN 5
|
|
WHEN ? THEN 6
|
|
WHEN ? THEN 7
|
|
ELSE 8
|
|
END,
|
|
priority ASC,
|
|
COALESCE(started_at, created_at) DESC,
|
|
id DESC
|
|
LIMIT ?
|
|
""",
|
|
(
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
JobStatus.QUEUED.value,
|
|
JobStatus.PAUSED.value,
|
|
JobStatus.COMPLETED.value,
|
|
JobStatus.COMPLETED_WITH_ERRORS.value,
|
|
JobStatus.FAILED.value,
|
|
JobStatus.CANCELED.value,
|
|
JobStatus.RUNNING.value,
|
|
JobStatus.PAUSE_REQUESTED.value,
|
|
JobStatus.QUEUED.value,
|
|
JobStatus.PAUSED.value,
|
|
JobStatus.COMPLETED.value,
|
|
JobStatus.COMPLETED_WITH_ERRORS.value,
|
|
JobStatus.FAILED.value,
|
|
JobStatus.CANCELED.value,
|
|
int(limit),
|
|
),
|
|
)
|
|
items: list[dict[str, Any]] = []
|
|
for row in rows:
|
|
job = self._row_to_job(row)
|
|
lane_type = job_lane_type(job.job_type)
|
|
stages = self.list_job_stages(job.id)
|
|
primary_type = primary_stage_type(job.job_type)
|
|
primary_stage = next(
|
|
(stage for stage in stages if str(stage.stage_type) == str(primary_type)),
|
|
None,
|
|
)
|
|
workers = self.list_job_workers(job.id, active_only=True, limit=100)
|
|
primary_workers = workers
|
|
if primary_stage is not None:
|
|
matched_workers = [
|
|
worker for worker in workers if int(worker.get("job_stage_id") or 0) == int(primary_stage.id)
|
|
]
|
|
if matched_workers:
|
|
primary_workers = matched_workers
|
|
worker_progress_values = [
|
|
float(worker["progress_percent"])
|
|
for worker in primary_workers
|
|
if worker.get("progress_percent") is not None
|
|
]
|
|
worker_progress_texts = [
|
|
str(worker.get("last_progress_text") or "")
|
|
for worker in primary_workers
|
|
if str(worker.get("last_progress_text") or "").strip()
|
|
]
|
|
speed_bytes_per_sec = sum(int(worker.get("speed_bytes_per_sec") or 0) for worker in primary_workers)
|
|
downloaded_bytes = sum(int(worker.get("downloaded_bytes") or 0) for worker in primary_workers)
|
|
total_bytes = sum(int(worker.get("total_bytes") or 0) for worker in primary_workers)
|
|
queue_position = queued_positions.get(int(job.id))
|
|
items.append(
|
|
{
|
|
"id": int(job.id),
|
|
"job_type": str(job.job_type),
|
|
"status": job.status.value,
|
|
"priority": int(job.priority),
|
|
"display_name": display_name(job.job_type, job.playlist_scope),
|
|
"lane_type": lane_type,
|
|
"queue_label": f"queued #{queue_position}" if queue_position else "",
|
|
"scope_summary": _scope_summary(job.playlist_scope),
|
|
"primary_stage_type": str(primary_stage.stage_type) if primary_stage is not None else primary_type,
|
|
"active_worker_count": len(primary_workers),
|
|
"primary_progress_percent": _primary_progress_percent(
|
|
primary_stage,
|
|
worker_progress_values,
|
|
),
|
|
"primary_progress_text": _primary_progress_text(
|
|
primary_stage,
|
|
worker_progress_texts,
|
|
),
|
|
"speed_bytes_per_sec": speed_bytes_per_sec,
|
|
"speed_text": _format_speed_text(speed_bytes_per_sec),
|
|
"downloaded_bytes": downloaded_bytes,
|
|
"total_bytes": total_bytes,
|
|
"created_at": job.created_at,
|
|
"started_at": job.started_at,
|
|
"ended_at": job.ended_at,
|
|
"last_error": job.last_error,
|
|
"playlist_scope": dict(job.playlist_scope or {}),
|
|
}
|
|
)
|
|
return items
|
|
|
|
def list_job_workers(
|
|
self,
|
|
job_id: int,
|
|
*,
|
|
active_only: bool = False,
|
|
limit: int = 50,
|
|
) -> list[dict[str, Any]]:
|
|
where_parts = ["jw.job_run_id = ?"]
|
|
params: list[Any] = [int(job_id)]
|
|
if active_only:
|
|
where_parts.append("(jw.status != 'idle' OR jw.current_job_item_id IS NOT NULL)")
|
|
params.append(int(limit))
|
|
rows = self._fetchall(
|
|
f"""
|
|
SELECT
|
|
jw.*,
|
|
js.stage_type,
|
|
ji.item_type,
|
|
ji.item_key,
|
|
s.name AS song_name,
|
|
p.name AS playlist_name
|
|
FROM job_workers AS jw
|
|
LEFT JOIN job_stages AS js ON js.id = jw.job_stage_id
|
|
LEFT JOIN job_items AS ji ON ji.id = jw.current_job_item_id
|
|
LEFT JOIN songs AS s ON s.id = COALESCE(jw.current_song_id, ji.song_id)
|
|
LEFT JOIN playlists AS p ON p.id = COALESCE(jw.current_playlist_id, ji.playlist_id)
|
|
WHERE {' AND '.join(where_parts)}
|
|
ORDER BY
|
|
CASE
|
|
WHEN jw.status = 'running' THEN 0
|
|
WHEN jw.status = 'paused' THEN 1
|
|
ELSE 2
|
|
END,
|
|
COALESCE(jw.heartbeat_at, '') DESC,
|
|
jw.id DESC
|
|
LIMIT ?
|
|
""",
|
|
tuple(params),
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
def stage_has_open_items(self, stage_id: int) -> bool:
|
|
row = self._fetchone(
|
|
"""
|
|
SELECT COUNT(1) AS cnt
|
|
FROM job_items
|
|
WHERE job_stage_id = ? AND status IN (?, ?, ?)
|
|
""",
|
|
(
|
|
int(stage_id),
|
|
ItemStatus.PENDING.value,
|
|
ItemStatus.RUNNING.value,
|
|
ItemStatus.INTERRUPTED.value,
|
|
),
|
|
)
|
|
return bool(row and int(row["cnt"]) > 0)
|
|
|
|
def playlist_has_open_items(self, stage_id: int, playlist_id: int) -> bool:
|
|
row = self._fetchone(
|
|
"""
|
|
SELECT COUNT(1) AS cnt
|
|
FROM job_items
|
|
WHERE job_stage_id = ? AND playlist_id = ? AND status IN (?, ?, ?)
|
|
""",
|
|
(
|
|
int(stage_id),
|
|
int(playlist_id),
|
|
ItemStatus.PENDING.value,
|
|
ItemStatus.RUNNING.value,
|
|
ItemStatus.INTERRUPTED.value,
|
|
),
|
|
)
|
|
return bool(row and int(row["cnt"]) > 0)
|
|
|
|
def list_job_items_with_context(
|
|
self,
|
|
job_id: int,
|
|
*,
|
|
statuses: list[str] | None = None,
|
|
limit: int = 100,
|
|
) -> list[dict[str, Any]]:
|
|
query = [
|
|
"""
|
|
SELECT
|
|
i.*,
|
|
s.stage_type,
|
|
p.name AS playlist_name,
|
|
sg.name AS song_name,
|
|
sg.singers AS song_singers
|
|
FROM job_items AS i
|
|
JOIN job_stages AS s ON s.id = i.job_stage_id
|
|
LEFT JOIN playlists AS p ON p.id = i.playlist_id
|
|
LEFT JOIN songs AS sg ON sg.id = i.song_id
|
|
WHERE s.job_run_id = ?
|
|
"""
|
|
]
|
|
params: list[Any] = [int(job_id)]
|
|
if statuses:
|
|
placeholders = ", ".join("?" for _ in statuses)
|
|
query.append(f"AND i.status IN ({placeholders})")
|
|
params.extend(str(status) for status in statuses)
|
|
query.append("ORDER BY i.id DESC LIMIT ?")
|
|
params.append(int(limit))
|
|
rows = self._fetchall("\n".join(query), tuple(params))
|
|
items: list[dict[str, Any]] = []
|
|
for row in rows:
|
|
item = dict(row)
|
|
payload = _json_loads(row["payload_json"])
|
|
item["payload"] = payload
|
|
item["display_name"] = (
|
|
item.get("song_name")
|
|
or item.get("playlist_name")
|
|
or str(payload.get("display_name") or payload.get("name") or row["item_key"])
|
|
)
|
|
items.append(item)
|
|
return items
|
|
|
|
def list_job_playlist_progress(self, job_id: int) -> list[dict[str, Any]]:
|
|
job = self.get_job(job_id)
|
|
if job is None:
|
|
return []
|
|
playlist_ids = self._normalize_playlist_scope_ids(job.playlist_scope)
|
|
if not playlist_ids:
|
|
return []
|
|
placeholders = ", ".join("?" for _ in playlist_ids)
|
|
rows = self._fetchall(
|
|
f"""
|
|
WITH scoped_playlists AS (
|
|
SELECT
|
|
p.id AS playlist_id,
|
|
p.platform,
|
|
p.remote_playlist_id,
|
|
p.name AS playlist_name
|
|
FROM playlists AS p
|
|
WHERE p.id IN ({placeholders})
|
|
),
|
|
local_downloaded_songs AS (
|
|
SELECT DISTINCT fa.song_id
|
|
FROM file_locations AS fl
|
|
JOIN file_assets AS fa ON fa.id = fl.file_asset_id
|
|
JOIN storage_backends AS sb ON sb.id = fl.backend_id
|
|
WHERE fl.status = 'active'
|
|
AND sb.backend_type = 'local_fs'
|
|
),
|
|
job_song_status AS (
|
|
SELECT
|
|
ji.song_id,
|
|
MAX(CASE WHEN ji.status = 'succeeded' THEN 1 ELSE 0 END) AS succeeded_flag,
|
|
MAX(CASE WHEN ji.status = 'running' THEN 1 ELSE 0 END) AS running_flag,
|
|
MAX(CASE WHEN ji.status IN ('pending', 'interrupted') THEN 1 ELSE 0 END) AS pending_flag,
|
|
MAX(CASE WHEN ji.status = 'failed' THEN 1 ELSE 0 END) AS failed_flag,
|
|
MAX(CASE WHEN ji.status = 'skipped' THEN 1 ELSE 0 END) AS skipped_flag
|
|
FROM job_items AS ji
|
|
JOIN job_stages AS js ON js.id = ji.job_stage_id
|
|
WHERE js.job_run_id = ?
|
|
AND js.stage_type = 'download'
|
|
AND ji.song_id IS NOT NULL
|
|
GROUP BY ji.song_id
|
|
),
|
|
song_states AS (
|
|
SELECT
|
|
sp.playlist_id,
|
|
sp.platform,
|
|
sp.remote_playlist_id,
|
|
sp.playlist_name,
|
|
ps.song_id,
|
|
CASE
|
|
WHEN lds.song_id IS NOT NULL OR COALESCE(jss.succeeded_flag, 0) = 1 THEN 'downloaded'
|
|
WHEN COALESCE(jss.running_flag, 0) = 1 THEN 'running'
|
|
WHEN COALESCE(jss.pending_flag, 0) = 1 THEN 'pending'
|
|
WHEN COALESCE(jss.failed_flag, 0) = 1 THEN 'failed'
|
|
WHEN COALESCE(jss.skipped_flag, 0) = 1 THEN 'skipped'
|
|
ELSE 'pending'
|
|
END AS song_state
|
|
FROM scoped_playlists AS sp
|
|
LEFT JOIN playlist_songs AS ps ON ps.playlist_id = sp.playlist_id
|
|
LEFT JOIN local_downloaded_songs AS lds ON lds.song_id = ps.song_id
|
|
LEFT JOIN job_song_status AS jss ON jss.song_id = ps.song_id
|
|
)
|
|
SELECT
|
|
playlist_id,
|
|
platform,
|
|
remote_playlist_id,
|
|
playlist_name,
|
|
COUNT(DISTINCT song_id) AS total_songs,
|
|
COUNT(DISTINCT CASE WHEN song_state = 'downloaded' THEN song_id END) AS downloaded_songs,
|
|
COUNT(DISTINCT CASE WHEN song_state = 'running' THEN song_id END) AS running_songs,
|
|
COUNT(DISTINCT CASE WHEN song_state = 'pending' THEN song_id END) AS pending_songs,
|
|
COUNT(DISTINCT CASE WHEN song_state = 'failed' THEN song_id END) AS failed_songs,
|
|
COUNT(DISTINCT CASE WHEN song_state = 'skipped' THEN song_id END) AS skipped_songs
|
|
FROM song_states
|
|
GROUP BY playlist_id, platform, remote_playlist_id, playlist_name
|
|
ORDER BY playlist_name ASC, playlist_id ASC
|
|
""",
|
|
tuple(playlist_ids + [int(job_id)]),
|
|
)
|
|
items: list[dict[str, Any]] = []
|
|
for row in rows:
|
|
payload = dict(row)
|
|
payload["progress_percent"] = _progress_percent(
|
|
int(payload.get("downloaded_songs") or 0),
|
|
int(payload.get("total_songs") or 0),
|
|
)
|
|
items.append(payload)
|
|
return items
|
|
|
|
def list_job_playlist_song_progress(
|
|
self,
|
|
job_id: int,
|
|
playlist_id: int,
|
|
*,
|
|
limit: int = 500,
|
|
) -> list[dict[str, Any]]:
|
|
job = self.get_job(job_id)
|
|
if job is None:
|
|
return []
|
|
scoped_playlist_ids = set(self._normalize_playlist_scope_ids(job.playlist_scope))
|
|
if not scoped_playlist_ids or int(playlist_id) not in scoped_playlist_ids:
|
|
return []
|
|
normalized_limit = max(min(int(limit), 2000), 1)
|
|
|
|
rows = self._fetchall(
|
|
"""
|
|
WITH local_downloaded_songs AS (
|
|
SELECT DISTINCT fa.song_id
|
|
FROM file_locations AS fl
|
|
JOIN file_assets AS fa ON fa.id = fl.file_asset_id
|
|
JOIN storage_backends AS sb ON sb.id = fl.backend_id
|
|
WHERE fl.status = 'active'
|
|
AND sb.backend_type = 'local_fs'
|
|
),
|
|
job_song_status AS (
|
|
SELECT
|
|
ji.song_id,
|
|
MAX(CASE WHEN ji.status = 'succeeded' THEN 1 ELSE 0 END) AS succeeded_flag,
|
|
MAX(CASE WHEN ji.status = 'running' THEN 1 ELSE 0 END) AS running_flag,
|
|
MAX(CASE WHEN ji.status IN ('pending', 'interrupted') THEN 1 ELSE 0 END) AS pending_flag,
|
|
MAX(CASE WHEN ji.status = 'failed' THEN 1 ELSE 0 END) AS failed_flag,
|
|
MAX(CASE WHEN ji.status = 'skipped' THEN 1 ELSE 0 END) AS skipped_flag,
|
|
MAX(CASE WHEN ji.status = 'failed' THEN ji.last_error ELSE '' END) AS failed_reason,
|
|
MAX(CASE WHEN ji.status = 'skipped' THEN ji.last_error ELSE '' END) AS skipped_reason
|
|
FROM job_items AS ji
|
|
JOIN job_stages AS js ON js.id = ji.job_stage_id
|
|
WHERE js.job_run_id = ?
|
|
AND js.stage_type = 'download'
|
|
AND ji.song_id IS NOT NULL
|
|
GROUP BY ji.song_id
|
|
)
|
|
SELECT
|
|
ps.song_id,
|
|
ps.position,
|
|
s.platform,
|
|
s.remote_song_id,
|
|
s.name AS song_name,
|
|
s.singers,
|
|
s.metadata_json,
|
|
CASE
|
|
WHEN lds.song_id IS NOT NULL OR COALESCE(jss.succeeded_flag, 0) = 1 THEN 'downloaded'
|
|
WHEN COALESCE(jss.running_flag, 0) = 1 THEN 'running'
|
|
WHEN COALESCE(jss.pending_flag, 0) = 1 THEN 'pending'
|
|
WHEN COALESCE(jss.failed_flag, 0) = 1 THEN 'failed'
|
|
WHEN COALESCE(jss.skipped_flag, 0) = 1 THEN 'skipped'
|
|
ELSE 'pending'
|
|
END AS song_state,
|
|
CASE
|
|
WHEN COALESCE(jss.failed_reason, '') != '' THEN jss.failed_reason
|
|
WHEN COALESCE(jss.skipped_reason, '') != '' THEN jss.skipped_reason
|
|
ELSE ''
|
|
END AS status_note
|
|
FROM playlist_songs AS ps
|
|
JOIN songs AS s ON s.id = ps.song_id
|
|
LEFT JOIN local_downloaded_songs AS lds ON lds.song_id = ps.song_id
|
|
LEFT JOIN job_song_status AS jss ON jss.song_id = ps.song_id
|
|
WHERE ps.playlist_id = ?
|
|
ORDER BY COALESCE(ps.position, 2147483647) ASC, ps.song_id ASC
|
|
LIMIT ?
|
|
""",
|
|
(int(job_id), int(playlist_id), int(normalized_limit)),
|
|
)
|
|
items: list[dict[str, Any]] = []
|
|
for row in rows:
|
|
payload = dict(row)
|
|
metadata = _json_loads(payload.get("metadata_json"))
|
|
snapshot = metadata.get("snapshot") if isinstance(metadata, dict) else {}
|
|
raw_data = snapshot.get("raw_data") if isinstance(snapshot, dict) else {}
|
|
search = raw_data.get("search") if isinstance(raw_data, dict) else {}
|
|
is_non_music_resource = bool(
|
|
isinstance(search, dict)
|
|
and bool(search.get("qq_toplist_fallback"))
|
|
) or str(payload.get("remote_song_id") or "").strip().lower().startswith("qqtop_")
|
|
status_note = str(payload.get("status_note") or "").strip()
|
|
if is_non_music_resource and "非音乐资源" not in status_note:
|
|
status_note = "非音乐资源(有声榜条目)"
|
|
items.append(
|
|
{
|
|
"song_id": int(payload["song_id"]),
|
|
"position": payload.get("position"),
|
|
"platform": payload.get("platform"),
|
|
"remote_song_id": payload.get("remote_song_id"),
|
|
"song_name": payload.get("song_name"),
|
|
"singers": payload.get("singers"),
|
|
"status": payload.get("song_state"),
|
|
"status_note": status_note,
|
|
"is_non_music_resource": bool(is_non_music_resource),
|
|
}
|
|
)
|
|
return items
|
|
|
|
def get_download_stats(self) -> dict[str, int]:
|
|
total_songs_row = self._fetchone("SELECT COUNT(*) AS count_value FROM songs")
|
|
local_summary = self._fetchone(
|
|
"""
|
|
SELECT
|
|
COUNT(DISTINCT fa.song_id) AS downloaded_songs,
|
|
COUNT(DISTINCT fa.id) AS local_file_assets,
|
|
COUNT(DISTINCT fl.id) AS local_file_locations
|
|
FROM file_locations AS fl
|
|
JOIN file_assets AS fa ON fa.id = fl.file_asset_id
|
|
JOIN storage_backends AS sb ON sb.id = fl.backend_id
|
|
WHERE fl.status = 'active' AND sb.backend_type = 'local_fs'
|
|
"""
|
|
)
|
|
return {
|
|
"total_songs": int(total_songs_row["count_value"]) if total_songs_row else 0,
|
|
"downloaded_songs": int(local_summary["downloaded_songs"]) if local_summary else 0,
|
|
"local_file_assets": int(local_summary["local_file_assets"]) if local_summary else 0,
|
|
"local_file_locations": int(local_summary["local_file_locations"]) if local_summary else 0,
|
|
}
|
|
|
|
def _row_to_config_revision(self, row: sqlite3.Row) -> dict[str, Any]:
|
|
return {
|
|
"id": int(row["id"]),
|
|
"source_type": str(row["source_type"]),
|
|
"file_path": str(row["file_path"]),
|
|
"content_text": str(row["content_text"]),
|
|
"content_hash": str(row["content_hash"]),
|
|
"created_at": row["created_at"],
|
|
"applied_at": row["applied_at"],
|
|
"note": row["note"],
|
|
}
|
|
|
|
def create_config_revision(
|
|
self,
|
|
*,
|
|
source_type: str = "env_file",
|
|
file_path: str,
|
|
content_text: str,
|
|
content_hash: str,
|
|
note: str | None = None,
|
|
) -> int:
|
|
with self._connection() as conn:
|
|
try:
|
|
return int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO config_revisions (
|
|
source_type,
|
|
file_path,
|
|
content_text,
|
|
content_hash,
|
|
note
|
|
)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
(source_type, file_path, content_text, content_hash, note),
|
|
).lastrowid
|
|
)
|
|
except sqlite3.IntegrityError:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT id FROM config_revisions
|
|
WHERE source_type = ? AND file_path = ? AND content_hash = ?
|
|
""",
|
|
(source_type, file_path, content_hash),
|
|
).fetchone()
|
|
if row is None:
|
|
raise
|
|
return int(row["id"])
|
|
|
|
def get_config_revision(self, revision_id: int) -> dict[str, Any] | None:
|
|
row = self._fetchone("SELECT * FROM config_revisions WHERE id = ?", (revision_id,))
|
|
if row is None:
|
|
return None
|
|
return self._row_to_config_revision(row)
|
|
|
|
def list_config_revisions(self, limit: int = 50) -> list[dict[str, Any]]:
|
|
with self._connection() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT * FROM config_revisions
|
|
ORDER BY id DESC
|
|
LIMIT ?
|
|
""",
|
|
(int(limit),),
|
|
).fetchall()
|
|
return [self._row_to_config_revision(row) for row in rows]
|
|
|
|
def mark_config_revision_applied(self, revision_id: int) -> None:
|
|
with self._connection() as conn:
|
|
conn.execute(
|
|
"""
|
|
UPDATE config_revisions
|
|
SET applied_at = COALESCE(applied_at, CURRENT_TIMESTAMP)
|
|
WHERE id = ?
|
|
""",
|
|
(revision_id,),
|
|
)
|