Files

2070 lines
89 KiB
Python

from contextlib import closing
import sqlite3
import tempfile
import threading
import time
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
class OpsRunnerStateMachineTests(unittest.TestCase):
def _wait_for(self, predicate, *, timeout: float = 1.5, interval: float = 0.02) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return
time.sleep(interval)
self.assertTrue(predicate())
def _read_command(self, db_path: Path, command_id: int) -> sqlite3.Row:
with closing(sqlite3.connect(db_path)) as conn:
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT status, applied_at FROM job_commands WHERE id = ?",
(command_id,),
).fetchone()
self.assertIsNotNone(row)
return row
def _set_attempt_count(self, db_path: Path, item_id: int, attempt_count: int) -> None:
with closing(sqlite3.connect(db_path)) as conn, conn:
conn.execute(
"UPDATE job_items SET attempt_count = ? WHERE id = ?",
(attempt_count, item_id),
)
def _read_job_event_types(self, db_path: Path, job_id: int) -> list[str]:
with closing(sqlite3.connect(db_path)) as conn:
return [
str(row[0])
for row in conn.execute(
"SELECT event_type FROM job_events WHERE job_run_id = ? ORDER BY id",
(job_id,),
).fetchall()
]
def test_recover_incomplete_jobs_requeues_running_item_and_job_for_auto_resume(self):
    # Scenario: a job, its stage and one item are all left in RUNNING state.
    # After ``recover_incomplete_jobs`` the job must be QUEUED, the stage and
    # item PENDING, the item's worker cleared, and a ``recovery_requeued``
    # event recorded for the job.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        db_path = Path(tmpdir) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)
        job_id = repo.create_job(
            job_type="catalog_sync",
            config_snapshot={},
            status=JobStatus.RUNNING,
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="collect",
            seq_no=1,
            status=StageStatus.RUNNING,
        )
        item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="song_sync",
            item_key="qq:song:1",
            status=ItemStatus.RUNNING,
        )
        repo.mark_item_running(item_id=item_id, worker_id=7)

        runner.recover_incomplete_jobs()

        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        item = repo.get_item(item_id)
        # Use the shared helper rather than duplicating the job_events query.
        event_types = self._read_job_event_types(db_path, job_id)

        self.assertEqual(JobStatus.QUEUED, job.status)
        self.assertEqual(StageStatus.PENDING, stage.status)
        self.assertEqual(ItemStatus.PENDING, item.status)
        self.assertIsNone(item.worker_id)
        self.assertIn("recovery_requeued", event_types)
def test_recovered_job_is_immediately_startable(self):
    # Scenario: after recovery of a RUNNING job, a single call to
    # ``_start_eligible_jobs`` must start exactly that job again.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange: job -> stage -> item, everything marked as running.
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={}, status=JobStatus.RUNNING)
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="collect", seq_no=1, status=StageStatus.RUNNING
        )
        item_id = repo.create_item(
            job_stage_id=stage_id, item_type="song_sync", item_key="qq:song:1", status=ItemStatus.RUNNING
        )
        repo.mark_item_running(item_id=item_id, worker_id=7)

        launched_job_ids: list[int] = []

        def fake_run_job(started_job_id: int) -> None:
            # Record the job id and linger briefly instead of doing real work.
            launched_job_ids.append(started_job_id)
            time.sleep(0.05)

        runner._run_job = fake_run_job

        # Act
        runner.recover_incomplete_jobs()
        started_count = runner._start_eligible_jobs()

        # Assert
        self.assertEqual(1, started_count)
        self._wait_for(lambda: launched_job_ids == [job_id])
        job = repo.get_job(job_id)
        self.assertEqual(JobStatus.RUNNING, job.status)
def test_apply_pending_commands_pause_requests_pause_and_marks_command_applied(self):
    # Scenario: a pending "pause" command on a running job moves job and stage
    # to PAUSE_REQUESTED and the command row to "applied" with a timestamp.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={}, status=JobStatus.RUNNING)
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="collect", seq_no=1, status=StageStatus.RUNNING
        )
        command_id = repo.create_command(job_run_id=job_id, command_type="pause")

        # Act
        runner.apply_pending_commands()

        # Assert
        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        command = self._read_command(db_path, command_id)
        self.assertEqual(JobStatus.PAUSE_REQUESTED, job.status)
        self.assertEqual(StageStatus.PAUSE_REQUESTED, stage.status)
        self.assertEqual("applied", command["status"])
        self.assertIsNotNone(command["applied_at"])
def test_apply_pending_commands_cancel_marks_job_canceled_and_items_recoverable(self):
    # Scenario: a "cancel" command cancels the job, skips its stage and
    # cancels the pending item; the canceled item can then still be requeued,
    # which puts the item back to PENDING and the job back to QUEUED.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={}, status=JobStatus.RUNNING)
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING
        )
        item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key="song:cancel:1",
            song_id=1,
            status=ItemStatus.PENDING,
        )
        command_id = repo.create_command(job_run_id=job_id, command_type="cancel")

        # Act 1: apply the cancel command and snapshot the resulting state.
        runner.apply_pending_commands()
        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        item = repo.get_item(item_id)
        command = self._read_command(db_path, command_id)

        # Act 2: requeue the canceled item (non-forced) and snapshot again.
        requeued = repo.requeue_item(item_id=item_id, force=False, job_id=job_id)
        requeued_item = repo.get_item(item_id)
        requeued_job = repo.get_job(job_id)

        # Assert
        self.assertEqual(JobStatus.CANCELED, job.status)
        self.assertEqual(StageStatus.SKIPPED, stage.status)
        self.assertEqual(ItemStatus.CANCELED, item.status)
        self.assertEqual("applied", command["status"])
        self.assertTrue(requeued)
        self.assertEqual(ItemStatus.PENDING, requeued_item.status)
        self.assertEqual(JobStatus.QUEUED, requeued_job.status)
def test_apply_pending_commands_retry_and_force_retry_requeue_failed_items(self):
    # Scenario: two FAILED items with max_attempts=3, one with 1 attempt used
    # and one with all 3 used. "retry_item" requeues only the former;
    # "force_retry_item" requeues the exhausted one as well.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={})
        stage_id = repo.create_stage(job_run_id=job_id, stage_type="sync", seq_no=1)
        retryable_item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="song_sync",
            item_key="song:retryable",
            status=ItemStatus.FAILED,
            max_attempts=3,
        )
        exhausted_item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="song_sync",
            item_key="song:exhausted",
            status=ItemStatus.FAILED,
            max_attempts=3,
        )
        self._set_attempt_count(db_path, retryable_item_id, 1)
        self._set_attempt_count(db_path, exhausted_item_id, 3)

        # Act 1: plain retry for both items.
        repo.create_command(
            job_run_id=job_id, command_type="retry_item", target_item_id=retryable_item_id
        )
        repo.create_command(
            job_run_id=job_id, command_type="retry_item", target_item_id=exhausted_item_id
        )
        runner.apply_pending_commands()
        retryable_item = repo.get_item(retryable_item_id)
        exhausted_item = repo.get_item(exhausted_item_id)

        # Act 2: forced retry for the exhausted item.
        repo.create_command(
            job_run_id=job_id, command_type="force_retry_item", target_item_id=exhausted_item_id
        )
        runner.apply_pending_commands()
        exhausted_item_after_force = repo.get_item(exhausted_item_id)

        # Assert
        self.assertEqual(ItemStatus.PENDING, retryable_item.status)
        self.assertEqual(1, retryable_item.attempt_count)
        self.assertEqual(ItemStatus.FAILED, exhausted_item.status)
        self.assertEqual(ItemStatus.PENDING, exhausted_item_after_force.status)
def test_reconcile_pause_state_finalizes_pause_when_no_running_items(self):
    # Scenario: a pause is requested on a running job whose stage has no
    # items; ``reconcile_pause_state`` must leave job and stage PAUSED.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={}, status=JobStatus.RUNNING)
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="collect", seq_no=1, status=StageStatus.RUNNING
        )
        repo.request_job_pause(job_id)

        # Act
        runner.reconcile_pause_state(job_id)

        # Assert
        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        self.assertEqual(JobStatus.PAUSED, job.status)
        self.assertEqual(StageStatus.PAUSED, stage.status)
def test_loop_once_does_not_false_complete_sync_only_job_with_paused_stage(self):
    # Scenario: a RUNNING "sync_only" job owns a PAUSED stage with an
    # INTERRUPTED item. One loop iteration must end with the job PAUSED
    # while stage and item keep their states.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(job_type="sync_only", config_snapshot={}, status=JobStatus.RUNNING)
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="sync", seq_no=1, status=StageStatus.PAUSED
        )
        item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="playlist_sync",
            item_key="playlist:recovery:1",
            playlist_id=1,
            status=ItemStatus.INTERRUPTED,
        )

        # Act: run one iteration and wait until the job leaves RUNNING.
        runner.loop_once()
        self._wait_for(lambda: repo.get_job(job_id).status != JobStatus.RUNNING)

        # Assert
        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        item = repo.get_item(item_id)
        self.assertEqual(JobStatus.PAUSED, job.status)
        self.assertEqual(StageStatus.PAUSED, stage.status)
        self.assertEqual(ItemStatus.INTERRUPTED, item.status)
def test_loop_once_claims_queued_job_and_records_started_at(self):
    # Scenario: one loop iteration picks up a freshly created job, which must
    # reach COMPLETED and carry a ``started_at`` value.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={})

        runner.loop_once()
        self._wait_for(lambda: repo.get_job(job_id).status == JobStatus.COMPLETED)

        job = repo.get_job(job_id)
        self.assertEqual(JobStatus.COMPLETED, job.status)
        self.assertIsNotNone(job.started_at)
def test_loop_once_does_not_stop_after_creating_initial_stages(self):
    # Scenario: a "download_only" job with a single planned song (planner and
    # downloader patched) must run to COMPLETED within one loop iteration,
    # with some stage reporting exactly one successful item.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        library_root = Path(scratch) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_root), "DOWNLOAD_WORKERS": "1"},
            sources=["qq"],
            download_sources=["qq"],
        )

        planned_queue = [{"id": 1, "song_id": 1, "platform": "qq", "name": "Song Runner"}]
        # Both patches stay active while the job runs in the background.
        with (
            patch(
                "musicdl.catalogsync.downloader.DownloadPlanner.build_download_queue",
                return_value=planned_queue,
            ),
            patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                return_value=True,
            ),
        ):
            runner.loop_once()
            self._wait_for(lambda: repo.get_job(job_id).status == JobStatus.COMPLETED)

        job = repo.get_job(job_id)
        stages = repo.list_job_stages(job_id)
        self.assertEqual(JobStatus.COMPLETED, job.status)
        self.assertTrue(any(stage.success_items == 1 for stage in stages))
def test_loop_once_advances_pending_item_in_running_stage(self):
    # Scenario: a PENDING download item inside a RUNNING stage must leave the
    # PENDING state after one loop iteration (downloader patched to succeed).
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        library_root = Path(scratch) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={"DOWNLOAD_WORKERS": "1"})
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING
        )
        item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key="song:runner:1",
            song_id=1,
            payload={"row": {"id": 1, "platform": "qq", "name": "Runner Song"}},
            status=ItemStatus.PENDING,
        )

        # Act: the patch must remain active while the item is processed.
        download_target = "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row"
        with patch(download_target, return_value=True):
            runner.loop_once()
            self._wait_for(lambda: repo.get_item(item_id).status != ItemStatus.PENDING)

        # Assert
        item = repo.get_item(item_id)
        self.assertNotEqual(ItemStatus.PENDING, item.status)
def test_download_stage_runs_catalog_export_after_completion(self):
    # Scenario: running a download stage whose single item succeeds (resolve
    # and download steps replaced by fakes) must complete the stage, call the
    # catalog export command once and emit "catalog_export_started" before
    # "catalog_export_succeeded".
    from musicdl.catalogsync.catalog_export import CatalogExportResult
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    export_command = "echo catalog-export"
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        library_root = Path(scratch) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={
                "LIBRARY_DIR": str(library_root),
                "CATALOG_EXPORT_COMMAND": export_command,
            },
            status=JobStatus.RUNNING,
        )
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING
        )
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key="song:catalog-export:1",
            song_id=1,
            payload={"row": {"id": 1, "song_id": 1, "platform": "qq", "name": "Catalog Export Song"}},
        )

        def fake_resolve(executor, item_id, worker_name, *, ready_queue, already_claimed=False):
            # Hand a ready-made download task straight to the ready queue.
            resolved_task = SimpleNamespace(
                item_id=item_id,
                playlist_id=None,
                row={"id": item_id},
                resolved_payload=f"payload-{item_id}",
            )
            ready_queue.put(resolved_task)

        def fake_download_task(executor, task, worker_name):
            # Every "download" succeeds immediately.
            executor.ops_repo.mark_item_succeeded(item_id=task.item_id)

        export_result = CatalogExportResult(
            status="succeeded",
            command=export_command,
            returncode=0,
            stdout="ok",
        )

        # Act
        with (
            patch(
                "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_resolve_item",
                new=fake_resolve,
            ),
            patch(
                "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_download_task",
                new=fake_download_task,
            ),
            patch(
                "musicdl.catalogsync.ops.runner.run_catalog_export_command",
                return_value=export_result,
            ) as run_export,
        ):
            runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        # Assert
        stage = repo.get_stage(stage_id)
        event_types = self._read_job_event_types(db_path, job_id)
        self.assertEqual(StageStatus.COMPLETED, stage.status)
        run_export.assert_called_once()
        self.assertIn("catalog_export_started", event_types)
        self.assertIn("catalog_export_succeeded", event_types)
        started_index = event_types.index("catalog_export_started")
        succeeded_index = event_types.index("catalog_export_succeeded")
        self.assertLess(started_index, succeeded_index)
def test_paused_or_pause_requested_job_does_not_trigger_catalog_export(self):
    # Scenario (one sub-test per job status): for a PAUSE_REQUESTED or PAUSED
    # job whose stage carries the matching status, the catalog export must
    # neither be invoked nor emit a "catalog_export_started" event.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    # Job status paired with the stage status used for that sub-test.
    cases = (
        (JobStatus.PAUSE_REQUESTED, StageStatus.PAUSE_REQUESTED),
        (JobStatus.PAUSED, StageStatus.PAUSED),
    )
    for status, stage_status in cases:
        with self.subTest(status=status.value):
            with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
                db_path = Path(scratch) / "catalogsync.db"
                library_root = Path(scratch) / "library"
                initialize_database(db_path, default_library_root=library_root).close()
                repo = OpsRepository(db_path)
                runner = OpsRunner(repository=repo)
                job_id = repo.create_job(
                    job_type="download_only",
                    config_snapshot={
                        "LIBRARY_DIR": str(library_root),
                        "CATALOG_EXPORT_COMMAND": "echo catalog-export",
                    },
                    status=status,
                )
                stage_id = repo.create_stage(
                    job_run_id=job_id,
                    stage_type="download",
                    seq_no=1,
                    status=stage_status,
                )

                with patch("musicdl.catalogsync.ops.runner.run_catalog_export_command") as run_export:
                    runner._run_catalog_export_for_stage(
                        repo.get_job(job_id),
                        repo.get_stage(stage_id),
                    )

                event_types = self._read_job_event_types(db_path, job_id)
                run_export.assert_not_called()
                self.assertNotIn("catalog_export_started", event_types)
def test_catalog_export_skipped_emits_started_then_skipped(self):
    # Scenario: when the export command reports status "skipped", the runner
    # must call it once and emit "catalog_export_started" followed by
    # "catalog_export_skipped".
    from musicdl.catalogsync.catalog_export import CatalogExportResult
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        library_root = Path(scratch) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_root)},
            status=JobStatus.RUNNING,
        )
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING
        )

        # Act
        skipped_result = CatalogExportResult(status="skipped")
        with patch(
            "musicdl.catalogsync.ops.runner.run_catalog_export_command",
            return_value=skipped_result,
        ) as run_export:
            runner._run_catalog_export_for_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        # Assert
        event_types = self._read_job_event_types(db_path, job_id)
        run_export.assert_called_once()
        self.assertIn("catalog_export_started", event_types)
        self.assertIn("catalog_export_skipped", event_types)
        started_index = event_types.index("catalog_export_started")
        skipped_index = event_types.index("catalog_export_skipped")
        self.assertLess(started_index, skipped_index)
def test_catalog_export_failed_emits_started_then_failed(self):
    # Scenario: when the export command reports status "failed", the runner
    # must call it once and emit "catalog_export_started" followed by
    # "catalog_export_failed".
    from musicdl.catalogsync.catalog_export import CatalogExportResult
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        library_root = Path(scratch) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_root)},
            status=JobStatus.RUNNING,
        )
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING
        )

        # Act
        failed_result = CatalogExportResult(status="failed", returncode=1, stderr="boom")
        with patch(
            "musicdl.catalogsync.ops.runner.run_catalog_export_command",
            return_value=failed_result,
        ) as run_export:
            runner._run_catalog_export_for_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        # Assert
        event_types = self._read_job_event_types(db_path, job_id)
        run_export.assert_called_once()
        self.assertIn("catalog_export_started", event_types)
        self.assertIn("catalog_export_failed", event_types)
        started_index = event_types.index("catalog_export_started")
        failed_index = event_types.index("catalog_export_failed")
        self.assertLess(started_index, failed_index)
def test_pause_requested_while_waiting_catalog_export_lock_skips_export(self):
    # Scenario: the export lock is held by the test while a worker thread
    # tries to run the catalog export. A pause is requested once the worker
    # has started entering the lock; after the lock is released the worker
    # must finish without calling the export command and without emitting
    # "catalog_export_started".
    from musicdl.catalogsync.catalog_export import CatalogExportResult
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    class _SignalLock:
        """Lock wrapper that signals when someone starts entering it via ``with``."""

        def __init__(self):
            self._lock = threading.Lock()
            self.enter_attempted = threading.Event()

        def acquire(self):
            self._lock.acquire()

        def release(self):
            self._lock.release()

        def __enter__(self):
            # Signal *before* blocking so the test knows the worker is waiting.
            self.enter_attempted.set()
            self._lock.acquire()
            return self

        def __exit__(self, exc_type, exc, tb):
            self._lock.release()
            return False

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        db_path = Path(tmpdir) / "catalogsync.db"
        library_root = Path(tmpdir) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={
                "LIBRARY_DIR": str(library_root),
                "CATALOG_EXPORT_COMMAND": "echo catalog-export",
            },
            status=JobStatus.RUNNING,
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="download",
            seq_no=1,
            status=StageStatus.RUNNING,
        )

        # Hold the lock up front so the worker blocks when it enters it.
        lock = _SignalLock()
        lock.acquire()
        runner._catalog_export_lock = lock
        worker_done = threading.Event()

        def run_export_in_background() -> None:
            runner._run_catalog_export_for_stage(
                repo.get_job(job_id),
                repo.get_stage(stage_id),
            )
            # Only reached when the export call returned without raising.
            worker_done.set()

        with patch(
            "musicdl.catalogsync.ops.runner.run_catalog_export_command",
            return_value=CatalogExportResult(status="succeeded"),
        ) as run_export:
            thread = threading.Thread(target=run_export_in_background, daemon=True)
            thread.start()
            try:
                self.assertTrue(lock.enter_attempted.wait(timeout=1.0))
                repo.request_job_pause(job_id)
            finally:
                # Always release, even when the assertion above fails, so the
                # worker thread is never left blocked on the lock.
                lock.release()
            thread.join(timeout=1.0)

        event_types = self._read_job_event_types(db_path, job_id)
        self.assertTrue(worker_done.is_set())
        run_export.assert_not_called()
        self.assertNotIn("catalog_export_started", event_types)
def test_canceled_download_job_does_not_trigger_catalog_export(self):
    # Scenario: the job is marked CANCELED while the stage is being run (via
    # a side effect on ``_materialize_stage_items``); the catalog export must
    # then not be invoked and no "catalog_export_started" event may appear.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        library_root = Path(scratch) / "library"
        initialize_database(db_path, default_library_root=library_root).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={
                "LIBRARY_DIR": str(library_root),
                "CATALOG_EXPORT_COMMAND": "echo catalog-export",
            },
            status=JobStatus.RUNNING,
        )
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="download", seq_no=1, status=StageStatus.RUNNING
        )

        def cancel_job_before_fast_path(job, stage):
            # Stand-in for item materialisation: cancel the job instead.
            repo.mark_job_finished(job.id, status=JobStatus.CANCELED, last_error="canceled for test")

        # Act
        with (
            patch.object(runner, "_materialize_stage_items", side_effect=cancel_job_before_fast_path),
            patch("musicdl.catalogsync.ops.runner.run_catalog_export_command") as run_export,
        ):
            runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        # Assert
        event_types = self._read_job_event_types(db_path, job_id)
        run_export.assert_not_called()
        self.assertNotIn("catalog_export_started", event_types)
def test_claim_and_mark_next_runnable_job_is_atomic_semantic(self):
    # Scenario: with two freshly created jobs, the first two claims return
    # two distinct jobs already marked RUNNING and a third claim returns None.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)

        # Arrange
        job1_id = repo.create_job(job_type="catalog_sync", config_snapshot={})
        job2_id = repo.create_job(job_type="catalog_sync", config_snapshot={})

        # Act: claim once more than there are jobs.
        claimed_first = repo.claim_and_mark_next_runnable_job()
        claimed_second = repo.claim_and_mark_next_runnable_job()
        claimed_third = repo.claim_and_mark_next_runnable_job()
        job1 = repo.get_job(job1_id)
        job2 = repo.get_job(job2_id)

        # Assert
        self.assertIsNotNone(claimed_first)
        self.assertIsNotNone(claimed_second)
        self.assertIsNone(claimed_third)
        self.assertNotEqual(claimed_first.id, claimed_second.id)
        self.assertEqual(JobStatus.RUNNING, claimed_first.status)
        self.assertEqual(JobStatus.RUNNING, claimed_second.status)
        self.assertEqual(JobStatus.RUNNING, job1.status)
        self.assertEqual(JobStatus.RUNNING, job2.status)
def test_cross_job_retry_item_is_rejected_and_recorded(self):
    # Scenario: a "retry_item" command issued for job A but targeting an item
    # of job B must leave that item FAILED and record a "retry_rejected"
    # event on job A.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange: job A with its own failed item (id not needed afterwards).
        job_a = repo.create_job(job_type="catalog_sync", config_snapshot={})
        stage_a = repo.create_stage(job_run_id=job_a, stage_type="sync", seq_no=1)
        repo.create_item(
            job_stage_id=stage_a,
            item_type="song_sync",
            item_key="song:a",
            status=ItemStatus.FAILED,
        )

        # Arrange: job B with the failed item the command will target.
        job_b = repo.create_job(job_type="catalog_sync", config_snapshot={})
        stage_b = repo.create_stage(job_run_id=job_b, stage_type="sync", seq_no=1)
        item_b = repo.create_item(
            job_stage_id=stage_b,
            item_type="song_sync",
            item_key="song:b",
            status=ItemStatus.FAILED,
        )
        self._set_attempt_count(db_path, item_b, 1)
        repo.create_command(job_run_id=job_a, command_type="retry_item", target_item_id=item_b)

        # Act
        runner.apply_pending_commands()

        # Assert
        item_after = repo.get_item(item_b)
        event_types = self._read_job_event_types(db_path, job_a)
        self.assertEqual(ItemStatus.FAILED, item_after.status)
        self.assertIn("retry_rejected", event_types)
def test_invalid_or_missing_target_command_has_observable_event(self):
    # Scenario: an unknown command type and a "retry_item" without a target
    # are both marked "applied", and an "ignored_command" event is recorded.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={})
        cmd1 = repo.create_command(job_run_id=job_id, command_type="unknown_command")
        cmd2 = repo.create_command(job_run_id=job_id, command_type="retry_item")

        # Act
        runner.apply_pending_commands()

        # Assert
        event_types = self._read_job_event_types(db_path, job_id)
        cmd1_row = self._read_command(db_path, cmd1)
        cmd2_row = self._read_command(db_path, cmd2)
        self.assertIn("ignored_command", event_types)
        self.assertEqual("applied", cmd1_row["status"])
        self.assertEqual("applied", cmd2_row["status"])
def test_cas_state_transitions_do_not_drift_stage_counters(self):
    # Scenario: walk one item through PENDING -> RUNNING -> INTERRUPTED,
    # attempting an illegal transition first and repeating each legal one.
    # Rejected transitions must return False and leave the stage's
    # pending/running counters unchanged.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={})
        stage_id = repo.create_stage(job_run_id=job_id, stage_type="sync", seq_no=1)
        item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="song_sync",
            item_key="song:cas",
            status=ItemStatus.PENDING,
        )

        # Step 1: interrupting a PENDING item is expected to be rejected.
        changed_illegal = repo.mark_item_interrupted(item_id=item_id, last_error="illegal")
        item_after_illegal = repo.get_item(item_id)
        stage_after_illegal = repo.get_stage(stage_id)

        # Step 2: mark running twice; only the first call should take effect.
        changed_running = repo.mark_item_running(item_id=item_id, worker_id=11)
        changed_running_again = repo.mark_item_running(item_id=item_id, worker_id=12)
        item_after_running = repo.get_item(item_id)
        stage_after_running = repo.get_stage(stage_id)

        # Step 3: interrupt twice; only the first call should take effect.
        changed_interrupted = repo.mark_item_interrupted(item_id=item_id, last_error="stop")
        changed_interrupted_again = repo.mark_item_interrupted(item_id=item_id, last_error="repeat")
        item_after_interrupted = repo.get_item(item_id)
        stage_after_interrupted = repo.get_stage(stage_id)

        # Assert step 1
        self.assertFalse(changed_illegal)
        self.assertEqual(ItemStatus.PENDING, item_after_illegal.status)
        self.assertEqual(1, stage_after_illegal.pending_items)
        self.assertEqual(0, stage_after_illegal.running_items)
        # Assert step 2
        self.assertTrue(changed_running)
        self.assertFalse(changed_running_again)
        self.assertEqual(ItemStatus.RUNNING, item_after_running.status)
        self.assertEqual(0, stage_after_running.pending_items)
        self.assertEqual(1, stage_after_running.running_items)
        # Assert step 3
        self.assertTrue(changed_interrupted)
        self.assertFalse(changed_interrupted_again)
        self.assertEqual(ItemStatus.INTERRUPTED, item_after_interrupted.status)
        self.assertEqual(0, stage_after_interrupted.pending_items)
        self.assertEqual(0, stage_after_interrupted.running_items)
def test_resume_job_requeues_interrupted_items_back_to_pending(self):
    # Scenario: resuming a PAUSED job whose only item was interrupted must
    # queue the job, reset the stage to PENDING and put the item back to
    # PENDING, with the stage counting one pending item.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)

        # Arrange: paused job/stage with an item driven to INTERRUPTED.
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={}, status=JobStatus.PAUSED)
        stage_id = repo.create_stage(
            job_run_id=job_id, stage_type="collect", seq_no=1, status=StageStatus.PAUSED
        )
        item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="collect_source",
            item_key="collect:netease",
            status=ItemStatus.PENDING,
        )
        repo.mark_item_running(item_id=item_id, worker_id=1)
        repo.mark_item_interrupted(item_id=item_id, last_error="restart")

        # Act
        repo.resume_job(job_id)

        # Assert
        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        item = repo.get_item(item_id)
        self.assertEqual(JobStatus.QUEUED, job.status)
        self.assertEqual(StageStatus.PENDING, stage.status)
        self.assertEqual(ItemStatus.PENDING, item.status)
        self.assertEqual(1, stage.pending_items)
def test_run_job_does_not_revive_paused_or_canceled_job(self):
    # Scenario: calling ``_run_job`` directly on a PAUSED and on a CANCELED
    # job must leave both job statuses untouched.
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch:
        db_path = Path(scratch) / "catalogsync.db"
        initialize_database(db_path).close()
        repo = OpsRepository(db_path)
        runner = OpsRunner(repository=repo)

        # Arrange
        paused_job_id = repo.create_job(
            job_type="collect_only", config_snapshot={}, status=JobStatus.PAUSED
        )
        canceled_job_id = repo.create_job(
            job_type="collect_only", config_snapshot={}, status=JobStatus.CANCELED
        )

        # Act
        runner._run_job(paused_job_id)
        runner._run_job(canceled_job_id)

        # Assert
        paused_job = repo.get_job(paused_job_id)
        canceled_job = repo.get_job(canceled_job_id)
        self.assertEqual(JobStatus.PAUSED, paused_job.status)
        self.assertEqual(JobStatus.CANCELED, canceled_job.status)
def test_duplicate_job_sources_materialize_single_collect_item(self):
    """Materializing a collect stage for sources listed twice yields one item per distinct source."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    process_item_target = "musicdl.catalogsync.ops.runner.CollectStageExecutor.process_item"
    item_key_query = "SELECT item_key FROM job_items WHERE job_stage_id = ? ORDER BY id"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        # "netease" appears twice on purpose.
        job_id = repo.create_job(
            job_type="collect_only",
            config_snapshot={},
            sources=["netease", "netease", "qq"],
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="collect",
            seq_no=1,
            status=StageStatus.RUNNING,
        )

        with patch(process_item_target, return_value=None):
            job = repo.get_job(job_id)
            stage = repo.get_stage(stage_id)
            runner._materialize_stage_items(job, stage)

        # Inspect the table directly rather than going through the repository.
        with closing(sqlite3.connect(database)) as conn:
            cursor = conn.execute(item_key_query, (stage_id,))
            item_keys = [record[0] for record in cursor.fetchall()]

        self.assertEqual(["collect:netease", "collect:qq"], item_keys)
def test_interrupted_item_keeps_stage_open(self):
    """Running a stage whose only item was interrupted leaves stage and job RUNNING."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        job_id = repo.create_job(
            job_type="collect_only",
            config_snapshot={},
            status=JobStatus.RUNNING,
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="collect",
            seq_no=1,
            status=StageStatus.RUNNING,
        )
        item_id = repo.create_item(
            job_stage_id=stage_id,
            item_type="collect_source",
            item_key="collect:netease",
            status=ItemStatus.PENDING,
        )

        # Drive the item pending -> running -> interrupted before the stage runs.
        repo.mark_item_running(item_id=item_id, worker_id=1)
        repo.mark_item_interrupted(item_id=item_id, last_error="restart")

        runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        stage_after = repo.get_stage(stage_id)
        job_after = repo.get_job(job_id)
        self.assertEqual(StageStatus.RUNNING, stage_after.status)
        self.assertEqual(JobStatus.RUNNING, job_after.status)
def test_scoped_download_stage_exports_playlist_artifacts_after_completion(self):
    """A completed playlist-scoped download stage calls the runner's export hook exactly once.

    The executor's resolve and download steps are replaced with fakes, and
    ``_export_playlist_artifacts_for_job`` is patched on the runner instance so
    the arguments it receives can be inspected.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner
    from musicdl.catalogsync.repository import CatalogRepository

    resolve_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_resolve_item"
    download_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_download_task"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        base_dir = Path(workdir)
        database = base_dir / "catalogsync.db"
        library_dir = base_dir / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        catalog_repo = CatalogRepository(database)
        runner = OpsRunner(repository=repo)

        # The playlist row is inserted with raw SQL; the song goes through the catalog repository.
        with closing(sqlite3.connect(database)) as conn, conn:
            cursor = conn.execute(
                """
INSERT INTO playlists (platform, remote_playlist_id, name, url, parse_strategy)
VALUES (?, ?, ?, ?, ?)
""",
                ("qq", "runner-export-1", "Runner Export Playlist", "https://example.invalid/playlist/1", "playlist_url"),
            )
            playlist_id = int(cursor.lastrowid)

        song = CatalogSong(
            platform="qq",
            remote_song_id="runner-export-song-1",
            name="Runner Export Song 1",
            singers="Runner Singer",
            ext="mp3",
            file_size_bytes=128,
            quality_label="standard",
            metadata={},
        )
        song_id = catalog_repo.upsert_song(song)
        catalog_repo.link_playlist_song(playlist_id, song_id, 1)

        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir)},
            status=JobStatus.RUNNING,
            playlist_scope={"playlist_ids": [playlist_id]},
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="download",
            seq_no=1,
            status=StageStatus.RUNNING,
        )
        item_row = {"id": song_id, "song_id": song_id, "playlist_id": playlist_id, "name": "Runner Export Song 1"}
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key=f"song:{song_id}",
            song_id=song_id,
            playlist_id=playlist_id,
            payload={"row": item_row},
        )

        def fake_resolve(executor, item_id, worker_name, *, ready_queue, already_claimed=False):
            # Hand a ready-made task straight to the download side of the pipeline.
            resolved_task = SimpleNamespace(
                item_id=item_id,
                playlist_id=playlist_id,
                row={"id": song_id, "playlist_id": playlist_id},
                resolved_payload=f"payload-{item_id}",
            )
            ready_queue.put(resolved_task)

        def fake_download_task(executor, task, worker_name):
            executor.ops_repo.mark_item_succeeded(item_id=task.item_id)

        with (
            patch(resolve_target, new=fake_resolve),
            patch(download_target, new=fake_download_task),
            patch.object(runner, "_export_playlist_artifacts_for_job", create=True) as ensure_export,
        ):
            runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        stage_after = repo.get_stage(stage_id)
        self.assertEqual(StageStatus.COMPLETED, stage_after.status)
        ensure_export.assert_called_once()
        exported_job, exported_stage = ensure_export.call_args.args
        self.assertEqual(job_id, exported_job.id)
        self.assertEqual(stage_id, exported_stage.id)
def test_scoped_download_stage_exports_playlist_when_playlist_finishes_before_stage_end(self):
    """Only the playlist whose item succeeded is exported while the stage is still RUNNING.

    Two playlists with one song each are in scope. The fake download marks the
    first item succeeded and raises for the other one, so the stage cannot
    complete; the export service is expected to be called for playlist A only.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner
    from musicdl.catalogsync.repository import CatalogRepository

    process_item_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"
    export_target = "musicdl.catalogsync.services.CatalogSyncService.ensure_playlist_artifacts_for_playlist"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        base_dir = Path(workdir)
        database = base_dir / "catalogsync.db"
        library_dir = base_dir / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        catalog_repo = CatalogRepository(database)
        runner = OpsRunner(repository=repo)

        def add_playlist(conn, remote_id, title, url):
            # Insert one playlist row and return its integer primary key.
            cursor = conn.execute(
                """
INSERT INTO playlists (platform, remote_playlist_id, name, url, parse_strategy)
VALUES (?, ?, ?, ?, ?)
""",
                ("qq", remote_id, title, url, "playlist_url"),
            )
            return int(cursor.lastrowid)

        def add_song(remote_id, title):
            # Upsert one catalog song; only the remote id and title differ between songs.
            return catalog_repo.upsert_song(
                CatalogSong(
                    platform="qq",
                    remote_song_id=remote_id,
                    name=title,
                    singers="Runner Singer",
                    ext="mp3",
                    file_size_bytes=128,
                    quality_label="standard",
                    metadata={},
                )
            )

        with closing(sqlite3.connect(database)) as conn, conn:
            playlist_a = add_playlist(
                conn, "runner-early-export-a", "Runner Early Export A", "https://example.invalid/playlist/a"
            )
            playlist_b = add_playlist(
                conn, "runner-early-export-b", "Runner Early Export B", "https://example.invalid/playlist/b"
            )

        song_a = add_song("runner-early-export-song-a", "Runner Early Export Song A")
        song_b = add_song("runner-early-export-song-b", "Runner Early Export Song B")
        catalog_repo.link_playlist_song(playlist_a, song_a, 1)
        catalog_repo.link_playlist_song(playlist_b, song_b, 1)

        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir), "DOWNLOAD_WORKERS": "1"},
            status=JobStatus.RUNNING,
            playlist_scope={"playlist_ids": [playlist_a, playlist_b]},
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="download",
            seq_no=1,
            status=StageStatus.RUNNING,
        )
        row_a = {"id": song_a, "song_id": song_a, "playlist_id": playlist_a, "platform": "qq", "name": "Runner Early Export Song A"}
        row_b = {"id": song_b, "song_id": song_b, "playlist_id": playlist_b, "platform": "qq", "name": "Runner Early Export Song B"}
        item_a = repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key=f"song:{song_a}",
            song_id=song_a,
            playlist_id=playlist_a,
            payload={"row": row_a},
        )
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key=f"song:{song_b}",
            song_id=song_b,
            playlist_id=playlist_b,
            payload={"row": row_b},
        )

        # Collects the ids of items the fake refused to finish.
        blocked_items = []

        def fake_download(executor, item_id, worker_name, *, already_claimed=False):
            if item_id != item_a:
                blocked_items.append(item_id)
                raise RuntimeError("keep stage open")
            executor.ops_repo.mark_item_succeeded(item_id=item_id)

        with (
            patch(process_item_target, new=fake_download),
            patch(export_target, return_value=base_dir / "playlists" / "Runner Early Export A") as ensure_export,
        ):
            runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        stage_after = repo.get_stage(stage_id)
        exported_playlists = [recorded.args[0] for recorded in ensure_export.call_args_list]
        self.assertTrue(blocked_items)
        self.assertEqual(StageStatus.RUNNING, stage_after.status)
        self.assertEqual([playlist_a], exported_playlists)
def test_scoped_download_stage_does_not_preexport_completed_playlists_on_resume(self):
    """A playlist whose item was already SUCCEEDED before the run is not exported by the run.

    Playlist A's item is created in SUCCEEDED state; playlist B's item is left
    at its default status and the fake download raises for it. The export
    service must not be called at all and the stage must remain RUNNING.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.ops.models import ItemStatus, JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner
    from musicdl.catalogsync.repository import CatalogRepository

    process_item_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"
    export_target = "musicdl.catalogsync.services.CatalogSyncService.ensure_playlist_artifacts_for_playlist"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        base_dir = Path(workdir)
        database = base_dir / "catalogsync.db"
        library_dir = base_dir / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        catalog_repo = CatalogRepository(database)
        runner = OpsRunner(repository=repo)

        def add_playlist(conn, remote_id, title, url):
            # Insert one playlist row and return its integer primary key.
            cursor = conn.execute(
                """
INSERT INTO playlists (platform, remote_playlist_id, name, url, parse_strategy)
VALUES (?, ?, ?, ?, ?)
""",
                ("qq", remote_id, title, url, "playlist_url"),
            )
            return int(cursor.lastrowid)

        def add_song(remote_id, title):
            # Upsert one catalog song; only the remote id and title differ between songs.
            return catalog_repo.upsert_song(
                CatalogSong(
                    platform="qq",
                    remote_song_id=remote_id,
                    name=title,
                    singers="Runner Singer",
                    ext="mp3",
                    file_size_bytes=128,
                    quality_label="standard",
                    metadata={},
                )
            )

        with closing(sqlite3.connect(database)) as conn, conn:
            playlist_a = add_playlist(
                conn, "runner-resume-export-a", "Runner Resume Export A", "https://example.invalid/playlist/resume-a"
            )
            playlist_b = add_playlist(
                conn, "runner-resume-export-b", "Runner Resume Export B", "https://example.invalid/playlist/resume-b"
            )

        song_a = add_song("runner-resume-export-song-a", "Runner Resume Export Song A")
        song_b = add_song("runner-resume-export-song-b", "Runner Resume Export Song B")
        catalog_repo.link_playlist_song(playlist_a, song_a, 1)
        catalog_repo.link_playlist_song(playlist_b, song_b, 1)

        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir), "DOWNLOAD_WORKERS": "1"},
            status=JobStatus.RUNNING,
            playlist_scope={"playlist_ids": [playlist_a, playlist_b]},
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="download",
            seq_no=1,
            status=StageStatus.RUNNING,
        )
        row_a = {"id": song_a, "song_id": song_a, "playlist_id": playlist_a, "platform": "qq", "name": "Runner Resume Export Song A"}
        row_b = {"id": song_b, "song_id": song_b, "playlist_id": playlist_b, "platform": "qq", "name": "Runner Resume Export Song B"}
        # Playlist A's only item is already finished when the stage is (re)run.
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key=f"song:{song_a}",
            song_id=song_a,
            playlist_id=playlist_a,
            status=ItemStatus.SUCCEEDED,
            payload={"row": row_a},
        )
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key=f"song:{song_b}",
            song_id=song_b,
            playlist_id=playlist_b,
            payload={"row": row_b},
        )

        # Collects the ids of every item the fake download was asked to handle.
        attempted_items = []

        def fake_download(executor, item_id, worker_name, *, already_claimed=False):
            attempted_items.append(item_id)
            raise RuntimeError("keep stage open")

        with (
            patch(process_item_target, new=fake_download),
            patch(export_target, return_value=base_dir / "playlists" / "Runner Resume Export A") as ensure_export,
        ):
            runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        stage_after = repo.get_stage(stage_id)
        exported_playlists = [recorded.args[0] for recorded in ensure_export.call_args_list]
        self.assertTrue(attempted_items)
        self.assertEqual(StageStatus.RUNNING, stage_after.status)
        self.assertEqual([], exported_playlists)
def test_scoped_download_stage_ignores_playlist_export_summary_event_failure(self):
    """A failure while recording the ``playlist_exported`` event must not break the stage.

    ``repo.add_job_event`` is wrapped so that only the ``playlist_exported``
    event raises. The stage is still expected to complete, the catalog export
    command to run once, and every other event to be persisted.
    """
    from musicdl.catalogsync.catalog_export import CatalogExportResult
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner
    from musicdl.catalogsync.repository import CatalogRepository

    process_item_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"
    export_target = "musicdl.catalogsync.services.CatalogSyncService.ensure_playlist_artifacts_for_playlist"
    export_command_target = "musicdl.catalogsync.ops.runner.run_catalog_export_command"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        base_dir = Path(workdir)
        database = base_dir / "catalogsync.db"
        library_dir = base_dir / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        catalog_repo = CatalogRepository(database)
        runner = OpsRunner(repository=repo)

        with closing(sqlite3.connect(database)) as conn, conn:
            cursor = conn.execute(
                """
INSERT INTO playlists (platform, remote_playlist_id, name, url, parse_strategy)
VALUES (?, ?, ?, ?, ?)
""",
                ("qq", "runner-summary-export", "Runner Summary Export", "https://example.invalid/playlist/export", "playlist_url"),
            )
            playlist_id = int(cursor.lastrowid)

        song = CatalogSong(
            platform="qq",
            remote_song_id="runner-summary-export-song",
            name="Runner Summary Export Song",
            singers="Runner Singer",
            ext="mp3",
            file_size_bytes=128,
            quality_label="standard",
            metadata={},
        )
        song_id = catalog_repo.upsert_song(song)
        catalog_repo.link_playlist_song(playlist_id, song_id, 1)

        job_config = {
            "LIBRARY_DIR": str(library_dir),
            "DOWNLOAD_WORKERS": "1",
            "CATALOG_EXPORT_COMMAND": "echo catalog-export",
        }
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot=job_config,
            status=JobStatus.RUNNING,
            playlist_scope={"playlist_ids": [playlist_id]},
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="download",
            seq_no=1,
            status=StageStatus.RUNNING,
        )
        item_row = {"id": song_id, "song_id": song_id, "playlist_id": playlist_id, "platform": "qq", "name": "Runner Summary Export Song"}
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key=f"song:{song_id}",
            song_id=song_id,
            playlist_id=playlist_id,
            payload={"row": item_row},
        )

        def fake_download(executor, item_id, worker_name, *, already_claimed=False):
            executor.ops_repo.mark_item_succeeded(item_id=item_id)

        # Capture the bound method before it is patched so other events still reach the database.
        real_add_job_event = repo.add_job_event
        summary_attempts = []

        def flaky_add_job_event(job_id, event_type, message=None, **kwargs):
            if event_type != "playlist_exported":
                return real_add_job_event(job_id, event_type, message, **kwargs)
            summary_attempts.append(event_type)
            raise RuntimeError("playlist export summary failed")

        canned_export_result = CatalogExportResult(
            status="succeeded",
            command="echo catalog-export",
            returncode=0,
            stdout="ok",
        )

        with (
            patch(process_item_target, new=fake_download),
            patch(export_target, return_value=base_dir / "playlists" / "Runner Summary Export"),
            patch.object(repo, "add_job_event", side_effect=flaky_add_job_event),
            patch(export_command_target, return_value=canned_export_result) as run_export,
        ):
            runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        stage_after = repo.get_stage(stage_id)
        recorded_events = self._read_job_event_types(database, job_id)
        self.assertEqual(StageStatus.COMPLETED, stage_after.status)
        self.assertEqual(1, len(summary_attempts))
        self.assertIn("playlist_export_ready", recorded_events)
        self.assertNotIn("playlist_exported", recorded_events)
        self.assertIn("catalog_export_started", recorded_events)
        self.assertIn("catalog_export_succeeded", recorded_events)
        run_export.assert_called_once()
def test_scoped_download_stage_emits_playlist_exported_summary_event(self):
    """A completed scoped download stage records ``playlist_export_ready`` before ``playlist_exported``.

    The download step, the playlist export service and the catalog export
    command are all patched; the assertions read the persisted event types
    straight from the database.
    """
    from musicdl.catalogsync.catalog_export import CatalogExportResult
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.models import CatalogSong
    from musicdl.catalogsync.ops.models import JobStatus, StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner
    from musicdl.catalogsync.repository import CatalogRepository

    process_item_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"
    export_target = "musicdl.catalogsync.services.CatalogSyncService.ensure_playlist_artifacts_for_playlist"
    export_command_target = "musicdl.catalogsync.ops.runner.run_catalog_export_command"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        base_dir = Path(workdir)
        database = base_dir / "catalogsync.db"
        library_dir = base_dir / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        catalog_repo = CatalogRepository(database)
        runner = OpsRunner(repository=repo)

        with closing(sqlite3.connect(database)) as conn, conn:
            cursor = conn.execute(
                """
INSERT INTO playlists (platform, remote_playlist_id, name, url, parse_strategy)
VALUES (?, ?, ?, ?, ?)
""",
                ("qq", "runner-summary-event", "Runner Summary Event", "https://example.invalid/playlist/summary", "playlist_url"),
            )
            playlist_id = int(cursor.lastrowid)

        song = CatalogSong(
            platform="qq",
            remote_song_id="runner-summary-event-song",
            name="Runner Summary Event Song",
            singers="Runner Singer",
            ext="mp3",
            file_size_bytes=128,
            quality_label="standard",
            metadata={},
        )
        song_id = catalog_repo.upsert_song(song)
        catalog_repo.link_playlist_song(playlist_id, song_id, 1)

        job_config = {
            "LIBRARY_DIR": str(library_dir),
            "DOWNLOAD_WORKERS": "1",
            "CATALOG_EXPORT_COMMAND": "echo catalog-export",
        }
        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot=job_config,
            status=JobStatus.RUNNING,
            playlist_scope={"playlist_ids": [playlist_id]},
        )
        stage_id = repo.create_stage(
            job_run_id=job_id,
            stage_type="download",
            seq_no=1,
            status=StageStatus.RUNNING,
        )
        item_row = {"id": song_id, "song_id": song_id, "playlist_id": playlist_id, "platform": "qq", "name": "Runner Summary Event Song"}
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key=f"song:{song_id}",
            song_id=song_id,
            playlist_id=playlist_id,
            payload={"row": item_row},
        )

        def fake_download(executor, item_id, worker_name, *, already_claimed=False):
            executor.ops_repo.mark_item_succeeded(item_id=item_id)

        canned_export_result = CatalogExportResult(
            status="succeeded",
            command="echo catalog-export",
            returncode=0,
            stdout="ok",
        )

        with (
            patch(process_item_target, new=fake_download),
            patch(export_target, return_value=base_dir / "playlists" / "Runner Summary Event"),
            patch(export_command_target, return_value=canned_export_result) as run_export,
        ):
            runner._run_stage(repo.get_job(job_id), repo.get_stage(stage_id))

        stage_after = repo.get_stage(stage_id)
        recorded_events = self._read_job_event_types(database, job_id)
        self.assertEqual(StageStatus.COMPLETED, stage_after.status)
        self.assertIn("playlist_export_ready", recorded_events)
        self.assertIn("playlist_exported", recorded_events)
        # The "ready" marker has to be recorded ahead of the summary event.
        ready_position = recorded_events.index("playlist_export_ready")
        exported_position = recorded_events.index("playlist_exported")
        self.assertLess(ready_position, exported_position)
        self.assertIn("catalog_export_started", recorded_events)
        self.assertIn("catalog_export_succeeded", recorded_events)
        run_export.assert_called_once()
class OpsRunnerLaneTests(unittest.TestCase):
def _wait_for(self, predicate, *, timeout: float = 1.5, interval: float = 0.02) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return
time.sleep(interval)
self.assertTrue(predicate())
def _seed_stage_item(self, repo, *, job_id: int, stage_type: str, item_key: str) -> int:
stage_id = repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=1)
return repo.create_item(
job_stage_id=stage_id,
item_type="song_download" if stage_type == "download" else "playlist_sync",
item_key=item_key,
song_id=101 if stage_type == "download" else None,
playlist_id=201 if stage_type == "sync" else None,
payload={"row": {"id": 101, "name": "Task Item"}},
)
def test_download_lane_runs_only_one_job_at_a_time(self):
    """With two jobs that both own a download stage, only the first becomes RUNNING.

    The download step blocks on an event (for at most 0.2s) so the job
    statuses can be sampled while the first job is still busy.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    process_item_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        library_dir = Path(workdir) / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo, sleep_seconds=0.01)

        first_job = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir)},
        )
        second_job = repo.create_job(
            job_type="catalog_sync",
            config_snapshot={"LIBRARY_DIR": str(library_dir)},
        )
        self._seed_stage_item(repo, job_id=first_job, stage_type="download", item_key="song:a")
        self._seed_stage_item(repo, job_id=second_job, stage_type="download", item_key="song:b")

        release = threading.Event()

        def slow_download(executor, item_id, worker_name, *, already_claimed=False):
            # Hold the item until the test has sampled both job statuses.
            release.wait(0.2)
            executor.ops_repo.mark_item_succeeded(item_id=item_id)

        def only_first_job_is_running():
            return (
                repo.get_job(first_job).status == JobStatus.RUNNING
                and repo.get_job(second_job).status == JobStatus.QUEUED
            )

        with patch(process_item_target, new=slow_download):
            runner.loop_once()
            runner.loop_once()
            self._wait_for(only_first_job_is_running)
            first_status = repo.get_job(first_job).status
            second_status = repo.get_job(second_job).status
            release.set()
            # Short grace period while the patch is still active.
            time.sleep(0.1)

        self.assertEqual(JobStatus.RUNNING, first_status)
        self.assertEqual(JobStatus.QUEUED, second_status)
def test_pause_requested_queued_job_is_paused_without_starting(self):
    """A pause command issued before the first loop pauses the job without any item being processed."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    process_item_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        library_dir = Path(workdir) / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo, sleep_seconds=0.01)

        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir)},
        )
        self._seed_stage_item(repo, job_id=job_id, stage_type="download", item_key="song:pause")
        # The pause command is recorded before the runner gets its first turn.
        repo.create_command(job_run_id=job_id, command_type="pause")

        def job_is_paused():
            return repo.get_job(job_id).status == JobStatus.PAUSED

        with patch(process_item_target) as process_item:
            runner.loop_once()
            self._wait_for(job_is_paused)
            paused_job = repo.get_job(job_id)
            self.assertEqual(JobStatus.PAUSED, paused_job.status)
            self.assertIsNone(paused_job.started_at)
            process_item.assert_not_called()
def test_failed_background_job_future_is_reaped_without_sticking(self):
    """A job that fails ends up FAILED and leaves ``runner._futures`` empty after the next loop."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo, sleep_seconds=0.01)
        job_id = repo.create_job(job_type="collect_only", config_snapshot={})

        def job_has_failed():
            return repo.get_job(job_id).status == JobStatus.FAILED

        # Force the job to blow up as soon as the runner tries to set up its stages.
        stage_setup_failure = RuntimeError("boom")
        with patch.object(runner, "_ensure_job_stages", side_effect=stage_setup_failure):
            runner.loop_once()
            self._wait_for(job_has_failed)
            # Second pass after the failure; the futures map is checked below.
            runner.loop_once()
            failed_job = repo.get_job(job_id)

        self.assertEqual(JobStatus.FAILED, failed_job.status)
        self.assertEqual({}, runner._futures)
def test_general_lane_job_can_run_while_download_lane_job_is_active(self):
    """A sync job makes progress while a download job is still held in its download step."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    download_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"
    sync_target = "musicdl.catalogsync.ops.executors.SyncStageExecutor.process_item"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        library_dir = Path(workdir) / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo, sleep_seconds=0.01)

        download_job = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir)},
        )
        sync_job = repo.create_job(job_type="sync_only", config_snapshot={})
        self._seed_stage_item(repo, job_id=download_job, stage_type="download", item_key="song:d")
        self._seed_stage_item(repo, job_id=sync_job, stage_type="sync", item_key="playlist:s")

        release_download = threading.Event()
        sync_started = threading.Event()

        def slow_download(executor, item_id, worker_name, *, already_claimed=False):
            # Keep the download job busy (for at most 0.2s) while the sync job gets its turn.
            release_download.wait(0.2)
            executor.ops_repo.mark_item_succeeded(item_id=item_id)

        def fast_sync(executor, item_id, worker_name, *, already_claimed=False):
            sync_started.set()
            executor.ops_repo.mark_item_succeeded(item_id=item_id)

        with (
            patch(download_target, new=slow_download),
            patch(sync_target, new=fast_sync),
        ):
            runner.loop_once()
            runner.loop_once()
            self._wait_for(sync_started.is_set)
            download_status = repo.get_job(download_job).status
            sync_status = repo.get_job(sync_job).status
            release_download.set()
            # Short grace period while both patches are still active.
            time.sleep(0.1)

        self.assertEqual(JobStatus.RUNNING, download_status)
        self.assertIn(sync_status, {JobStatus.RUNNING, JobStatus.COMPLETED})
def test_download_lane_stays_serial_even_if_configured_higher(self):
    """Passing ``download_lane_concurrency=2`` still leaves the second download job QUEUED."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import JobStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    process_item_target = "musicdl.catalogsync.ops.executors.DownloadStageExecutor.process_item"

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        library_dir = Path(workdir) / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        runner = OpsRunner(
            repository=repo,
            sleep_seconds=0.01,
            download_lane_concurrency=2,
        )

        first_job = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir)},
        )
        second_job = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir)},
        )
        self._seed_stage_item(repo, job_id=first_job, stage_type="download", item_key="song:a")
        self._seed_stage_item(repo, job_id=second_job, stage_type="download", item_key="song:b")

        release = threading.Event()

        def slow_download(executor, item_id, worker_name, *, already_claimed=False):
            # Hold the item until the test has sampled both job statuses.
            release.wait(0.2)
            executor.ops_repo.mark_item_succeeded(item_id=item_id)

        def only_first_job_is_running():
            return (
                repo.get_job(first_job).status == JobStatus.RUNNING
                and repo.get_job(second_job).status == JobStatus.QUEUED
            )

        with patch(process_item_target, new=slow_download):
            runner.loop_once()
            runner.loop_once()
            self._wait_for(only_first_job_is_running)
            first_status = repo.get_job(first_job).status
            second_status = repo.get_job(second_job).status
            release.set()
            # Short grace period while the patch is still active.
            time.sleep(0.1)

        self.assertEqual(JobStatus.RUNNING, first_status)
        self.assertEqual(JobStatus.QUEUED, second_status)
def test_download_stage_uses_ten_workers_by_default(self):
    """``_worker_count`` reports 10 for the download stage when the config snapshot is empty."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        # No DOWNLOAD_WORKERS key in the snapshot.
        stored_job = repo.get_job(
            repo.create_job(job_type="download_only", config_snapshot={})
        )
        worker_total = runner._worker_count(stored_job, "download")

        self.assertEqual(10, worker_total)
def test_download_stage_uses_configured_download_workers(self):
    """``_worker_count`` reports 7 for the download stage when ``DOWNLOAD_WORKERS`` is ``"7"``."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        # The snapshot stores the worker count as a string.
        stored_job = repo.get_job(
            repo.create_job(
                job_type="download_only",
                config_snapshot={"DOWNLOAD_WORKERS": "7"},
            )
        )
        worker_total = runner._worker_count(stored_job, "download")

        self.assertEqual(7, worker_total)
def test_sync_stage_uses_four_workers_by_default(self):
    """``_worker_count`` reports 4 for the sync stage when the config snapshot is empty."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        # No SYNC_WORKERS key in the snapshot.
        stored_job = repo.get_job(
            repo.create_job(job_type="sync_only", config_snapshot={})
        )
        worker_total = runner._worker_count(stored_job, "sync")

        self.assertEqual(4, worker_total)
def test_sync_stage_uses_configured_sync_workers(self):
    """``_worker_count`` reports 6 for the sync stage when ``SYNC_WORKERS`` is ``"6"``."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        # The snapshot stores the worker count as a string.
        stored_job = repo.get_job(
            repo.create_job(
                job_type="sync_only",
                config_snapshot={"SYNC_WORKERS": "6"},
            )
        )
        worker_total = runner._worker_count(stored_job, "sync")

        self.assertEqual(6, worker_total)
def test_download_stage_worker_split_biases_toward_resolve_workers(self):
    """Splitting ten workers yields eight resolver workers and two download workers."""
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        initialize_database(database).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        # The split comes back as a (resolver, download) pair.
        split = runner._download_stage_worker_split(total_workers=10)
        resolver_share, download_share = split

        self.assertEqual(8, resolver_share)
        self.assertEqual(2, download_share)
def test_download_stage_pipeline_processes_items_through_ready_queue(self):
    """With three download workers, items resolved onto the ready queue are finished by ``download-*`` workers.

    A fake executor is injected through ``_build_executor``: its resolve step
    only enqueues a task, and its download step records the call and marks the
    item succeeded.
    """
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.models import StageStatus
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    # (item_id, worker_name, resolved_payload) for every download call.
    handled: list[tuple[int, str, str]] = []

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
        database = Path(workdir) / "catalogsync.db"
        library_dir = Path(workdir) / "library"
        initialize_database(database, default_library_root=library_dir).close()
        repo = OpsRepository(database)
        runner = OpsRunner(repository=repo)

        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir), "DOWNLOAD_WORKERS": "3"},
        )
        stage_id = repo.create_stage(job_run_id=job_id, stage_type="download", seq_no=1)
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key="song:701",
            song_id=701,
            payload={"row": {"id": 701, "platform": "qq", "name": "Runner Queue 1"}},
        )
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key="song:702",
            song_id=702,
            payload={"row": {"id": 702, "platform": "qq", "name": "Runner Queue 2"}},
        )

        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        self.assertIsNotNone(job)
        self.assertIsNotNone(stage)

        class FakeDownloadExecutor:
            def process_resolve_item(self, item_id, worker_name, *, ready_queue, already_claimed=False):
                # Resolution does no real work: it only publishes a task.
                resolved_task = SimpleNamespace(
                    item_id=item_id,
                    playlist_id=None,
                    row={"id": item_id},
                    resolved_payload=f"payload-{item_id}",
                )
                ready_queue.put(resolved_task)

            def process_download_task(self, task, worker_name):
                handled.append((task.item_id, worker_name, task.resolved_payload))
                repo.mark_item_succeeded(task.item_id)

        fake_executor = FakeDownloadExecutor()
        with patch.object(runner, "_build_executor", return_value=fake_executor):
            runner._run_stage(job, stage)

        stage_after = repo.get_stage(stage_id)
        self.assertTrue(handled)
        worker_names = [entry[1] for entry in handled]
        self.assertTrue(all(name.startswith("download-") for name in worker_names))
        self.assertIsNotNone(stage_after)
        self.assertEqual(StageStatus.COMPLETED, stage_after.status)
def test_download_stage_pipeline_uses_single_thread_compatibility_when_worker_count_is_one(self):
    # Runs a download stage with DOWNLOAD_WORKERS set to "1" against a fake
    # executor that only offers process_item. The test expects at least one
    # recorded call and the first call's worker name to be "download-1".
    from musicdl.catalogsync.db import initialize_database
    from musicdl.catalogsync.ops.repository import OpsRepository
    from musicdl.catalogsync.ops.runner import OpsRunner

    calls: list[tuple[int, str, bool]] = []

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as scratch_dir:
        scratch_root = Path(scratch_dir)
        database_file = scratch_root / "catalogsync.db"
        library_dir = scratch_root / "library"
        initialize_database(database_file, default_library_root=library_dir).close()
        repo = OpsRepository(database_file)
        runner = OpsRunner(repository=repo)

        job_id = repo.create_job(
            job_type="download_only",
            config_snapshot={"LIBRARY_DIR": str(library_dir), "DOWNLOAD_WORKERS": "1"},
        )
        stage_id = repo.create_stage(job_run_id=job_id, stage_type="download", seq_no=1)
        song_payload = {"row": {"id": 711, "platform": "qq", "name": "Runner Compat 1"}}
        repo.create_item(
            job_stage_id=stage_id,
            item_type="song_download",
            item_key="song:711",
            song_id=711,
            payload=song_payload,
        )

        job = repo.get_job(job_id)
        stage = repo.get_stage(stage_id)
        self.assertIsNotNone(job)
        self.assertIsNotNone(stage)

        class FakeDownloadExecutor:
            # Stand-in executor: records each call, then marks the item succeeded.

            def process_item(self, item_id, worker_name, *, already_claimed=False):
                calls.append((item_id, worker_name, already_claimed))
                repo.mark_item_succeeded(item_id)

        fake_executor = FakeDownloadExecutor()
        with patch.object(runner, "_build_executor", return_value=fake_executor):
            runner._run_stage(job, stage)

        self.assertTrue(calls)
        _, first_worker_name, _ = calls[0]
        self.assertEqual("download-1", first_worker_name)
def test_catalog_sync_is_classified_into_download_lane(self):
    # Checks the lane returned by job_lane_type for three job types, in a
    # fixed order; the loop stops at the first mismatch.
    from musicdl.catalogsync.ops.jobdefs import DOWNLOAD_LANE, GENERAL_LANE, job_lane_type

    expected_lanes = (
        ("catalog_sync", DOWNLOAD_LANE),
        ("download_upload", DOWNLOAD_LANE),
        ("sync_only", GENERAL_LANE),
    )
    for job_type, expected_lane in expected_lanes:
        self.assertEqual(expected_lane, job_lane_type(job_type))
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()