# Files
#
# 902 lines
# 39 KiB
# Python

import json
from contextlib import closing
from queue import Queue
import tempfile
import threading
import unittest
import sqlite3
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
class StageExecutorTests(unittest.TestCase):
    """Tests for the catalogsync ops stage executors and ``OpsRepository`` row helpers.

    Every test builds a fresh SQLite database inside a temporary directory via
    ``initialize_database`` and drives the download / sync / upload executors with
    the underlying downloader, sync-service or uploader hooks patched out. The
    ``musicdl`` imports are done inside each test method.
    """

    def _create_job_stage(self, repo, stage_type: str = "download") -> int:
        """Create a ``catalog_sync`` job with one stage and return the stage id.

        Args:
            repo: An ``OpsRepository`` (only ``create_job`` and ``create_stage`` are used).
            stage_type: Stage type recorded on the new stage; ``seq_no`` is always 1.

        Returns:
            The id returned by ``repo.create_stage``.
        """
        job_id = repo.create_job(job_type="catalog_sync", config_snapshot={})
        return repo.create_stage(job_run_id=job_id, stage_type=stage_type, seq_no=1)

    def _insert_song_row(self, db_path: Path, *, platform: str, remote_song_id: str, name: str, singers: str) -> int:
        """Insert a song row (``ext='mp3'``, ``file_size_bytes=128``) with raw SQL.

        Returns:
            The rowid of the inserted ``songs`` row.
        """
        # closing() releases the connection; the inner ``conn`` context commits the insert.
        with closing(sqlite3.connect(db_path)) as conn, conn:
            cursor = conn.execute(
                """
                INSERT INTO songs (platform, remote_song_id, name, singers, ext, file_size_bytes)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (platform, remote_song_id, name, singers, "mp3", 128),
            )
            return int(cursor.lastrowid)

    def _insert_playlist_row(self, db_path: Path, *, platform: str, remote_playlist_id: str, name: str, url: str) -> int:
        """Insert a playlist row with ``parse_strategy='playlist_url'`` using raw SQL.

        Returns:
            The rowid of the inserted ``playlists`` row.
        """
        with closing(sqlite3.connect(db_path)) as conn, conn:
            cursor = conn.execute(
                """
                INSERT INTO playlists (platform, remote_playlist_id, name, url, parse_strategy)
                VALUES (?, ?, ?, ?, ?)
                """,
                (platform, remote_playlist_id, name, url, "playlist_url"),
            )
            return int(cursor.lastrowid)

    def _insert_upload_task_bundle(self, db_path: Path, *, absolute_path: str, target_locator: str) -> int:
        """Insert a song, its file asset, a source location and a pending upload task.

        The source location is active and primary on backend 1 (container
        ``library``); the upload task targets backend 2 (container ``bucket-a``).

        Args:
            db_path: SQLite database to write into.
            absolute_path: ``absolute_path`` stored on the source file location.
            target_locator: ``target_locator`` stored on the upload task.

        Returns:
            The rowid of the inserted ``upload_tasks`` row.
        """
        with closing(sqlite3.connect(db_path)) as conn, conn:
            song_id = int(
                conn.execute(
                    """
                    INSERT INTO songs (platform, remote_song_id, name, singers, ext, file_size_bytes)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    ("qq", "song-upload", "Song Upload", "Singer Upload", "mp3", 64),
                ).lastrowid
            )
            file_asset_id = int(
                conn.execute(
                    "INSERT INTO file_assets (song_id, ext, file_size_bytes) VALUES (?, ?, ?)",
                    (song_id, "mp3", 64),
                ).lastrowid
            )
            source_location_id = int(
                conn.execute(
                    """
                    INSERT INTO file_locations (
                        file_asset_id, backend_id, container_name, locator, absolute_path, status, is_primary
                    )
                    VALUES (?, ?, ?, ?, ?, 'active', 1)
                    """,
                    (file_asset_id, 1, "library", "qq/Singer Upload/song-upload.mp3", absolute_path),
                ).lastrowid
            )
            task_id = int(
                conn.execute(
                    """
                    INSERT INTO upload_tasks (
                        file_asset_id, source_location_id, target_backend_id, target_container_name, target_locator, status
                    )
                    VALUES (?, ?, ?, ?, ?, 'pending')
                    """,
                    (file_asset_id, source_location_id, 2, "bucket-a", target_locator),
                ).lastrowid
            )
            return task_id

    def _mark_song_as_qq_toplist_fallback(self, db_path: Path, song_id: int) -> None:
        """Set ``snapshot.raw_data.search.qq_toplist_fallback`` in a song's ``metadata_json``.

        The non-music-resource download tests use this flag to build songs that
        they expect the download executor to skip.
        """
        metadata = {"snapshot": {"raw_data": {"search": {"qq_toplist_fallback": True}}}}
        with closing(sqlite3.connect(db_path)) as conn, conn:
            conn.execute(
                "UPDATE songs SET metadata_json = ? WHERE id = ?",
                (json.dumps(metadata, ensure_ascii=False), song_id),
            )

    def test_download_executor_marks_item_succeeded_when_hook_returns_true(self):
        """A download hook returning True marks the item SUCCEEDED."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:1",
                song_id=1,
                payload={"row": {"id": 1, "platform": "qq", "name": "Song 1"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                return_value=True,
            ):
                executor.process_item(item_id=item_id, worker_name="download-1")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.SUCCEEDED, item.status)

    def test_download_executor_skips_redownload_when_song_already_exists_locally(self):
        """A song with a recorded local file succeeds without calling the download hook."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.repository import CatalogRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            catalog_repo = CatalogRepository(db_path)
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="song-existing-local",
                name="Song Existing Local",
                singers="Singer Existing",
            )
            # Put a real file on disk and register it as the song's local copy.
            existing_relative_path = "qq/Singer Existing/song-existing-local.mp3"
            existing_absolute_path = library_root / existing_relative_path
            existing_absolute_path.parent.mkdir(parents=True, exist_ok=True)
            existing_absolute_path.write_bytes(b"x" * 128)
            backend_id = catalog_repo.ensure_local_backend(
                library_root,
                name="default-local",
                is_default=True,
            )
            catalog_repo.record_local_file(
                song_id=song_id,
                backend_id=backend_id,
                relative_path=existing_relative_path,
                file_size_bytes=128,
                ext="mp3",
                quality_label="standard",
            )
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key=f"song:{song_id}",
                song_id=song_id,
                payload={"row": {"id": song_id, "platform": "qq", "name": "Song Existing Local"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            # Any call into the downloader fails the test.
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                side_effect=AssertionError("download should not be called for existing local file"),
            ):
                executor.process_item(item_id=item_id, worker_name="download-existing-local")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.SUCCEEDED, item.status)

    def test_download_executor_process_resolve_item_marks_failed_when_resolution_returns_none(self):
        """A resolver returning None marks the item FAILED and enqueues nothing."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="song-resolve-none",
                name="Song Resolve None",
                singers="Singer Resolve",
            )
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key=f"song:{song_id}",
                song_id=song_id,
                payload={"row": {"id": song_id}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            ready_queue: Queue = Queue()
            with patch.object(executor.downloader, "resolve_song_row", return_value=None):
                executor.process_resolve_item(
                    item_id=item_id,
                    worker_name="resolve-1",
                    ready_queue=ready_queue,
                )
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.FAILED, item.status)
            self.assertTrue(ready_queue.empty())

    def test_download_executor_process_resolve_item_enqueues_resolved_payload(self):
        """A successful resolution enqueues a task carrying the item id, playlist id and payload."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="song-resolve-ok",
                name="Song Resolve OK",
                singers="Singer Resolve",
            )
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key=f"song:{song_id}",
                song_id=song_id,
                playlist_id=33,
                payload={"row": {"id": song_id, "playlist_id": 33}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            ready_queue: Queue = Queue()
            resolved_payload = SimpleNamespace(
                row={"id": song_id, "playlist_id": 33},
                display_text="Song Resolve OK / Singer Resolve",
            )
            with patch.object(executor.downloader, "resolve_song_row", return_value=resolved_payload):
                executor.process_resolve_item(
                    item_id=item_id,
                    worker_name="resolve-1",
                    ready_queue=ready_queue,
                )
            # Fail with a clear assertion instead of a queue.Empty error when nothing was enqueued.
            self.assertFalse(ready_queue.empty(), "resolved item was not enqueued")
            queued = ready_queue.get_nowait()
            self.assertEqual(item_id, queued.item_id)
            self.assertEqual(33, queued.playlist_id)
            self.assertIs(resolved_payload, queued.resolved_payload)

    def test_download_executor_process_download_task_marks_item_succeeded(self):
        """Downloading an already-claimed, resolved task marks the item SUCCEEDED."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="song-download-task",
                name="Song Download Task",
                singers="Singer Download",
            )
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key=f"song:{song_id}",
                song_id=song_id,
                payload={"row": {"id": song_id}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            # Simulate the resolve worker having claimed the item before handing it off.
            ops_repo.claim_item(item_id=item_id, worker_name="resolve-1")
            resolved_task = SimpleNamespace(
                item_id=item_id,
                playlist_id=None,
                row={"id": song_id},
                resolved_payload=SimpleNamespace(row={"id": song_id}, display_text="Song Download Task / Singer Download"),
            )
            with patch.object(executor.downloader, "download_resolved_song", return_value=True):
                executor.process_download_task(
                    task=resolved_task,
                    worker_name="download-1",
                )
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.SUCCEEDED, item.status)

    def test_sync_executor_marks_item_succeeded_and_records_linked_count(self):
        """A sync hook's return value is stored as ``linked_count`` on a SUCCEEDED item."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import SyncStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.services import CatalogSyncService

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="sync")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="playlist_sync",
                item_key="playlist:2",
                playlist_id=2,
                payload={"playlist_row": {"id": 2, "platform": "qq", "name": "Playlist 2"}},
            )
            executor = SyncStageExecutor(db_path=db_path)
            with patch.object(CatalogSyncService, "sync_playlist_row", return_value=7):
                executor.process_item(item_id=item_id, worker_name="sync-1")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.SUCCEEDED, item.status)
            self.assertEqual(7, item.payload.get("linked_count"))

    def test_sync_executor_marks_item_failed_when_hook_raises(self):
        """An exception from the sync hook marks the item FAILED with the message in ``last_error``."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import SyncStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.services import CatalogSyncService

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="sync")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="playlist_sync",
                item_key="playlist:22",
                playlist_id=22,
                payload={"playlist_row": {"id": 22, "platform": "qq", "name": "Playlist 22"}},
            )
            executor = SyncStageExecutor(db_path=db_path)
            with patch.object(CatalogSyncService, "sync_playlist_row", side_effect=RuntimeError("sync boom")):
                executor.process_item(item_id=item_id, worker_name="sync-2")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.FAILED, item.status)
            self.assertIn("sync boom", str(item.last_error))

    def test_sync_executor_reuses_service_per_thread_and_isolates_across_threads(self):
        """The sync service factory runs once per thread, and its instances are reused within a thread."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import SyncStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        created_services: list[object] = []

        class FakeSyncService:
            def sync_playlist_row(self, playlist_row):
                # Echo the playlist id so each item's linked_count is distinguishable.
                return int(playlist_row["id"])

        def build_service():
            service = FakeSyncService()
            created_services.append(service)
            return service

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="sync")
            first_item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="playlist_sync",
                item_key="playlist:31",
                playlist_id=31,
                payload={"playlist_row": {"id": 31, "platform": "qq", "name": "Playlist 31"}},
            )
            second_item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="playlist_sync",
                item_key="playlist:32",
                playlist_id=32,
                payload={"playlist_row": {"id": 32, "platform": "qq", "name": "Playlist 32"}},
            )
            third_item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="playlist_sync",
                item_key="playlist:33",
                playlist_id=33,
                payload={"playlist_row": {"id": 33, "platform": "qq", "name": "Playlist 33"}},
            )
            executor = SyncStageExecutor(db_path=db_path, service_factory=build_service)
            # Two items on the current thread share one service instance.
            executor.process_item(item_id=first_item_id, worker_name="sync-1")
            executor.process_item(item_id=second_item_id, worker_name="sync-1")
            self.assertEqual(1, len(created_services))
            # A different thread gets its own service instance.
            thread = threading.Thread(
                target=executor.process_item,
                kwargs={"item_id": third_item_id, "worker_name": "sync-2"},
            )
            thread.start()
            thread.join(timeout=2)
            self.assertFalse(thread.is_alive())
            first_item = ops_repo.get_item(first_item_id)
            second_item = ops_repo.get_item(second_item_id)
            third_item = ops_repo.get_item(third_item_id)
            self.assertEqual(2, len(created_services))
            self.assertEqual(ItemStatus.SUCCEEDED, first_item.status)
            self.assertEqual(ItemStatus.SUCCEEDED, second_item.status)
            self.assertEqual(ItemStatus.SUCCEEDED, third_item.status)
            self.assertEqual(31, first_item.payload.get("linked_count"))
            self.assertEqual(32, second_item.payload.get("linked_count"))
            self.assertEqual(33, third_item.payload.get("linked_count"))

    def test_upload_executor_marks_item_failed_when_hook_result_not_succeeded(self):
        """An upload hook result other than "succeeded" marks the item FAILED."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import UploadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.uploader import CatalogUploader

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="upload")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="file_upload",
                item_key="upload:3",
                payload={"upload_row": {"id": 3, "target_locator": "music/qq/song.mp3"}},
            )
            executor = UploadStageExecutor(db_path=db_path, backend_name="main-s3")
            with patch.object(CatalogUploader, "process_upload_task_row", return_value="failed"):
                executor.process_item(item_id=item_id, worker_name="upload-1")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.FAILED, item.status)
            self.assertIn("failed", str(item.last_error))

    def test_upload_executor_marks_item_succeeded_when_hook_result_is_succeeded(self):
        """An upload hook result of "succeeded" marks the item SUCCEEDED with no error."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import UploadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.uploader import CatalogUploader

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="upload")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="file_upload",
                item_key="upload:33",
                payload={"upload_row": {"id": 33, "target_locator": "music/qq/song-33.mp3"}},
            )
            executor = UploadStageExecutor(db_path=db_path, backend_name="main-s3")
            with patch.object(CatalogUploader, "process_upload_task_row", return_value="succeeded"):
                executor.process_item(item_id=item_id, worker_name="upload-2")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.SUCCEEDED, item.status)
            self.assertIsNone(item.last_error)

    def test_upload_executor_marks_item_failed_when_hook_raises(self):
        """An exception from the upload hook marks the item FAILED with the message in ``last_error``."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import UploadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository
        from musicdl.catalogsync.uploader import CatalogUploader

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="upload")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="file_upload",
                item_key="upload:44",
                payload={"upload_row": {"id": 44, "target_locator": "music/qq/song-44.mp3"}},
            )
            executor = UploadStageExecutor(db_path=db_path, backend_name="main-s3")
            with patch.object(CatalogUploader, "process_upload_task_row", side_effect=RuntimeError("upload boom")):
                executor.process_item(item_id=item_id, worker_name="upload-3")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.FAILED, item.status)
            self.assertIn("upload boom", str(item.last_error))

    def test_download_executor_marks_item_failed_when_hook_raises(self):
        """An exception from the download hook marks the item FAILED with the message in ``last_error``."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:4",
                song_id=4,
                payload={"row": {"id": 4, "platform": "qq", "name": "Song 4"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                side_effect=RuntimeError("boom"),
            ):
                executor.process_item(item_id=item_id, worker_name="download-2")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.FAILED, item.status)
            self.assertIn("boom", str(item.last_error))

    def test_download_executor_marks_non_music_resource_skipped_when_hook_raises(self):
        """A toplist-fallback song whose download raises is SKIPPED with code NON_MUSIC_RESOURCE."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="qqtop_75_test_skip_raise",
                name="QQ Toplist Audio Error",
                singers="Narrator",
            )
            self._mark_song_as_qq_toplist_fallback(db_path, song_id)
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:qqtop-skip-raise",
                song_id=song_id,
                payload={"row": {"id": song_id, "platform": "qq", "name": "QQ Toplist Audio Error"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                side_effect=RuntimeError("boom"),
            ):
                executor.process_item(item_id=item_id, worker_name="download-skip-raise")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.SKIPPED, item.status)
            self.assertEqual("NON_MUSIC_RESOURCE", item.last_error_code)

    def test_repository_get_upload_row_for_item_backfills_skinny_payload(self):
        """``get_upload_row_for_item`` fills in upload-task columns missing from an id-only payload."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path, default_library_root=Path(tmpdir) / "library").close()
            ops_repo = OpsRepository(db_path)
            upload_task_id = self._insert_upload_task_bundle(
                db_path,
                absolute_path=str(Path(tmpdir) / "library" / "qq" / "Singer Upload" / "song-upload.mp3"),
                target_locator="music/qq/Singer Upload/song-upload.mp3",
            )
            stage_id = self._create_job_stage(ops_repo, stage_type="upload")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="file_upload",
                item_key=f"upload:{upload_task_id}",
                payload={"upload_row": {"id": upload_task_id}},
            )
            row = ops_repo.get_upload_row_for_item(item_id)
            self.assertEqual(upload_task_id, int(row["id"]))
            self.assertEqual("music/qq/Singer Upload/song-upload.mp3", row["target_locator"])
            self.assertIn("file_asset_id", row)
            self.assertIn("target_backend_id", row)
            self.assertIn("absolute_path", row)

    def test_repository_build_download_row_backfills_missing_payload_fields_from_song(self):
        """``build_download_row`` fills in song columns missing from an id-only payload."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="song-dl-1",
                name="Song DL 1",
                singers="Singer DL",
            )
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:dl:1",
                song_id=song_id,
                payload={"row": {"id": song_id}},
            )
            row = ops_repo.build_download_row(item_id)
            self.assertEqual(song_id, int(row["id"]))
            self.assertEqual("qq", row["platform"])
            self.assertEqual("Song DL 1", row["name"])

    def test_repository_get_playlist_row_for_item_backfills_missing_payload_fields_from_playlist(self):
        """``get_playlist_row_for_item`` fills in playlist columns missing from an id-only payload."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            initialize_database(db_path).close()
            ops_repo = OpsRepository(db_path)
            playlist_id = self._insert_playlist_row(
                db_path,
                platform="qq",
                remote_playlist_id="playlist-sync-1",
                name="Playlist Sync 1",
                url="https://y.qq.com/n/ryqq/playlist/playlist-sync-1",
            )
            stage_id = self._create_job_stage(ops_repo, stage_type="sync")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="playlist_sync",
                item_key="playlist:sync:1",
                playlist_id=playlist_id,
                payload={"playlist_row": {"id": playlist_id}},
            )
            row = ops_repo.get_playlist_row_for_item(item_id)
            self.assertEqual(playlist_id, int(row["id"]))
            self.assertEqual("qq", row["platform"])
            self.assertEqual("playlist_url", row["parse_strategy"])

    def test_download_executor_raises_when_mark_item_succeeded_returns_false(self):
        """``process_item`` raises RuntimeError when ``mark_item_succeeded`` returns False."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:cas:ok-to-fail",
                song_id=11,
                payload={"row": {"id": 11, "platform": "qq", "name": "Song CAS"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                return_value=True,
            ):
                with patch.object(executor.ops_repo, "mark_item_succeeded", return_value=False):
                    # mark_item_failed is stubbed to succeed so the only rejected update is the success mark.
                    with patch.object(executor.ops_repo, "mark_item_failed", return_value=True):
                        with self.assertRaises(RuntimeError):
                            executor.process_item(item_id=item_id, worker_name="download-cas-1")

    def test_download_executor_raises_when_mark_item_failed_returns_false(self):
        """``process_item`` raises RuntimeError when ``mark_item_failed`` returns False."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:cas:failed",
                song_id=12,
                payload={"row": {"id": 12, "platform": "qq", "name": "Song CAS Failed"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                return_value=False,
            ):
                with patch.object(executor.ops_repo, "mark_item_failed", return_value=False):
                    with self.assertRaises(RuntimeError):
                        executor.process_item(item_id=item_id, worker_name="download-cas-2")

    def test_download_executor_marks_item_skipped_for_non_music_resource_when_no_file(self):
        """A toplist-fallback song whose download returns False is SKIPPED as a non-music resource."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.models import ItemStatus
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="qqtop_75_test_skip",
                name="QQ Toplist Audio",
                singers="Narrator",
            )
            self._mark_song_as_qq_toplist_fallback(db_path, song_id)
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:qqtop-skip",
                song_id=song_id,
                payload={"row": {"id": song_id, "platform": "qq", "name": "QQ Toplist Audio"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                return_value=False,
            ):
                executor.process_item(item_id=item_id, worker_name="download-skip-1")
            item = ops_repo.get_item(item_id)
            self.assertEqual(ItemStatus.SKIPPED, item.status)
            # The executor's error text is Chinese; "非音乐资源" means "non-music resource".
            self.assertIn("非音乐资源", str(item.last_error))

    def test_download_executor_raises_when_mark_item_skipped_returns_false(self):
        """``process_item`` raises RuntimeError when ``mark_item_skipped`` returns False."""
        from musicdl.catalogsync.db import initialize_database
        from musicdl.catalogsync.ops.executors import DownloadStageExecutor
        from musicdl.catalogsync.ops.repository import OpsRepository

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            library_root = Path(tmpdir) / "library"
            initialize_database(db_path, default_library_root=library_root).close()
            ops_repo = OpsRepository(db_path)
            stage_id = self._create_job_stage(ops_repo, stage_type="download")
            song_id = self._insert_song_row(
                db_path,
                platform="qq",
                remote_song_id="qqtop_75_test_skip_cas",
                name="QQ Toplist Audio CAS",
                singers="Narrator",
            )
            # Use the same setup as the other non-music-resource tests so the
            # executor takes the skip path rather than the plain failure path.
            self._mark_song_as_qq_toplist_fallback(db_path, song_id)
            item_id = ops_repo.create_item(
                job_stage_id=stage_id,
                item_type="song_download",
                item_key="song:qqtop-skip-cas",
                song_id=song_id,
                payload={"row": {"id": song_id, "platform": "qq", "name": "QQ Toplist Audio CAS"}},
            )
            executor = DownloadStageExecutor(
                db_path=db_path,
                library_root=library_root,
                download_sources=["qq"],
            )
            with patch(
                "musicdl.catalogsync.downloader.CatalogDownloader.download_song_row",
                return_value=False,
            ):
                with patch.object(executor.ops_repo, "mark_item_skipped", return_value=False) as mark_skipped:
                    with self.assertRaises(RuntimeError):
                        executor.process_item(item_id=item_id, worker_name="download-skip-cas")
            # Make sure the RuntimeError came from the rejected skip mark, not some other path.
            mark_skipped.assert_called()
# Allow running this module directly: ``python <this file>``.
if __name__ == "__main__":
    unittest.main()