307 lines
13 KiB
Python
307 lines
13 KiB
Python
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
|
|
class OpsRepositoryTaskCenterTests(unittest.TestCase):
|
|
def test_claim_item_resets_stale_throughput_for_reused_worker(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
repo = OpsRepository(db_path)
|
|
|
|
job_id = repo.create_job(
|
|
job_type="download_only",
|
|
config_snapshot={},
|
|
status=JobStatus.RUNNING,
|
|
)
|
|
stage_id = repo.create_stage(
|
|
job_run_id=job_id,
|
|
stage_type="download",
|
|
seq_no=1,
|
|
status=StageStatus.RUNNING,
|
|
)
|
|
first_item_id = repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:first",
|
|
song_id=101,
|
|
status=ItemStatus.PENDING,
|
|
)
|
|
repo.claim_item(item_id=first_item_id, worker_name="download-1")
|
|
repo.update_worker_state(
|
|
worker_name="download-1",
|
|
status="running",
|
|
current_job_item_id=first_item_id,
|
|
downloaded_bytes=3 * 1024 * 1024,
|
|
total_bytes=6 * 1024 * 1024,
|
|
speed_bytes_per_sec=3 * 1024 * 1024,
|
|
progress_percent=50,
|
|
last_progress_text="3.00MB/6.00MB",
|
|
)
|
|
self.assertTrue(repo.mark_item_succeeded(first_item_id))
|
|
|
|
second_item_id = repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:second",
|
|
song_id=102,
|
|
status=ItemStatus.PENDING,
|
|
)
|
|
repo.claim_item(item_id=second_item_id, worker_name="download-1")
|
|
|
|
worker_row = repo._fetchone(
|
|
"""
|
|
SELECT current_job_item_id, downloaded_bytes, total_bytes, speed_bytes_per_sec, progress_percent, last_progress_text
|
|
FROM job_workers
|
|
WHERE job_run_id = ? AND job_stage_id = ? AND worker_name = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
""",
|
|
(job_id, stage_id, "download-1"),
|
|
)
|
|
|
|
self.assertEqual(second_item_id, int(worker_row["current_job_item_id"]))
|
|
self.assertEqual(0, int(worker_row["downloaded_bytes"] or 0))
|
|
self.assertEqual(0, int(worker_row["total_bytes"] or 0))
|
|
self.assertEqual(0, int(worker_row["speed_bytes_per_sec"] or 0))
|
|
self.assertEqual(0, int(worker_row["progress_percent"] or 0))
|
|
self.assertFalse(str(worker_row["last_progress_text"] or "").strip())
|
|
|
|
def test_update_worker_state_targets_matching_row_for_current_job_item(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
repo = OpsRepository(db_path)
|
|
|
|
first_job_id = repo.create_job(
|
|
job_type="download_only",
|
|
config_snapshot={},
|
|
status=JobStatus.RUNNING,
|
|
)
|
|
first_stage_id = repo.create_stage(
|
|
job_run_id=first_job_id,
|
|
stage_type="download",
|
|
seq_no=1,
|
|
status=StageStatus.RUNNING,
|
|
)
|
|
first_item_id = repo.create_item(
|
|
job_stage_id=first_stage_id,
|
|
item_type="song_download",
|
|
item_key="song:first",
|
|
song_id=201,
|
|
status=ItemStatus.PENDING,
|
|
)
|
|
repo.claim_item(item_id=first_item_id, worker_name="download-1")
|
|
first_worker_row = repo._fetchone(
|
|
"""
|
|
SELECT id
|
|
FROM job_workers
|
|
WHERE job_run_id = ? AND job_stage_id = ? AND worker_name = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
""",
|
|
(first_job_id, first_stage_id, "download-1"),
|
|
)
|
|
|
|
second_job_id = repo.create_job(
|
|
job_type="download_only",
|
|
config_snapshot={},
|
|
status=JobStatus.RUNNING,
|
|
)
|
|
second_stage_id = repo.create_stage(
|
|
job_run_id=second_job_id,
|
|
stage_type="download",
|
|
seq_no=1,
|
|
status=StageStatus.RUNNING,
|
|
)
|
|
second_item_id = repo.create_item(
|
|
job_stage_id=second_stage_id,
|
|
item_type="song_download",
|
|
item_key="song:second",
|
|
song_id=202,
|
|
status=ItemStatus.PENDING,
|
|
)
|
|
repo.claim_item(item_id=second_item_id, worker_name="download-1")
|
|
second_worker_row = repo._fetchone(
|
|
"""
|
|
SELECT id
|
|
FROM job_workers
|
|
WHERE job_run_id = ? AND job_stage_id = ? AND worker_name = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1
|
|
""",
|
|
(second_job_id, second_stage_id, "download-1"),
|
|
)
|
|
|
|
repo.update_worker_state(
|
|
worker_name="download-1",
|
|
current_job_item_id=first_item_id,
|
|
status="running",
|
|
current_song_id=201,
|
|
current_display_text="Song One",
|
|
speed_bytes_per_sec=1234,
|
|
)
|
|
|
|
worker_rows = repo._fetchall(
|
|
"""
|
|
SELECT id, current_job_item_id, current_song_id, current_display_text, speed_bytes_per_sec
|
|
FROM job_workers
|
|
WHERE id IN (?, ?)
|
|
ORDER BY id ASC
|
|
""",
|
|
(int(first_worker_row["id"]), int(second_worker_row["id"])),
|
|
)
|
|
|
|
rows_by_id = {int(row["id"]): row for row in worker_rows}
|
|
self.assertEqual(first_item_id, int(rows_by_id[int(first_worker_row["id"])]["current_job_item_id"]))
|
|
self.assertEqual(201, int(rows_by_id[int(first_worker_row["id"])]["current_song_id"]))
|
|
self.assertEqual("Song One", rows_by_id[int(first_worker_row["id"])]["current_display_text"])
|
|
self.assertEqual(1234, int(rows_by_id[int(first_worker_row["id"])]["speed_bytes_per_sec"] or 0))
|
|
self.assertEqual(second_item_id, int(rows_by_id[int(second_worker_row["id"])]["current_job_item_id"]))
|
|
self.assertEqual(0, int(rows_by_id[int(second_worker_row["id"])]["speed_bytes_per_sec"] or 0))
|
|
|
|
def test_list_task_center_rows_computes_lane_queue_progress_and_speed(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
repo = OpsRepository(db_path)
|
|
|
|
running_download_job_id = repo.create_job(
|
|
job_type="download_only",
|
|
config_snapshot={},
|
|
status=JobStatus.RUNNING,
|
|
playlist_scope={"playlist_ids": [7]},
|
|
)
|
|
running_download_stage_id = repo.create_stage(
|
|
job_run_id=running_download_job_id,
|
|
stage_type="download",
|
|
seq_no=1,
|
|
status=StageStatus.RUNNING,
|
|
)
|
|
repo.create_item(
|
|
job_stage_id=running_download_stage_id,
|
|
item_type="song_download",
|
|
item_key="song:done",
|
|
song_id=101,
|
|
status=ItemStatus.SUCCEEDED,
|
|
)
|
|
running_item_id = repo.create_item(
|
|
job_stage_id=running_download_stage_id,
|
|
item_type="song_download",
|
|
item_key="song:running",
|
|
song_id=102,
|
|
status=ItemStatus.PENDING,
|
|
)
|
|
repo.claim_item(item_id=running_item_id, worker_name="download-1")
|
|
repo.update_worker_state(
|
|
worker_name="download-1",
|
|
status="running",
|
|
current_job_item_id=running_item_id,
|
|
downloaded_bytes=3 * 1024 * 1024,
|
|
total_bytes=6 * 1024 * 1024,
|
|
speed_bytes_per_sec=3 * 1024 * 1024,
|
|
progress_percent=50,
|
|
last_progress_text="1 / 2 songs",
|
|
)
|
|
|
|
queued_download_job_id = repo.create_job(
|
|
job_type="catalog_sync",
|
|
config_snapshot={},
|
|
status=JobStatus.QUEUED,
|
|
playlist_scope={"playlist_ids": [9]},
|
|
)
|
|
running_general_job_id = repo.create_job(
|
|
job_type="sync_only",
|
|
config_snapshot={},
|
|
status=JobStatus.RUNNING,
|
|
playlist_scope={"playlist_ids": [11, 12]},
|
|
)
|
|
|
|
active_job_ids = [job.id for job in repo.list_active_jobs()]
|
|
queued_job_ids = [job.id for job in repo.list_queued_jobs()]
|
|
rows = repo.list_task_center_rows(limit=20)
|
|
|
|
self.assertIn(running_download_job_id, active_job_ids)
|
|
self.assertIn(running_general_job_id, active_job_ids)
|
|
self.assertIn(queued_download_job_id, queued_job_ids)
|
|
|
|
rows_by_id = {int(row["id"]): row for row in rows}
|
|
|
|
running_download = rows_by_id[running_download_job_id]
|
|
self.assertEqual("download", running_download["lane_type"])
|
|
self.assertEqual(1, running_download["active_worker_count"])
|
|
self.assertEqual(50, running_download["primary_progress_percent"])
|
|
self.assertEqual(3 * 1024 * 1024, running_download["speed_bytes_per_sec"])
|
|
self.assertEqual("3.0 MB/s", running_download["speed_text"])
|
|
self.assertIn("1 / 2 songs", str(running_download["primary_progress_text"]))
|
|
|
|
queued_download = rows_by_id[queued_download_job_id]
|
|
self.assertEqual("queued #1", queued_download["queue_label"])
|
|
|
|
running_general = rows_by_id[running_general_job_id]
|
|
self.assertEqual("general", running_general["lane_type"])
|
|
self.assertEqual("2 playlists", running_general["scope_summary"])
|
|
|
|
def test_list_task_center_rows_includes_paused_jobs(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.models import JobStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
repo = OpsRepository(db_path)
|
|
|
|
paused_job_id = repo.create_job(
|
|
job_type="download_only",
|
|
config_snapshot={},
|
|
status=JobStatus.PAUSED,
|
|
playlist_scope={"playlist_ids": [23]},
|
|
)
|
|
|
|
rows = repo.list_task_center_rows(limit=20)
|
|
|
|
rows_by_id = {int(row["id"]): row for row in rows}
|
|
self.assertIn(paused_job_id, rows_by_id)
|
|
self.assertEqual("paused", rows_by_id[paused_job_id]["status"])
|
|
|
|
def test_list_task_center_rows_includes_completed_jobs(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.models import JobStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
repo = OpsRepository(db_path)
|
|
|
|
completed_job_id = repo.create_job(
|
|
job_type="download_only",
|
|
config_snapshot={},
|
|
status=JobStatus.COMPLETED,
|
|
playlist_scope={"playlist_ids": [42]},
|
|
)
|
|
|
|
rows = repo.list_task_center_rows(limit=20)
|
|
|
|
rows_by_id = {int(row["id"]): row for row in rows}
|
|
self.assertIn(completed_job_id, rows_by_id)
|
|
self.assertEqual("completed", rows_by_id[completed_job_id]["status"])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|