Files

307 lines
13 KiB
Python

import tempfile
import unittest
from pathlib import Path
class OpsRepositoryTaskCenterTests(unittest.TestCase):
    """Exercise OpsRepository worker-state tracking and task-center listing.

    Each test creates its own throwaway ``catalogsync.db`` inside a temporary
    directory, so the tests do not share any state. The ``musicdl`` imports
    are performed inside the individual test methods.
    """

    def test_claim_item_resets_stale_throughput_for_reused_worker(self):
        """A worker that claims a second item starts with cleared progress.

        After ``download-1`` reports progress on a first item and that item is
        marked succeeded, claiming a second item under the same worker name
        must leave the newest worker row pointing at the second item with
        zero byte counters, zero speed, zero percent and blank progress text.
        """
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)

            job_id = repo.create_job(
                job_type="download_only",
                config_snapshot={},
                status=JobStatus.RUNNING,
            )
            stage_id = repo.create_stage(
                job_run_id=job_id,
                stage_type="download",
                seq_no=1,
                status=StageStatus.RUNNING,
            )

            # First item: claim it, report mid-download progress, finish it.
            first_item_id = repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:first",
                song_id=101,
                status=ItemStatus.PENDING,
            )
            repo.claim_item(item_id=first_item_id, worker_name="download-1")
            repo.update_worker_state(
                worker_name="download-1",
                status="running",
                current_job_item_id=first_item_id,
                downloaded_bytes=3 * 1024 * 1024,
                total_bytes=6 * 1024 * 1024,
                speed_bytes_per_sec=3 * 1024 * 1024,
                progress_percent=50,
                last_progress_text="3.00MB/6.00MB",
            )
            self.assertTrue(repo.mark_item_succeeded(first_item_id))

            # Second item: claimed by the same worker name.
            second_item_id = repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:second",
                song_id=102,
                status=ItemStatus.PENDING,
            )
            repo.claim_item(item_id=second_item_id, worker_name="download-1")

            worker_row = repo._fetchone(
                """
                SELECT current_job_item_id, downloaded_bytes, total_bytes, speed_bytes_per_sec, progress_percent, last_progress_text
                FROM job_workers
                WHERE job_run_id = ? AND job_stage_id = ? AND worker_name = ?
                ORDER BY id DESC
                LIMIT 1
                """,
                (job_id, stage_id, "download-1"),
            )
            # Fail with a readable message instead of a TypeError on subscript.
            self.assertIsNotNone(worker_row, "no job_workers row found for download-1")

            self.assertEqual(second_item_id, int(worker_row["current_job_item_id"]))
            self.assertEqual(0, int(worker_row["downloaded_bytes"] or 0))
            self.assertEqual(0, int(worker_row["total_bytes"] or 0))
            self.assertEqual(0, int(worker_row["speed_bytes_per_sec"] or 0))
            self.assertEqual(0, int(worker_row["progress_percent"] or 0))
            self.assertFalse(str(worker_row["last_progress_text"] or "").strip())

    def test_update_worker_state_targets_matching_row_for_current_job_item(self):
        """``update_worker_state`` only touches the row tied to the given item.

        Two jobs each have an item claimed by a worker named ``download-1``.
        Updating the worker state with the first job's item id must change the
        worker row recorded for the first job and leave the second job's row
        pointing at its own item with no speed recorded.
        """
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)

            # First job / stage / item, claimed by "download-1".
            first_job_id = repo.create_job(
                job_type="download_only",
                config_snapshot={},
                status=JobStatus.RUNNING,
            )
            first_stage_id = repo.create_stage(
                job_run_id=first_job_id,
                stage_type="download",
                seq_no=1,
                status=StageStatus.RUNNING,
            )
            first_item_id = repo.create_item(
                job_stage_id=first_stage_id,
                item_type="song_download",
                item_key="song:first",
                song_id=201,
                status=ItemStatus.PENDING,
            )
            repo.claim_item(item_id=first_item_id, worker_name="download-1")
            first_worker_row = repo._fetchone(
                """
                SELECT id
                FROM job_workers
                WHERE job_run_id = ? AND job_stage_id = ? AND worker_name = ?
                ORDER BY id DESC
                LIMIT 1
                """,
                (first_job_id, first_stage_id, "download-1"),
            )
            self.assertIsNotNone(first_worker_row, "no job_workers row for the first job")
            first_worker_id = int(first_worker_row["id"])

            # Second job / stage / item, claimed under the same worker name.
            second_job_id = repo.create_job(
                job_type="download_only",
                config_snapshot={},
                status=JobStatus.RUNNING,
            )
            second_stage_id = repo.create_stage(
                job_run_id=second_job_id,
                stage_type="download",
                seq_no=1,
                status=StageStatus.RUNNING,
            )
            second_item_id = repo.create_item(
                job_stage_id=second_stage_id,
                item_type="song_download",
                item_key="song:second",
                song_id=202,
                status=ItemStatus.PENDING,
            )
            repo.claim_item(item_id=second_item_id, worker_name="download-1")
            second_worker_row = repo._fetchone(
                """
                SELECT id
                FROM job_workers
                WHERE job_run_id = ? AND job_stage_id = ? AND worker_name = ?
                ORDER BY id DESC
                LIMIT 1
                """,
                (second_job_id, second_stage_id, "download-1"),
            )
            self.assertIsNotNone(second_worker_row, "no job_workers row for the second job")
            second_worker_id = int(second_worker_row["id"])

            # Update addressed to the FIRST job's item, issued after the
            # second claim.
            repo.update_worker_state(
                worker_name="download-1",
                current_job_item_id=first_item_id,
                status="running",
                current_song_id=201,
                current_display_text="Song One",
                speed_bytes_per_sec=1234,
            )

            worker_rows = repo._fetchall(
                """
                SELECT id, current_job_item_id, current_song_id, current_display_text, speed_bytes_per_sec
                FROM job_workers
                WHERE id IN (?, ?)
                ORDER BY id ASC
                """,
                (first_worker_id, second_worker_id),
            )
            rows_by_id = {int(row["id"]): row for row in worker_rows}
            # Guard the lookups so a missing row is an assertion failure,
            # not a KeyError.
            self.assertIn(first_worker_id, rows_by_id)
            self.assertIn(second_worker_id, rows_by_id)
            first_row = rows_by_id[first_worker_id]
            second_row = rows_by_id[second_worker_id]

            self.assertEqual(first_item_id, int(first_row["current_job_item_id"]))
            self.assertEqual(201, int(first_row["current_song_id"]))
            self.assertEqual("Song One", first_row["current_display_text"])
            self.assertEqual(1234, int(first_row["speed_bytes_per_sec"] or 0))

            self.assertEqual(second_item_id, int(second_row["current_job_item_id"]))
            self.assertEqual(0, int(second_row["speed_bytes_per_sec"] or 0))

    def test_list_task_center_rows_computes_lane_queue_progress_and_speed(self):
        """Task-center rows expose lane, queue, progress and speed fields.

        Sets up a running download job with one active worker, a queued
        ``catalog_sync`` job and a running ``sync_only`` job, then checks the
        derived columns returned by ``list_task_center_rows``.
        """
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)

            # Running download job: one finished item, one in progress.
            running_download_job_id = repo.create_job(
                job_type="download_only",
                config_snapshot={},
                status=JobStatus.RUNNING,
                playlist_scope={"playlist_ids": [7]},
            )
            running_download_stage_id = repo.create_stage(
                job_run_id=running_download_job_id,
                stage_type="download",
                seq_no=1,
                status=StageStatus.RUNNING,
            )
            repo.create_item(
                job_stage_id=running_download_stage_id,
                item_type="song_download",
                item_key="song:done",
                song_id=101,
                status=ItemStatus.SUCCEEDED,
            )
            running_item_id = repo.create_item(
                job_stage_id=running_download_stage_id,
                item_type="song_download",
                item_key="song:running",
                song_id=102,
                status=ItemStatus.PENDING,
            )
            repo.claim_item(item_id=running_item_id, worker_name="download-1")
            repo.update_worker_state(
                worker_name="download-1",
                status="running",
                current_job_item_id=running_item_id,
                downloaded_bytes=3 * 1024 * 1024,
                total_bytes=6 * 1024 * 1024,
                speed_bytes_per_sec=3 * 1024 * 1024,
                progress_percent=50,
                last_progress_text="1 / 2 songs",
            )

            # One queued job and one running non-download job.
            queued_download_job_id = repo.create_job(
                job_type="catalog_sync",
                config_snapshot={},
                status=JobStatus.QUEUED,
                playlist_scope={"playlist_ids": [9]},
            )
            running_general_job_id = repo.create_job(
                job_type="sync_only",
                config_snapshot={},
                status=JobStatus.RUNNING,
                playlist_scope={"playlist_ids": [11, 12]},
            )

            active_job_ids = [job.id for job in repo.list_active_jobs()]
            queued_job_ids = [job.id for job in repo.list_queued_jobs()]
            rows = repo.list_task_center_rows(limit=20)

            self.assertIn(running_download_job_id, active_job_ids)
            self.assertIn(running_general_job_id, active_job_ids)
            self.assertIn(queued_download_job_id, queued_job_ids)

            rows_by_id = {int(row["id"]): row for row in rows}

            self.assertIn(running_download_job_id, rows_by_id)
            running_download = rows_by_id[running_download_job_id]
            self.assertEqual("download", running_download["lane_type"])
            self.assertEqual(1, running_download["active_worker_count"])
            self.assertEqual(50, running_download["primary_progress_percent"])
            self.assertEqual(3 * 1024 * 1024, running_download["speed_bytes_per_sec"])
            self.assertEqual("3.0 MB/s", running_download["speed_text"])
            self.assertIn("1 / 2 songs", str(running_download["primary_progress_text"]))

            self.assertIn(queued_download_job_id, rows_by_id)
            queued_download = rows_by_id[queued_download_job_id]
            self.assertEqual("queued #1", queued_download["queue_label"])

            self.assertIn(running_general_job_id, rows_by_id)
            running_general = rows_by_id[running_general_job_id]
            self.assertEqual("general", running_general["lane_type"])
            self.assertEqual("2 playlists", running_general["scope_summary"])

    def test_list_task_center_rows_includes_paused_jobs(self):
        """A job created with ``JobStatus.PAUSED`` is listed as ``paused``."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import JobStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)

            paused_job_id = repo.create_job(
                job_type="download_only",
                config_snapshot={},
                status=JobStatus.PAUSED,
                playlist_scope={"playlist_ids": [23]},
            )

            rows = repo.list_task_center_rows(limit=20)
            rows_by_id = {int(row["id"]): row for row in rows}

            self.assertIn(paused_job_id, rows_by_id)
            self.assertEqual("paused", rows_by_id[paused_job_id]["status"])

    def test_list_task_center_rows_includes_completed_jobs(self):
        """A job created with ``JobStatus.COMPLETED`` is listed as ``completed``."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.models import JobStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            repo = OpsRepository(db_path)

            completed_job_id = repo.create_job(
                job_type="download_only",
                config_snapshot={},
                status=JobStatus.COMPLETED,
                playlist_scope={"playlist_ids": [42]},
            )

            rows = repo.list_task_center_rows(limit=20)
            rows_by_id = {int(row["id"]): row for row in rows}

            self.assertIn(completed_job_id, rows_by_id)
            self.assertEqual("completed", rows_by_id[completed_job_id]["status"])
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()