16 KiB
Object Storage Upload Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: Add an S3-compatible object storage upload pipeline to musicdl.catalogsync, persist remote locations and backend presence, and expose it through dedicated CLI commands while keeping the existing local download workflow intact.
Architecture: Extend the SQLite schema and repository so upload state is queue-driven and derived from the existing file_assets plus file_locations model. Add a focused uploader.py module that plans missing uploads, resolves credentials from environment variables, uploads to an object backend with limited concurrency, and records remote locations plus summary presence rows.
Tech Stack: Python stdlib (json, os, pathlib, shutil, sqlite3, threading, concurrent.futures, unittest), click, boto3, existing musicdl.catalogsync modules
Task 1: Extend schema and repository for object storage uploads
Files:
- Modify: musicdl/catalogsync/db.py
- Modify: musicdl/catalogsync/repository.py
- Modify: tests/catalogsync/test_db.py
- Step 1: Write the failing schema and repository tests
class DatabaseSchemaTests(unittest.TestCase):
    """Schema checks for the tables introduced by the upload pipeline."""

    def test_initialize_database_creates_upload_tables(self):
        """initialize_database() must create both upload-related tables."""
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as workdir:
            database_file = Path(workdir) / "catalogsync.db"
            # Close the initializing connection before inspecting the file
            # through a separate, fresh connection.
            initialize_database(database_file).close()

            with closing(sqlite3.connect(database_file)) as probe:
                cursor = probe.execute(
                    "SELECT name FROM sqlite_master WHERE type = 'table'"
                )
                table_names = {name for (name,) in cursor.fetchall()}

        for expected in ("song_backend_presence", "upload_tasks"):
            self.assertIn(expected, table_names)
- Step 2: Run the focused schema tests to verify they fail
Run: python -m unittest tests.catalogsync.test_db -v
Expected: FAIL because song_backend_presence and upload_tasks do not exist yet.
- Step 3: Implement schema, backend upsert, presence refresh, and upload queue helpers
# Names of every table the catalogsync schema is expected to contain.
REQUIRED_TABLES = {
    # Playlist and artist pools
    "playlist_pools",
    "playlists",
    "pool_playlists",
    "artist_pools",
    "artists",
    "pool_artists",
    # Songs and their memberships
    "songs",
    "playlist_songs",
    "artist_songs",
    # Storage model and download queue
    "storage_backends",
    "file_assets",
    "file_locations",
    "download_tasks",
    # Added for object storage uploads
    "song_backend_presence",
    "upload_tasks",
}
def upsert_object_storage_backend(
self,
name: str,
container_name: str,
endpoint: str,
region: str | None,
base_prefix: str | None,
credential_env_prefix: str,
addressing_style: str | None = None,
public_base_url: str | None = None,
) -> int:
config = {
"endpoint": endpoint,
"region": region,
"base_prefix": base_prefix,
"addressing_style": addressing_style,
"public_base_url": public_base_url,
"credential_env_prefix": credential_env_prefix,
}
return self._upsert_backend_row(name=name, backend_type="object_storage", container_name=container_name, config=config)
def get_backend_by_name(self, name: str) -> sqlite3.Row | None:
return self._fetchone("SELECT * FROM storage_backends WHERE name = ?", (name,))
def record_remote_file(
self,
file_asset_id: int,
backend_id: int,
container_name: str,
locator: str,
public_url: str | None,
download_url: str | None,
) -> int:
return self._execute(
"""
INSERT INTO file_locations (
file_asset_id, backend_id, container_name, locator, absolute_path,
public_url, download_url, status, is_primary
) VALUES (?, ?, ?, ?, NULL, ?, ?, 'active', 0)
ON CONFLICT(file_asset_id, backend_id, locator) DO UPDATE SET
public_url = excluded.public_url,
download_url = excluded.download_url,
status = excluded.status,
updated_at = CURRENT_TIMESTAMP
""",
(file_asset_id, backend_id, container_name, locator, public_url, download_url),
)
def refresh_song_backend_presence(self, song_id: int, backend_id: int) -> None:
    """Refresh the presence summary for one song on one backend.

    Delegates entirely to ``_execute_presence_refresh``; nothing is returned.
    """
    target = {"song_id": song_id, "backend_id": backend_id}
    self._execute_presence_refresh(**target)
def enqueue_upload_task(
    self,
    file_asset_id: int,
    source_location_id: int,
    target_backend_id: int,
    target_container_name: str,
    target_locator: str,
) -> int:
    """Queue an upload of one file asset to a target backend.

    The insert is a no-op when a task already exists for the same
    ``(file_asset_id, target_backend_id, target_locator)``.

    Returns:
        Whatever ``_execute`` returns (annotated as ``int``).
        NOTE(review): ``CatalogUploader.enqueue_missing_uploads`` counts truthy
        results, so ``_execute`` must return a falsy value when the conflict
        clause skips the insert. A connection-level last-insert rowid would
        not do that. Confirm against ``_execute``.
    """
    insert_sql = """
        INSERT INTO upload_tasks (
            file_asset_id, source_location_id, target_backend_id, target_container_name, target_locator
        ) VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(file_asset_id, target_backend_id, target_locator) DO NOTHING
    """
    params = (
        file_asset_id,
        source_location_id,
        target_backend_id,
        target_container_name,
        target_locator,
    )
    return self._execute(insert_sql, params)
def list_pending_upload_tasks(self, target_backend_id: int, limit: int | None = None) -> list[sqlite3.Row]:
return self._fetchall(
"""
SELECT ut.*, fl.absolute_path, fa.song_id
FROM upload_tasks ut
JOIN file_locations fl ON fl.id = ut.source_location_id
JOIN file_assets fa ON fa.id = ut.file_asset_id
WHERE ut.target_backend_id = ? AND ut.status IN ('pending', 'failed')
ORDER BY ut.id ASC
LIMIT COALESCE(?, -1)
""",
(target_backend_id, limit),
)
def mark_upload_task_status(self, task_id: int, status: str, last_error: str | None = None) -> None:
self._execute(
"""
UPDATE upload_tasks
SET status = ?, last_error = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(status, last_error, task_id),
)
- Step 4: Re-run the focused schema tests
Run: python -m unittest tests.catalogsync.test_db -v
Expected: PASS
- Step 5: Commit
git add musicdl/catalogsync/db.py musicdl/catalogsync/repository.py tests/catalogsync/test_db.py
git commit -m "feat: add upload queue schema and repository helpers"
Task 2: Add the object storage uploader module with limited concurrency
Files:
- Create: musicdl/catalogsync/uploader.py
- Modify: tests/catalogsync/test_services.py
- Step 1: Write failing uploader tests
class ObjectStorageUploadTests(unittest.TestCase):
    """Enqueue-then-run flow of CatalogUploader against a patched S3 client."""

    def test_upload_runner_records_remote_location_and_presence(self):
        """One missing file is queued, uploaded, and then reported as present."""
        # NOTE(review): ``repo``, ``song_id`` and ``fake_client`` are fixtures
        # that this snippet does not show; they must be set up beforehand.
        uploader = CatalogUploader(repository=repo, worker_count=2)
        backend_settings = dict(
            name="main-s3",
            container_name="music-bucket",
            endpoint="https://s3.example.com",
            region="auto",
            base_prefix="music",
            credential_env_prefix="CATALOGSYNC_MAIN_S3",
        )
        backend_id = repo.upsert_object_storage_backend(**backend_settings)

        queued = uploader.enqueue_missing_uploads(backend_name="main-s3")
        self.assertEqual(1, queued)

        client_factory = "musicdl.catalogsync.uploader.build_s3_client"
        with patch(client_factory, return_value=fake_client):
            summary = uploader.run(backend_name="main-s3")

        self.assertEqual(1, summary["succeeded"])
        self.assertTrue(repo.song_has_active_backend_file(song_id, backend_id))
- Step 2: Run the focused uploader tests to verify they fail
Run: python -m unittest tests.catalogsync.test_services.ObjectStorageUploadTests -v
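Expected: FAIL because musicdl.catalogsync.uploader does not exist yet and the repository helpers the uploader and test rely on (list_missing_object_upload_candidates, song_has_active_backend_file) are not among the helpers added in Task 1.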
- Step 3: Implement the upload planner and S3-compatible runner
class CatalogUploader:
def enqueue_missing_uploads(self, backend_name: str, song_ids: list[int] | None = None) -> int:
backend = self.repository.get_backend_by_name(backend_name)
candidates = self.repository.list_missing_object_upload_candidates(int(backend["id"]), song_ids=song_ids)
return sum(
1
for row in candidates
if self.repository.enqueue_upload_task(
file_asset_id=int(row["file_asset_id"]),
source_location_id=int(row["source_location_id"]),
target_backend_id=int(backend["id"]),
target_container_name=backend["container_name"],
target_locator=row["target_locator"],
)
)
def run(self, backend_name: str, limit: int | None = None) -> dict[str, int]:
tasks = self.repository.list_pending_upload_tasks(target_backend_id=backend_id, limit=limit)
return self._run_tasks(tasks, backend)
class S3CompatibleUploader:
    """Uploads local files through the client object held in ``self.client``.

    NOTE(review): this snippet is abridged. ``self.client``, ``extra_args``
    and ``public_url`` are not defined anywhere in the visible code and must
    be supplied by the full implementation before ``upload_file`` can run.
    ``self.client`` is presumably the boto3 S3 client built by
    ``build_s3_client``; confirm.
    """

    def upload_file(self, local_path: Path, container_name: str, locator: str) -> dict[str, str | None]:
        """Upload *local_path* to *container_name* under the key *locator*.

        Returns:
            A mapping with ``public_url`` and ``download_url``; the latter is
            always ``None`` here.
        """
        # ``extra_args or None`` passes ExtraArgs=None when extra_args is empty.
        self.client.upload_file(str(local_path), container_name, locator, ExtraArgs=extra_args or None)
        return {"public_url": public_url, "download_url": None}
- Step 4: Re-run the focused uploader tests
Run: python -m unittest tests.catalogsync.test_services.ObjectStorageUploadTests -v
Expected: PASS
- Step 5: Commit
git add musicdl/catalogsync/uploader.py tests/catalogsync/test_services.py
git commit -m "feat: add s3 compatible uploader"
Task 3: Expose backend registration and upload execution through the CLI
Files:
- Modify: musicdl/catalogsync/cli.py
- Modify: tests/catalogsync/test_cli.py
- Modify: setup.py
- Modify: requirements.txt
- Step 1: Write failing CLI tests for backend registration and upload
def test_register_object_backend_command_wires_application_method(self):
    """The CLI command must call ``register_object_backend`` exactly once."""
    from musicdl.catalogsync.cli import cli

    # NOTE(review): ``db_path`` is a fixture this snippet does not show.
    argv = [
        "register-object-backend",
        "--db", str(db_path),
        "--name", "main-s3",
        "--bucket", "music-bucket",
        "--endpoint", "https://s3.example.com",
        "--region", "auto",
        "--base-prefix", "music",
        "--credential-env-prefix", "CATALOGSYNC_MAIN_S3",
    ]
    with patch("musicdl.catalogsync.cli.CatalogSyncApplication") as app_cls:
        runner = CliRunner()
        result = runner.invoke(cli, argv)

    self.assertEqual(0, result.exit_code, msg=result.output)
    app_cls.return_value.register_object_backend.assert_called_once()
- Step 2: Run the focused CLI tests to verify they fail
Run: python -m unittest tests.catalogsync.test_cli -v
Expected: FAIL because the new commands and application methods do not exist yet.
- Step 3: Add CLI methods and commands
class CatalogSyncApplication:
def register_object_backend(self, **kwargs):
return self.repository.upsert_object_storage_backend(**kwargs)
def upload_files(self, backend_name: str, workers: int = 4, limit: int | None = None, enqueue_only: bool = False):
uploader = CatalogUploader(self.repository, worker_count=workers)
queued = uploader.enqueue_missing_uploads(backend_name=backend_name)
return {"queued": queued} if enqueue_only else uploader.run(backend_name=backend_name, limit=limit)
# The original snippet stacked the ``register-object-backend`` decorators on
# top of ``upload_command``, so the declared options did not match the
# function's parameters. They are two separate commands.
# NOTE(review): ``--db`` is declared as required here; mirror whatever default
# the existing catalogsync commands use if they differ.


@cli.command("register-object-backend")
@click.option("--db", "db_path", required=True)
@click.option("--name", required=True)
@click.option("--bucket", "container_name", required=True)
@click.option("--endpoint", required=True)
@click.option("--region", default=None)
@click.option("--base-prefix", default=None)
@click.option("--credential-env-prefix", required=True)
@click.option("--addressing-style", default=None)
@click.option("--public-base-url", default=None)
def register_object_backend_command(
    db_path: str,
    name: str,
    container_name: str,
    endpoint: str,
    region: str | None,
    base_prefix: str | None,
    credential_env_prefix: str,
    addressing_style: str | None,
    public_base_url: str | None,
):
    """Register or update an S3-compatible object storage backend."""
    app = CatalogSyncApplication(db_path=db_path, library_root=None)
    backend_id = app.register_object_backend(
        name=name,
        container_name=container_name,
        endpoint=endpoint,
        region=region,
        base_prefix=base_prefix,
        credential_env_prefix=credential_env_prefix,
        addressing_style=addressing_style,
        public_base_url=public_base_url,
    )
    click.echo(backend_id)


@cli.command("upload")
@click.option("--db", "db_path", required=True)
@click.option("--library-root", default=None)
@click.option("--backend", "backend_name", required=True)
@click.option("--workers", type=int, default=4, show_default=True)
@click.option("--limit", type=int, default=None)
@click.option("--enqueue-only/--run", default=False, show_default=True)
def upload_command(db_path: str, library_root: str | None, backend_name: str, workers: int, limit: int | None, enqueue_only: bool):
    """Queue missing uploads for a backend and, unless --enqueue-only, run them."""
    app = CatalogSyncApplication(db_path=db_path, library_root=library_root)
    result = app.upload_files(
        backend_name=backend_name,
        workers=workers,
        limit=limit,
        enqueue_only=enqueue_only,
    )
    click.echo(result)
- Step 4: Re-run the focused CLI tests
Run: python -m unittest tests.catalogsync.test_cli -v
Expected: PASS
- Step 5: Commit
git add musicdl/catalogsync/cli.py tests/catalogsync/test_cli.py setup.py requirements.txt
git commit -m "feat: add upload cli commands"
Task 4: Add bounded concurrency to download and upload execution
Files:
- Modify: musicdl/catalogsync/downloader.py
- Modify: musicdl/catalogsync/uploader.py
- Modify: tests/catalogsync/test_services.py
- Step 1: Write failing concurrency and disk-switch tests
def test_catalog_downloader_reuses_new_root_after_space_prompt(self):
    """A single prompt answer must be enough to complete both downloads."""
    # NOTE(review): ``repo``, ``first_root`` and ``second_root`` are fixtures
    # this snippet does not show.
    downloader = CatalogDownloader(repository=repo, worker_count=2)
    # Only one answer is provided, so a second prompt would fail the test.
    prompt_answers = [str(second_root)]
    with patch("builtins.input", side_effect=prompt_answers):
        downloaded = downloader.download_pending(library_root=first_root, limit=2)
    self.assertEqual(2, downloaded)
def test_catalog_uploader_uses_bounded_workers(self):
    """The run summary must report the configured worker count."""
    # NOTE(review): ``repo`` is a fixture this snippet does not show.
    expected_workers = 3
    uploader = CatalogUploader(repository=repo, worker_count=expected_workers)
    result = uploader.run(backend_name="main-s3")
    self.assertEqual(3, result["workers"])
- Step 2: Run the focused concurrency tests to verify they fail
Run: python -m unittest tests.catalogsync.test_services -v
Expected: FAIL because downloader and uploader are still single-threaded or do not report bounded worker behavior yet.
- Step 3: Implement limited worker pools and one-time root switching
class CatalogDownloader:
def __init__(self, repository: CatalogRepository, work_dir: str = "musicdl_outputs/catalogsync", worker_count: int = 3):
self.worker_count = max(1, worker_count)
self._current_library_root: Path | None = None
def ensure_space(self, root_path: str | Path, required_bytes: int | None) -> Path:
if self._current_library_root is None:
self._current_library_root = Path(root_path).resolve()
root = self._current_library_root
while required_bytes and shutil.disk_usage(root).free < required_bytes:
root = Path(input("磁盘空间不足,请输入新的下载目录继续: ").strip()).resolve()
root.mkdir(parents=True, exist_ok=True)
self._current_library_root = root
return root
def download_pending(self, library_root: str | Path, sources: list[str] | None = None, limit: int | None = None, playlist_ids: list[int] | None = None) -> int:
with ThreadPoolExecutor(max_workers=self.worker_count) as executor:
futures = [executor.submit(self._download_one, row, default_root) for row in queue]
return sum(1 for future in as_completed(futures) if future.result())
- Step 4: Re-run the focused concurrency tests
Run: python -m unittest tests.catalogsync.test_services -v
Expected: PASS
- Step 5: Commit
git add musicdl/catalogsync/downloader.py musicdl/catalogsync/uploader.py tests/catalogsync/test_services.py
git commit -m "feat: add bounded download and upload concurrency"
Task 5: Document the operator workflow and run final verification
Files:
- Modify: docs/catalogsync.md
- Modify: README.md
- Step 1: Update operator docs for object storage upload
## Object Storage Upload
1. Register one backend with `musicdl-catalogsync register-object-backend`
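2. Export `${PREFIX}_ACCESS_KEY_ID` and `${PREFIX}_SECRET_ACCESS_KEY`, where `PREFIX` is the value passed as `--credential-env-prefix` (for example `CATALOGSYNC_MAIN_S3`)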
3. Run `musicdl-catalogsync upload --backend main-s3`
4. Inspect `file_locations`, `song_backend_presence`, and `upload_tasks`
- Step 2: Verify help output and test suite
Run: python -m unittest discover -s tests/catalogsync -v
Expected: PASS
Run: python -m musicdl.catalogsync.cli run --help
Expected: PASS and include existing run options
Run: python -m musicdl.catalogsync.cli upload --help
Expected: PASS and include object storage upload options
- Step 3: Commit
git add docs/catalogsync.md README.md
git commit -m "docs: add object storage upload workflow"