
Object Storage Upload Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add an S3-compatible object storage upload pipeline to musicdl.catalogsync, persist remote locations and backend presence, and expose it through dedicated CLI commands while keeping the existing local download workflow intact.

Architecture: Extend the SQLite schema and repository so upload state is queue-driven and derived from the existing file_assets plus file_locations model. Add a focused uploader.py module that plans missing uploads, resolves credentials from environment variables, uploads to an object backend with limited concurrency, and records remote locations plus summary presence rows.

Tech Stack: Python stdlib (json, os, pathlib, threading, concurrent.futures), sqlite3, click, boto3, existing musicdl.catalogsync modules, unittest


Task 1: Extend schema and repository for object storage uploads

Files:

  • Modify: musicdl/catalogsync/db.py

  • Modify: musicdl/catalogsync/repository.py

  • Modify: tests/catalogsync/test_db.py

  • Step 1: Write the failing schema and repository tests

import sqlite3
import tempfile
import unittest
from contextlib import closing
from pathlib import Path


class DatabaseSchemaTests(unittest.TestCase):
    def test_initialize_database_creates_upload_tables(self):
        from musicdl.catalogsync.db import initialize_database

        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            db_path = Path(tmpdir) / "catalogsync.db"
            conn = initialize_database(db_path)
            conn.close()

            # Re-open with a fresh connection so the check sees what was actually committed.
            with closing(sqlite3.connect(db_path)) as verify_conn:
                tables = {
                    row[0]
                    for row in verify_conn.execute(
                        "SELECT name FROM sqlite_master WHERE type = 'table'"
                    ).fetchall()
                }

        self.assertIn("song_backend_presence", tables)
        self.assertIn("upload_tasks", tables)
  • Step 2: Run the focused schema tests to verify they fail

Run: python -m unittest tests.catalogsync.test_db -v
Expected: FAIL because song_backend_presence and upload_tasks do not exist yet.

  • Step 3: Implement schema, backend upsert, presence refresh, and upload queue helpers
# musicdl/catalogsync/db.py
REQUIRED_TABLES = {
    "playlist_pools",
    "playlists",
    "pool_playlists",
    "artist_pools",
    "artists",
    "pool_artists",
    "songs",
    "playlist_songs",
    "artist_songs",
    "storage_backends",
    "file_assets",
    "file_locations",
    "download_tasks",
    "song_backend_presence",
    "upload_tasks",
}

# musicdl/catalogsync/repository.py (methods on CatalogRepository)
def upsert_object_storage_backend(
    self,
    name: str,
    container_name: str,
    endpoint: str,
    region: str | None,
    base_prefix: str | None,
    credential_env_prefix: str,
    addressing_style: str | None = None,
    public_base_url: str | None = None,
) -> int:
    config = {
        "endpoint": endpoint,
        "region": region,
        "base_prefix": base_prefix,
        "addressing_style": addressing_style,
        "public_base_url": public_base_url,
        "credential_env_prefix": credential_env_prefix,
    }
    return self._upsert_backend_row(name=name, backend_type="object_storage", container_name=container_name, config=config)

def get_backend_by_name(self, name: str) -> sqlite3.Row | None:
    return self._fetchone("SELECT * FROM storage_backends WHERE name = ?", (name,))

def record_remote_file(
    self,
    file_asset_id: int,
    backend_id: int,
    container_name: str,
    locator: str,
    public_url: str | None,
    download_url: str | None,
) -> int:
    return self._execute(
        """
        INSERT INTO file_locations (
            file_asset_id, backend_id, container_name, locator, absolute_path,
            public_url, download_url, status, is_primary
        ) VALUES (?, ?, ?, ?, NULL, ?, ?, 'active', 0)
        ON CONFLICT(file_asset_id, backend_id, locator) DO UPDATE SET
            public_url = excluded.public_url,
            download_url = excluded.download_url,
            status = excluded.status,
            updated_at = CURRENT_TIMESTAMP
        """,
        (file_asset_id, backend_id, container_name, locator, public_url, download_url),
    )

def refresh_song_backend_presence(self, song_id: int, backend_id: int) -> None:
    # Recompute the (song_id, backend_id) summary row from active file_locations.
    self._execute_presence_refresh(song_id=song_id, backend_id=backend_id)

def enqueue_upload_task(
    self,
    file_asset_id: int,
    source_location_id: int,
    target_backend_id: int,
    target_container_name: str,
    target_locator: str,
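) -> int:
    # Return the inserted row count (cursor.rowcount: 1 for a new task, 0 when
    # ON CONFLICT DO NOTHING skips it). sqlite3 keeps the previous lastrowid after a
    # skipped insert, so returning lastrowid would make callers overcount new tasks.
    return self._execute(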
        """
        INSERT INTO upload_tasks (
            file_asset_id, source_location_id, target_backend_id, target_container_name, target_locator
        ) VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(file_asset_id, target_backend_id, target_locator) DO NOTHING
        """,
        (file_asset_id, source_location_id, target_backend_id, target_container_name, target_locator),
    )

def list_pending_upload_tasks(self, target_backend_id: int, limit: int | None = None) -> list[sqlite3.Row]:
    return self._fetchall(
        """
        SELECT ut.*, fl.absolute_path, fa.song_id
        FROM upload_tasks ut
        JOIN file_locations fl ON fl.id = ut.source_location_id
        JOIN file_assets fa ON fa.id = ut.file_asset_id
        WHERE ut.target_backend_id = ? AND ut.status IN ('pending', 'failed')
        ORDER BY ut.id ASC
        LIMIT COALESCE(?, -1)
        """,
        (target_backend_id, limit),
    )

def mark_upload_task_status(self, task_id: int, status: str, last_error: str | None = None) -> None:
    self._execute(
        """
        UPDATE upload_tasks
        SET status = ?, last_error = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (status, last_error, task_id),
    )
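Step 3 lists the new tables but does not show their DDL. Below is one possible shape, sketched under assumptions. The UNIQUE constraint on upload_tasks is the conflict target that enqueue_upload_task's ON CONFLICT clause needs. song_backend_presence keeps one summary row per (song, backend), which a body for _execute_presence_refresh could upsert. Column names the helpers above do not use (active_file_count, created_at) are hypothetical. Similarly, record_remote_file relies on a unique index over file_locations(file_asset_id, backend_id, locator) already existing, because SQLite only accepts an ON CONFLICT target that matches a unique constraint.

```python
import sqlite3

# Hypothetical DDL for the two new tables. The UNIQUE constraint is the conflict
# target used by enqueue_upload_task's ON CONFLICT ... DO NOTHING.
UPLOAD_SCHEMA = """
CREATE TABLE IF NOT EXISTS upload_tasks (
    id INTEGER PRIMARY KEY,
    file_asset_id INTEGER NOT NULL REFERENCES file_assets(id),
    source_location_id INTEGER NOT NULL REFERENCES file_locations(id),
    target_backend_id INTEGER NOT NULL REFERENCES storage_backends(id),
    target_container_name TEXT NOT NULL,
    target_locator TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    last_error TEXT,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (file_asset_id, target_backend_id, target_locator)
);
CREATE TABLE IF NOT EXISTS song_backend_presence (
    song_id INTEGER NOT NULL REFERENCES songs(id),
    backend_id INTEGER NOT NULL REFERENCES storage_backends(id),
    active_file_count INTEGER NOT NULL DEFAULT 0,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (song_id, backend_id)
);
"""

# Recount the song's active copies on one backend and upsert the summary row.
# The WHERE clause on the SELECT avoids SQLite's parsing ambiguity between an
# upsert's ON CONFLICT and a join constraint.
PRESENCE_REFRESH_SQL = """
INSERT INTO song_backend_presence (song_id, backend_id, active_file_count, updated_at)
SELECT ?, ?, (
    SELECT COUNT(*)
    FROM file_locations fl
    JOIN file_assets fa ON fa.id = fl.file_asset_id
    WHERE fa.song_id = ? AND fl.backend_id = ? AND fl.status = 'active'
), CURRENT_TIMESTAMP
WHERE 1
ON CONFLICT (song_id, backend_id) DO UPDATE SET
    active_file_count = excluded.active_file_count,
    updated_at = excluded.updated_at
"""


def refresh_presence(conn: sqlite3.Connection, song_id: int, backend_id: int) -> None:
    # Placeholders bind left to right: the outer (song_id, backend_id), then the subquery's pair.
    conn.execute(PRESENCE_REFRESH_SQL, (song_id, backend_id, song_id, backend_id))
```

UPSERT requires SQLite 3.24 or newer. Because the presence row is recomputed from file_locations instead of incremented, calling refresh_presence after a failed or repeated upload still leaves the row correct.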
  • Step 4: Re-run the focused schema tests

Run: python -m unittest tests.catalogsync.test_db -v
Expected: PASS

  • Step 5: Commit
git add musicdl/catalogsync/db.py musicdl/catalogsync/repository.py tests/catalogsync/test_db.py
git commit -m "feat: add upload queue schema and repository helpers"

Task 2: Add the object storage uploader module with limited concurrency

Files:

  • Create: musicdl/catalogsync/uploader.py

  • Modify: tests/catalogsync/test_services.py

  • Step 1: Write failing uploader tests

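class ObjectStorageUploadTests(unittest.TestCase):
    # setUp (not shown) builds self.repo on a temporary database and seeds one song
    # (self.song_id) whose primary local file_location points at a real file.
    def test_upload_runner_records_remote_location_and_presence(self):
        from unittest.mock import MagicMock, patch

        from musicdl.catalogsync.uploader import CatalogUploader

        repo, song_id = self.repo, self.song_id
        fake_client = MagicMock()  # stands in for the boto3 S3 client
        uploader = CatalogUploader(repository=repo, worker_count=2)
        backend_id = repo.upsert_object_storage_backend(
            name="main-s3",
            container_name="music-bucket",
            endpoint="https://s3.example.com",
            region="auto",
            base_prefix="music",
            credential_env_prefix="CATALOGSYNC_MAIN_S3",
        )

        task_count = uploader.enqueue_missing_uploads(backend_name="main-s3")
        self.assertEqual(1, task_count)

        with patch("musicdl.catalogsync.uploader.build_s3_client", return_value=fake_client):
            summary = uploader.run(backend_name="main-s3")

        self.assertEqual(1, summary["succeeded"])
        fake_client.upload_file.assert_called_once()
        self.assertTrue(repo.song_has_active_backend_file(song_id, backend_id))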
  • Step 2: Run the focused uploader tests to verify they fail

Run: python -m unittest tests.catalogsync.test_services.ObjectStorageUploadTests -v
Expected: FAIL because musicdl.catalogsync.uploader and the remaining repository helpers (list_missing_object_upload_candidates, song_has_active_backend_file) do not exist yet.

  • Step 3: Implement the upload planner and S3-compatible runner
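from pathlib import Path


class CatalogUploader:
    def __init__(self, repository, worker_count: int = 4):
        self.repository = repository
        self.worker_count = max(1, worker_count)

    def _get_backend(self, backend_name: str):
        backend = self.repository.get_backend_by_name(backend_name)
        if backend is None:
            raise ValueError(f"unknown storage backend: {backend_name}")
        return backend

    def enqueue_missing_uploads(self, backend_name: str, song_ids: list[int] | None = None) -> int:
        backend = self._get_backend(backend_name)
        backend_id = int(backend["id"])
        candidates = self.repository.list_missing_object_upload_candidates(backend_id, song_ids=song_ids)
        # Count only tasks actually inserted; enqueue_upload_task is falsy for an existing task.
        return sum(
            1
            for row in candidates
            if self.repository.enqueue_upload_task(
                file_asset_id=int(row["file_asset_id"]),
                source_location_id=int(row["source_location_id"]),
                target_backend_id=backend_id,
                target_container_name=backend["container_name"],
                target_locator=row["target_locator"],
            )
        )

    def run(self, backend_name: str, limit: int | None = None) -> dict[str, int]:
        backend = self._get_backend(backend_name)
        tasks = self.repository.list_pending_upload_tasks(target_backend_id=int(backend["id"]), limit=limit)
        # _run_tasks builds the client via build_s3_client and uploads through the worker pool (Task 4).
        return self._run_tasks(tasks, backend)


class S3CompatibleUploader:
    def __init__(self, client, public_base_url: str | None = None, extra_args: dict | None = None):
        self.client = client
        self.public_base_url = public_base_url
        self.extra_args = extra_args or {}  # passed through to boto3, e.g. {"CacheControl": "max-age=86400"}

    def upload_file(self, local_path: Path, container_name: str, locator: str) -> dict[str, str | None]:
        self.client.upload_file(str(local_path), container_name, locator, ExtraArgs=self.extra_args or None)
        public_url = f"{self.public_base_url.rstrip('/')}/{locator}" if self.public_base_url else None
        return {"public_url": public_url, "download_url": None}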
  • Step 4: Re-run the focused uploader tests

Run: python -m unittest tests.catalogsync.test_services.ObjectStorageUploadTests -v
Expected: PASS

  • Step 5: Commit
git add musicdl/catalogsync/uploader.py tests/catalogsync/test_services.py
git commit -m "feat: add s3 compatible uploader"

Task 3: Expose backend registration and upload execution through the CLI

Files:

  • Modify: musicdl/catalogsync/cli.py

  • Modify: tests/catalogsync/test_cli.py

  • Modify: setup.py

  • Modify: requirements.txt

  • Step 1: Write failing CLI tests for backend registration and upload

from unittest.mock import patch

from click.testing import CliRunner


def test_register_object_backend_command_wires_application_method(self):
    # Method on the CLI test case; db_path is the temporary database path prepared in setUp.
    from musicdl.catalogsync.cli import cli

    with patch("musicdl.catalogsync.cli.CatalogSyncApplication") as app_cls:
        result = CliRunner().invoke(
            cli,
            [
                "register-object-backend",
                "--db", str(db_path),
                "--name", "main-s3",
                "--bucket", "music-bucket",
                "--endpoint", "https://s3.example.com",
                "--region", "auto",
                "--base-prefix", "music",
                "--credential-env-prefix", "CATALOGSYNC_MAIN_S3",
            ],
        )

    self.assertEqual(0, result.exit_code, msg=result.output)
    register = app_cls.return_value.register_object_backend
    register.assert_called_once()
    self.assertEqual("music-bucket", register.call_args.kwargs["container_name"])
  • Step 2: Run the focused CLI tests to verify they fail

Run: python -m unittest tests.catalogsync.test_cli -v
Expected: FAIL because the new commands and application methods do not exist yet.

  • Step 3: Add CLI methods and commands, and declare boto3 in requirements.txt and in setup.py's install_requires
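class CatalogSyncApplication:
    def register_object_backend(self, **kwargs) -> int:
        return self.repository.upsert_object_storage_backend(**kwargs)

    def upload_files(self, backend_name: str, workers: int = 4, limit: int | None = None, enqueue_only: bool = False):
        uploader = CatalogUploader(self.repository, worker_count=workers)
        queued = uploader.enqueue_missing_uploads(backend_name=backend_name)
        return {"queued": queued} if enqueue_only else uploader.run(backend_name=backend_name, limit=limit)


@cli.command("register-object-backend")
@click.option("--db", "db_path", required=True)
@click.option("--name", required=True)
@click.option("--bucket", "container_name", required=True)
@click.option("--endpoint", required=True)
@click.option("--region", default=None)
@click.option("--base-prefix", default=None)
@click.option("--credential-env-prefix", required=True)
@click.option("--addressing-style", type=click.Choice(["auto", "virtual", "path"]), default=None)
@click.option("--public-base-url", default=None)
def register_object_backend_command(db_path: str, **backend_options):
    # Option names map one-to-one onto upsert_object_storage_backend's keyword arguments.
    app = CatalogSyncApplication(db_path=db_path, library_root=None)
    backend_id = app.register_object_backend(**backend_options)
    click.echo(f"registered backend {backend_options['name']} (id={backend_id})")


@cli.command("upload")
@click.option("--db", "db_path", required=True)
@click.option("--library-root", default=None)
@click.option("--backend", "backend_name", required=True)
@click.option("--workers", type=int, default=4, show_default=True)
@click.option("--limit", type=int, default=None)
@click.option("--enqueue-only/--run", default=False, show_default=True)
def upload_command(db_path: str, library_root: str | None, backend_name: str, workers: int, limit: int | None, enqueue_only: bool):
    app = CatalogSyncApplication(db_path=db_path, library_root=library_root)
    result = app.upload_files(
        backend_name=backend_name,
        workers=workers,
        limit=limit,
        enqueue_only=enqueue_only,
    )
    click.echo(result)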
  • Step 4: Re-run the focused CLI tests

Run: python -m unittest tests.catalogsync.test_cli -v
Expected: PASS

  • Step 5: Commit
git add musicdl/catalogsync/cli.py tests/catalogsync/test_cli.py setup.py requirements.txt
git commit -m "feat: add upload cli commands"

Task 4: Add bounded concurrency to download and upload execution

Files:

  • Modify: musicdl/catalogsync/downloader.py

  • Modify: musicdl/catalogsync/uploader.py

  • Modify: tests/catalogsync/test_services.py

  • Step 1: Write failing concurrency and disk-switch tests

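from types import SimpleNamespace
from unittest.mock import patch


def test_catalog_downloader_reuses_new_root_after_space_prompt(self):
    # repo, first_root and second_root are setUp fixtures; first_root is reported as full.
    def fake_disk_usage(path):
        return SimpleNamespace(free=0 if Path(path) == Path(first_root).resolve() else 10**12)

    downloader = CatalogDownloader(repository=repo, worker_count=2)
    with patch("musicdl.catalogsync.downloader.shutil.disk_usage", side_effect=fake_disk_usage), \
            patch("builtins.input", side_effect=[str(second_root)]) as fake_input:
        count = downloader.download_pending(library_root=first_root, limit=2)
    self.assertEqual(2, count)
    fake_input.assert_called_once()  # one prompt; the second worker reuses the new root

def test_catalog_uploader_uses_bounded_workers(self):
    uploader = CatalogUploader(repository=repo, worker_count=3)
    summary = uploader.run(backend_name="main-s3")
    self.assertEqual(3, summary["workers"])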
  • Step 2: Run the focused concurrency tests to verify they fail

Run: python -m unittest tests.catalogsync.test_services -v
Expected: FAIL because the downloader is still single-threaded and the uploader does not yet report its bounded worker count.

  • Step 3: Implement limited worker pools and one-time root switching
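import shutil
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path


class CatalogDownloader:
    def __init__(self, repository: CatalogRepository, work_dir: str = "musicdl_outputs/catalogsync", worker_count: int = 3):
        self.repository = repository
        self.work_dir = Path(work_dir)
        self.worker_count = max(1, worker_count)
        self._current_library_root: Path | None = None
        # Workers call ensure_space concurrently; the lock makes check-and-prompt atomic,
        # so a single answer to input() switches the root for every worker.
        self._root_lock = threading.Lock()

    def ensure_space(self, root_path: str | Path, required_bytes: int | None) -> Path:
        with self._root_lock:
            if self._current_library_root is None:
                self._current_library_root = Path(root_path).resolve()
                self._current_library_root.mkdir(parents=True, exist_ok=True)
            root = self._current_library_root
            while required_bytes and shutil.disk_usage(root).free < required_bytes:
                root = Path(input("Insufficient disk space. Enter a new download directory to continue: ").strip()).resolve()
                root.mkdir(parents=True, exist_ok=True)
            self._current_library_root = root
            return root

    def download_pending(self, library_root: str | Path, sources: list[str] | None = None, limit: int | None = None, playlist_ids: list[int] | None = None) -> int:
        # Same pending-task query as the existing single-threaded version (method name assumed here).
        queue = self.repository.list_pending_download_tasks(sources=sources, limit=limit, playlist_ids=playlist_ids)
        default_root = Path(library_root).resolve()
        with ThreadPoolExecutor(max_workers=self.worker_count) as executor:
            futures = [executor.submit(self._download_one, row, default_root) for row in queue]
            return sum(1 for future in as_completed(futures) if future.result())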
  • Step 4: Re-run the focused concurrency tests

Run: python -m unittest tests.catalogsync.test_services -v
Expected: PASS

  • Step 5: Commit
git add musicdl/catalogsync/downloader.py musicdl/catalogsync/uploader.py tests/catalogsync/test_services.py
git commit -m "feat: add bounded download and upload concurrency"

Task 5: Document the operator workflow and run final verification

Files:

  • Modify: docs/catalogsync.md

  • Modify: README.md

  • Step 1: Update operator docs for object storage upload

## Object Storage Upload

1. Register one backend with `musicdl-catalogsync register-object-backend`
2. Export `${PREFIX}_ACCESS_KEY_ID` and `${PREFIX}_SECRET_ACCESS_KEY`, where `PREFIX` is the backend's `--credential-env-prefix` (for example `CATALOGSYNC_MAIN_S3`)
3. Run `musicdl-catalogsync upload --backend main-s3`
4. Inspect `file_locations`, `song_backend_presence`, and `upload_tasks`
  • Step 2: Verify help output and test suite

Run: python -m unittest discover -s tests/catalogsync -v
Expected: PASS

Run: python -m musicdl.catalogsync.cli run --help
Expected: exits with status 0 and still lists the existing run options

Run: python -m musicdl.catalogsync.cli upload --help
Expected: exits with status 0 and lists the object storage upload options (--backend, --workers, --limit, --enqueue-only/--run)

  • Step 3: Commit
git add docs/catalogsync.md README.md
git commit -m "docs: add object storage upload workflow"