902 lines
39 KiB
Python
902 lines
39 KiB
Python
import json
|
|
from contextlib import closing
|
|
from queue import Queue
|
|
import tempfile
|
|
import threading
|
|
import unittest
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import patch
|
|
|
|
|
|
class StageExecutorTests(unittest.TestCase):
|
|
def _create_job_stage(self, repo, stage_type: str = "download") -> int:
|
|
job_id = repo.create_job(job_type="catalog_sync", config_snapshot={})
|
|
return repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=1)
|
|
|
|
def _insert_song_row(self, db_path: Path, *, platform: str, remote_song_id: str, name: str, singers: str) -> int:
|
|
with closing(sqlite3.connect(db_path)) as conn, conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO songs (platform, remote_song_id, name, singers, ext, file_size_bytes)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(platform, remote_song_id, name, singers, "mp3", 128),
|
|
)
|
|
return int(cursor.lastrowid)
|
|
|
|
def _insert_playlist_row(self, db_path: Path, *, platform: str, remote_playlist_id: str, name: str, url: str) -> int:
|
|
with closing(sqlite3.connect(db_path)) as conn, conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO playlists (platform, remote_playlist_id, name, url, parse_strategy)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
(platform, remote_playlist_id, name, url, "playlist_url"),
|
|
)
|
|
return int(cursor.lastrowid)
|
|
|
|
def _insert_upload_task_bundle(self, db_path: Path, *, absolute_path: str, target_locator: str) -> int:
|
|
with closing(sqlite3.connect(db_path)) as conn, conn:
|
|
song_id = int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO songs (platform, remote_song_id, name, singers, ext, file_size_bytes)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
("qq", "song-upload", "Song Upload", "Singer Upload", "mp3", 64),
|
|
).lastrowid
|
|
)
|
|
file_asset_id = int(
|
|
conn.execute(
|
|
"INSERT INTO file_assets (song_id, ext, file_size_bytes) VALUES (?, ?, ?)",
|
|
(song_id, "mp3", 64),
|
|
).lastrowid
|
|
)
|
|
source_location_id = int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO file_locations (
|
|
file_asset_id, backend_id, container_name, locator, absolute_path, status, is_primary
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, 'active', 1)
|
|
""",
|
|
(file_asset_id, 1, "library", "qq/Singer Upload/song-upload.mp3", absolute_path),
|
|
).lastrowid
|
|
)
|
|
task_id = int(
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO upload_tasks (
|
|
file_asset_id, source_location_id, target_backend_id, target_container_name, target_locator, status
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, 'pending')
|
|
""",
|
|
(file_asset_id, source_location_id, 2, "bucket-a", target_locator),
|
|
).lastrowid
|
|
)
|
|
return task_id
|
|
|
|
def test_download_executor_marks_item_succeeded_when_hook_returns_true(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:1",
|
|
song_id=1,
|
|
payload={"row": {"id": 1, "platform": "qq", "name": "Song 1"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
return_value=True,
|
|
):
|
|
executor.process_item(item_id=item_id, worker_name="download-1")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.SUCCEEDED, item.status)
|
|
|
|
def test_download_executor_skips_redownload_when_song_already_exists_locally(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
from musicdl.catalogsync.repository import CatalogRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
catalog_repo = CatalogRepository(db_path)
|
|
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="song-existing-local",
|
|
name="Song Existing Local",
|
|
singers="Singer Existing",
|
|
)
|
|
existing_relative_path = "qq/Singer Existing/song-existing-local.mp3"
|
|
existing_absolute_path = library_root / existing_relative_path
|
|
existing_absolute_path.parent.mkdir(parents=True, exist_ok=True)
|
|
existing_absolute_path.write_bytes(b"x" * 128)
|
|
backend_id = catalog_repo.ensure_local_backend(
|
|
library_root,
|
|
name="default-local",
|
|
is_default=True,
|
|
)
|
|
catalog_repo.record_local_file(
|
|
song_id=song_id,
|
|
backend_id=backend_id,
|
|
relative_path=existing_relative_path,
|
|
file_size_bytes=128,
|
|
ext="mp3",
|
|
quality_label="standard",
|
|
)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key=f"song:{song_id}",
|
|
song_id=song_id,
|
|
payload={"row": {"id": song_id, "platform": "qq", "name": "Song Existing Local"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
side_effect=AssertionError("download should not be called for existing local file"),
|
|
):
|
|
executor.process_item(item_id=item_id, worker_name="download-existing-local")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.SUCCEEDED, item.status)
|
|
|
|
def test_download_executor_process_resolve_item_marks_failed_when_resolution_returns_none(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="song-resolve-none",
|
|
name="Song Resolve None",
|
|
singers="Singer Resolve",
|
|
)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key=f"song:{song_id}",
|
|
song_id=song_id,
|
|
payload={"row": {"id": song_id}},
|
|
)
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
ready_queue: Queue = Queue()
|
|
|
|
with patch.object(executor.downloader, "resolve_song_row", return_value=None):
|
|
executor.process_resolve_item(
|
|
item_id=item_id,
|
|
worker_name="resolve-1",
|
|
ready_queue=ready_queue,
|
|
)
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.FAILED, item.status)
|
|
self.assertTrue(ready_queue.empty())
|
|
|
|
def test_download_executor_process_resolve_item_enqueues_resolved_payload(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="song-resolve-ok",
|
|
name="Song Resolve OK",
|
|
singers="Singer Resolve",
|
|
)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key=f"song:{song_id}",
|
|
song_id=song_id,
|
|
playlist_id=33,
|
|
payload={"row": {"id": song_id, "playlist_id": 33}},
|
|
)
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
ready_queue: Queue = Queue()
|
|
resolved_payload = SimpleNamespace(
|
|
row={"id": song_id, "playlist_id": 33},
|
|
display_text="Song Resolve OK / Singer Resolve",
|
|
)
|
|
|
|
with patch.object(executor.downloader, "resolve_song_row", return_value=resolved_payload):
|
|
executor.process_resolve_item(
|
|
item_id=item_id,
|
|
worker_name="resolve-1",
|
|
ready_queue=ready_queue,
|
|
)
|
|
|
|
queued = ready_queue.get_nowait()
|
|
|
|
self.assertEqual(item_id, queued.item_id)
|
|
self.assertEqual(33, queued.playlist_id)
|
|
self.assertIs(resolved_payload, queued.resolved_payload)
|
|
|
|
def test_download_executor_process_download_task_marks_item_succeeded(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="song-download-task",
|
|
name="Song Download Task",
|
|
singers="Singer Download",
|
|
)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key=f"song:{song_id}",
|
|
song_id=song_id,
|
|
payload={"row": {"id": song_id}},
|
|
)
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
ops_repo.claim_item(item_id=item_id, worker_name="resolve-1")
|
|
resolved_task = SimpleNamespace(
|
|
item_id=item_id,
|
|
playlist_id=None,
|
|
row={"id": song_id},
|
|
resolved_payload=SimpleNamespace(row={"id": song_id}, display_text="Song Download Task / Singer Download"),
|
|
)
|
|
|
|
with patch.object(executor.downloader, "download_resolved_song", return_value=True):
|
|
executor.process_download_task(
|
|
task=resolved_task,
|
|
worker_name="download-1",
|
|
)
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.SUCCEEDED, item.status)
|
|
|
|
def test_sync_executor_marks_item_succeeded_and_records_linked_count(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import SyncStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
from musicdl.catalogsync.services import CatalogSyncService
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="sync")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="playlist_sync",
|
|
item_key="playlist:2",
|
|
playlist_id=2,
|
|
payload={"playlist_row": {"id": 2, "platform": "qq", "name": "Playlist 2"}},
|
|
)
|
|
|
|
executor = SyncStageExecutor(db_path=db_path)
|
|
with patch.object(CatalogSyncService, "sync_playlist_row", return_value=7):
|
|
executor.process_item(item_id=item_id, worker_name="sync-1")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.SUCCEEDED, item.status)
|
|
self.assertEqual(7, item.payload.get("linked_count"))
|
|
|
|
def test_sync_executor_marks_item_failed_when_hook_raises(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import SyncStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
from musicdl.catalogsync.services import CatalogSyncService
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="sync")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="playlist_sync",
|
|
item_key="playlist:22",
|
|
playlist_id=22,
|
|
payload={"playlist_row": {"id": 22, "platform": "qq", "name": "Playlist 22"}},
|
|
)
|
|
|
|
executor = SyncStageExecutor(db_path=db_path)
|
|
with patch.object(CatalogSyncService, "sync_playlist_row", side_effect=RuntimeError("sync boom")):
|
|
executor.process_item(item_id=item_id, worker_name="sync-2")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.FAILED, item.status)
|
|
self.assertIn("sync boom", str(item.last_error))
|
|
|
|
def test_sync_executor_reuses_service_per_thread_and_isolates_across_threads(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import SyncStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
created_services: list[object] = []
|
|
|
|
class FakeSyncService:
|
|
def sync_playlist_row(self, playlist_row):
|
|
return int(playlist_row["id"])
|
|
|
|
def build_service():
|
|
service = FakeSyncService()
|
|
created_services.append(service)
|
|
return service
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="sync")
|
|
first_item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="playlist_sync",
|
|
item_key="playlist:31",
|
|
playlist_id=31,
|
|
payload={"playlist_row": {"id": 31, "platform": "qq", "name": "Playlist 31"}},
|
|
)
|
|
second_item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="playlist_sync",
|
|
item_key="playlist:32",
|
|
playlist_id=32,
|
|
payload={"playlist_row": {"id": 32, "platform": "qq", "name": "Playlist 32"}},
|
|
)
|
|
third_item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="playlist_sync",
|
|
item_key="playlist:33",
|
|
playlist_id=33,
|
|
payload={"playlist_row": {"id": 33, "platform": "qq", "name": "Playlist 33"}},
|
|
)
|
|
|
|
executor = SyncStageExecutor(db_path=db_path, service_factory=build_service)
|
|
executor.process_item(item_id=first_item_id, worker_name="sync-1")
|
|
executor.process_item(item_id=second_item_id, worker_name="sync-1")
|
|
|
|
self.assertEqual(1, len(created_services))
|
|
|
|
thread = threading.Thread(
|
|
target=executor.process_item,
|
|
kwargs={"item_id": third_item_id, "worker_name": "sync-2"},
|
|
)
|
|
thread.start()
|
|
thread.join(timeout=2)
|
|
self.assertFalse(thread.is_alive())
|
|
|
|
first_item = ops_repo.get_item(first_item_id)
|
|
second_item = ops_repo.get_item(second_item_id)
|
|
third_item = ops_repo.get_item(third_item_id)
|
|
|
|
self.assertEqual(2, len(created_services))
|
|
self.assertEqual(ItemStatus.SUCCEEDED, first_item.status)
|
|
self.assertEqual(ItemStatus.SUCCEEDED, second_item.status)
|
|
self.assertEqual(ItemStatus.SUCCEEDED, third_item.status)
|
|
self.assertEqual(31, first_item.payload.get("linked_count"))
|
|
self.assertEqual(32, second_item.payload.get("linked_count"))
|
|
self.assertEqual(33, third_item.payload.get("linked_count"))
|
|
|
|
def test_upload_executor_marks_item_failed_when_hook_result_not_succeeded(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import UploadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
from musicdl.catalogsync.uploader import CatalogUploader
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="upload")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="file_upload",
|
|
item_key="upload:3",
|
|
payload={"upload_row": {"id": 3, "target_locator": "music/qq/song.mp3"}},
|
|
)
|
|
|
|
executor = UploadStageExecutor(db_path=db_path, backend_name="main-s3")
|
|
with patch.object(CatalogUploader, "process_upload_task_row", return_value="failed"):
|
|
executor.process_item(item_id=item_id, worker_name="upload-1")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.FAILED, item.status)
|
|
self.assertIn("failed", str(item.last_error))
|
|
|
|
def test_upload_executor_marks_item_succeeded_when_hook_result_is_succeeded(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import UploadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
from musicdl.catalogsync.uploader import CatalogUploader
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="upload")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="file_upload",
|
|
item_key="upload:33",
|
|
payload={"upload_row": {"id": 33, "target_locator": "music/qq/song-33.mp3"}},
|
|
)
|
|
|
|
executor = UploadStageExecutor(db_path=db_path, backend_name="main-s3")
|
|
with patch.object(CatalogUploader, "process_upload_task_row", return_value="succeeded"):
|
|
executor.process_item(item_id=item_id, worker_name="upload-2")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.SUCCEEDED, item.status)
|
|
self.assertIsNone(item.last_error)
|
|
|
|
def test_upload_executor_marks_item_failed_when_hook_raises(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import UploadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
from musicdl.catalogsync.uploader import CatalogUploader
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="upload")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="file_upload",
|
|
item_key="upload:44",
|
|
payload={"upload_row": {"id": 44, "target_locator": "music/qq/song-44.mp3"}},
|
|
)
|
|
|
|
executor = UploadStageExecutor(db_path=db_path, backend_name="main-s3")
|
|
with patch.object(CatalogUploader, "process_upload_task_row", side_effect=RuntimeError("upload boom")):
|
|
executor.process_item(item_id=item_id, worker_name="upload-3")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.FAILED, item.status)
|
|
self.assertIn("upload boom", str(item.last_error))
|
|
|
|
def test_download_executor_marks_item_failed_when_hook_raises(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:4",
|
|
song_id=4,
|
|
payload={"row": {"id": 4, "platform": "qq", "name": "Song 4"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
side_effect=RuntimeError("boom"),
|
|
):
|
|
executor.process_item(item_id=item_id, worker_name="download-2")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.FAILED, item.status)
|
|
self.assertIn("boom", str(item.last_error))
|
|
|
|
def test_download_executor_marks_non_music_resource_skipped_when_hook_raises(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="qqtop_75_test_skip_raise",
|
|
name="QQ Toplist Audio Error",
|
|
singers="Narrator",
|
|
)
|
|
with closing(sqlite3.connect(db_path)) as conn, conn:
|
|
conn.execute(
|
|
"UPDATE songs SET metadata_json = ? WHERE id = ?",
|
|
(
|
|
json.dumps(
|
|
{
|
|
"snapshot": {
|
|
"raw_data": {
|
|
"search": {"qq_toplist_fallback": True},
|
|
}
|
|
}
|
|
},
|
|
ensure_ascii=False,
|
|
),
|
|
song_id,
|
|
),
|
|
)
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:qqtop-skip-raise",
|
|
song_id=song_id,
|
|
payload={"row": {"id": song_id, "platform": "qq", "name": "QQ Toplist Audio Error"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
side_effect=RuntimeError("boom"),
|
|
):
|
|
executor.process_item(item_id=item_id, worker_name="download-skip-raise")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.SKIPPED, item.status)
|
|
self.assertEqual("NON_MUSIC_RESOURCE", item.last_error_code)
|
|
|
|
def test_repository_get_upload_row_for_item_backfills_skinny_payload(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path, default_library_root=Path(tmpdir) / "library").close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
upload_task_id = self._insert_upload_task_bundle(
|
|
db_path,
|
|
absolute_path=str(Path(tmpdir) / "library" / "qq" / "Singer Upload" / "song-upload.mp3"),
|
|
target_locator="music/qq/Singer Upload/song-upload.mp3",
|
|
)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="upload")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="file_upload",
|
|
item_key=f"upload:{upload_task_id}",
|
|
payload={"upload_row": {"id": upload_task_id}},
|
|
)
|
|
|
|
row = ops_repo.get_upload_row_for_item(item_id)
|
|
|
|
self.assertEqual(upload_task_id, int(row["id"]))
|
|
self.assertEqual("music/qq/Singer Upload/song-upload.mp3", row["target_locator"])
|
|
self.assertIn("file_asset_id", row)
|
|
self.assertIn("target_backend_id", row)
|
|
self.assertIn("absolute_path", row)
|
|
|
|
def test_repository_build_download_row_backfills_missing_payload_fields_from_song(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="song-dl-1",
|
|
name="Song DL 1",
|
|
singers="Singer DL",
|
|
)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:dl:1",
|
|
song_id=song_id,
|
|
payload={"row": {"id": song_id}},
|
|
)
|
|
|
|
row = ops_repo.build_download_row(item_id)
|
|
|
|
self.assertEqual(song_id, int(row["id"]))
|
|
self.assertEqual("qq", row["platform"])
|
|
self.assertEqual("Song DL 1", row["name"])
|
|
|
|
def test_repository_get_playlist_row_for_item_backfills_missing_payload_fields_from_playlist(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
initialize_database(db_path).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
|
|
playlist_id = self._insert_playlist_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_playlist_id="playlist-sync-1",
|
|
name="Playlist Sync 1",
|
|
url="https://y.qq.com/n/ryqq/playlist/playlist-sync-1",
|
|
)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="sync")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="playlist_sync",
|
|
item_key="playlist:sync:1",
|
|
playlist_id=playlist_id,
|
|
payload={"playlist_row": {"id": playlist_id}},
|
|
)
|
|
|
|
row = ops_repo.get_playlist_row_for_item(item_id)
|
|
|
|
self.assertEqual(playlist_id, int(row["id"]))
|
|
self.assertEqual("qq", row["platform"])
|
|
self.assertEqual("playlist_url", row["parse_strategy"])
|
|
|
|
def test_download_executor_raises_when_mark_item_succeeded_returns_false(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:cas:ok-to-fail",
|
|
song_id=11,
|
|
payload={"row": {"id": 11, "platform": "qq", "name": "Song CAS"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
return_value=True,
|
|
):
|
|
with patch.object(executor.ops_repo, "mark_item_succeeded", return_value=False):
|
|
with patch.object(executor.ops_repo, "mark_item_failed", return_value=True):
|
|
with self.assertRaises(RuntimeError):
|
|
executor.process_item(item_id=item_id, worker_name="download-cas-1")
|
|
|
|
def test_download_executor_raises_when_mark_item_failed_returns_false(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:cas:failed",
|
|
song_id=12,
|
|
payload={"row": {"id": 12, "platform": "qq", "name": "Song CAS Failed"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
return_value=False,
|
|
):
|
|
with patch.object(executor.ops_repo, "mark_item_failed", return_value=False):
|
|
with self.assertRaises(RuntimeError):
|
|
executor.process_item(item_id=item_id, worker_name="download-cas-2")
|
|
|
|
def test_download_executor_marks_item_skipped_for_non_music_resource_when_no_file(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.models import ItemStatus
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="qqtop_75_test_skip",
|
|
name="QQ Toplist Audio",
|
|
singers="Narrator",
|
|
)
|
|
with closing(sqlite3.connect(db_path)) as conn, conn:
|
|
conn.execute(
|
|
"UPDATE songs SET metadata_json = ? WHERE id = ?",
|
|
(
|
|
json.dumps(
|
|
{
|
|
"snapshot": {
|
|
"raw_data": {
|
|
"search": {"qq_toplist_fallback": True},
|
|
}
|
|
}
|
|
},
|
|
ensure_ascii=False,
|
|
),
|
|
song_id,
|
|
),
|
|
)
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:qqtop-skip",
|
|
song_id=song_id,
|
|
payload={"row": {"id": song_id, "platform": "qq", "name": "QQ Toplist Audio"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
return_value=False,
|
|
):
|
|
executor.process_item(item_id=item_id, worker_name="download-skip-1")
|
|
|
|
item = ops_repo.get_item(item_id)
|
|
|
|
self.assertEqual(ItemStatus.SKIPPED, item.status)
|
|
self.assertIn("非音乐资源", str(item.last_error))
|
|
|
|
def test_download_executor_raises_when_mark_item_skipped_returns_false(self):
|
|
from musicdl.catalogsync.db import initialize_database
|
|
from musicdl.catalogsync.ops.executors import DownloadStageExecutor
|
|
from musicdl.catalogsync.ops.repository import OpsRepository
|
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
|
|
db_path = Path(tmpdir) / "catalogsync.db"
|
|
library_root = Path(tmpdir) / "library"
|
|
initialize_database(db_path, default_library_root=library_root).close()
|
|
ops_repo = OpsRepository(db_path)
|
|
stage_id = self._create_job_stage(ops_repo, stage_type="download")
|
|
song_id = self._insert_song_row(
|
|
db_path,
|
|
platform="qq",
|
|
remote_song_id="qqtop_75_test_skip_cas",
|
|
name="QQ Toplist Audio CAS",
|
|
singers="Narrator",
|
|
)
|
|
item_id = ops_repo.create_item(
|
|
job_stage_id=stage_id,
|
|
item_type="song_download",
|
|
item_key="song:qqtop-skip-cas",
|
|
song_id=song_id,
|
|
payload={"row": {"id": song_id, "platform": "qq", "name": "QQ Toplist Audio CAS"}},
|
|
)
|
|
|
|
executor = DownloadStageExecutor(
|
|
db_path=db_path,
|
|
library_root=library_root,
|
|
download_sources=["qq"],
|
|
)
|
|
with patch(
|
|
"musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
|
|
return_value=False,
|
|
):
|
|
with patch.object(executor.ops_repo, "mark_item_skipped", return_value=False):
|
|
with self.assertRaises(RuntimeError):
|
|
executor.process_item(item_id=item_id, worker_name="download-skip-cas")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|