Public Music Service Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build a standalone public service that loads the exported snapshot, serves MusicFree catalog APIs, resolves playable media with signed stream URLs, and exposes the first version of player-backend endpoints.

Architecture: Use one Python web service for both /mf/v1/* and /player/v1/*, backed by a read-only catalog_read.db plus a writable player.db. Keep catalog reading, auth, covers, media resolution, streaming, and player-user features in separate modules so the same process can serve both MusicFree and future first-party clients cleanly.

Tech Stack: Python 3.11+, FastAPI, Uvicorn, sqlite3, httpx, pydantic, unittest


Repository root: D:\source\musicdl-catalog-sync-worktrees\Music_Server

File Structure

  • Create: pyproject.toml
  • Create: requirements.txt
  • Create: src/music_server/__init__.py
  • Create: src/music_server/settings.py
  • Create: src/music_server/auth.py
  • Create: src/music_server/app.py
  • Create: src/music_server/db.py
  • Create: src/music_server/routes/health.py
  • Create: src/music_server/routes/mf_catalog.py
  • Create: src/music_server/routes/mf_media.py
  • Create: src/music_server/routes/player.py
  • Create: src/music_server/routes/covers.py
  • Create: src/music_server/services/catalog_reader.py
  • Create: src/music_server/services/cover_service.py
  • Create: src/music_server/services/media_resolver.py
  • Create: src/music_server/services/stream_tokens.py
  • Create: src/music_server/services/player_service.py
  • Create: tests/__init__.py
  • Create: tests/test_health.py
  • Create: tests/test_catalog_reader.py
  • Create: tests/test_mf_catalog_routes.py
  • Create: tests/test_mf_media_routes.py
  • Create: tests/test_player_routes.py
  • Create: tests/test_mf_detail_routes.py
  • Create: tests/test_player_history_routes.py

Task 1: Scaffold the service repo, settings, auth, and health endpoint

Files:

  • Create: pyproject.toml

  • Create: requirements.txt

  • Create: src/music_server/__init__.py

  • Create: src/music_server/settings.py

  • Create: src/music_server/auth.py

  • Create: src/music_server/app.py

  • Create: src/music_server/routes/health.py

  • Create: tests/__init__.py

  • Test: tests/test_health.py

  • Step 1: Write the failing test

import unittest

from fastapi.testclient import TestClient

from music_server.app import create_app


class HealthRouteTests(unittest.TestCase):
    def test_healthz_returns_ok(self):
        client = TestClient(create_app())
        response = client.get("/healthz")

        self.assertEqual(200, response.status_code)
        self.assertEqual({"status": "ok"}, response.json())


if __name__ == "__main__":
    unittest.main()

  • Step 2: Run test to verify it fails

Run: python -m unittest tests.test_health -v
Expected: ERROR with ModuleNotFoundError: No module named 'music_server'

  • Step 3: Write minimal implementation

requirements.txt

fastapi==0.115.0
uvicorn==0.32.0
httpx==0.27.2

pyproject.toml

[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "music-server"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
  "fastapi==0.115.0",
  "uvicorn==0.32.0",
  "httpx==0.27.2",
]

[tool.setuptools.packages.find]
where = ["src"]

tests/__init__.py

import sys
from pathlib import Path

SRC_DIR = Path(__file__).resolve().parents[1] / "src"
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

src/music_server/settings.py

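import os
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Settings:
    # default_factory reads the environment each time Settings() is built.
    # A plain os.getenv(...) default would be evaluated once at import time,
    # so tests that patch os.environ after import would never see their values.
    access_token: str = field(default_factory=lambda: os.getenv("PUBLIC_MUSIC_ACCESS_TOKEN", "dev-token"))
    catalog_db_path: str = field(default_factory=lambda: os.getenv("CATALOG_DB_PATH", "./data/catalog_read.db"))
    player_db_path: str = field(default_factory=lambda: os.getenv("PLAYER_DB_PATH", "./data/player.db"))


def get_settings() -> Settings:
    return Settings()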

src/music_server/auth.py

from fastapi import Header, HTTPException

from .settings import get_settings


def require_bearer_token(authorization: str | None = Header(default=None)) -> None:
    expected = f"Bearer {get_settings().access_token}"
    if authorization != expected:
        raise HTTPException(status_code=401, detail="unauthorized")
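
The `!=` check above is enough for a first version. If timing side channels matter for the deployment, a constant-time comparison is a common hardening step. A minimal sketch, assuming a hypothetical helper named `token_matches` that is not part of this plan:

```python
import hmac
from typing import Optional


def token_matches(authorization: Optional[str], access_token: str) -> bool:
    """Return True when the header value is exactly 'Bearer <access_token>'."""
    if authorization is None:
        return False
    expected = f"Bearer {access_token}"
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str inputs.
    return hmac.compare_digest(authorization.encode("utf-8"), expected.encode("utf-8"))
```

`require_bearer_token` could call this helper and raise the same 401 when it returns `False`.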

src/music_server/routes/health.py

from fastapi import APIRouter

router = APIRouter()


@router.get("/healthz")
def healthz() -> dict:
    return {"status": "ok"}

src/music_server/app.py

from fastapi import FastAPI

from .routes.health import router as health_router


def create_app() -> FastAPI:
    app = FastAPI(title="Public Music Service")
    app.include_router(health_router)
    return app

  • Step 4: Run test to verify it passes

Run: python -m unittest tests.test_health -v
Expected: OK

Note: tests/__init__.py enables importing the src-layout package from repo-root unittest runs.

  • Step 5: Commit

git add pyproject.toml requirements.txt src/music_server tests/__init__.py tests/test_health.py
git commit -m "feat: scaffold public music service"

Task 2: Load the snapshot database and expose catalog reading primitives

Files:

  • Create: src/music_server/db.py

  • Create: src/music_server/services/catalog_reader.py

  • Test: tests/test_catalog_reader.py

  • Step 1: Write the failing test

import sqlite3
import tempfile
import unittest
from pathlib import Path

from music_server.services.catalog_reader import CatalogReader


class CatalogReaderTests(unittest.TestCase):
    def test_list_playlists_orders_by_play_count_desc(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "catalog_read.db"
            conn = sqlite3.connect(db_path)
            conn.executescript(
                '''
                create table catalog_playlists (
                    playlist_id integer primary key,
                    platform text not null,
                    remote_playlist_id text not null,
                    name text not null,
                    description text,
                    cover_url text,
                    play_count integer not null,
                    song_count integer not null
                );
                insert into catalog_playlists values (1, 'netease', '18165', 'A', 'desc', 'https://img/a.jpg', 10, 3);
                insert into catalog_playlists values (2, 'qq', '75', 'B', 'desc', 'https://img/b.jpg', 100, 2);
                '''
            )
            conn.commit()
            conn.close()

            reader = CatalogReader(db_path=str(db_path))
            rows = reader.list_playlists(page=1, page_size=10)

            self.assertEqual([2, 1], [row["playlist_id"] for row in rows])


if __name__ == "__main__":
    unittest.main()

  • Step 2: Run test to verify it fails

Run: python -m unittest tests.test_catalog_reader -v
Expected: ERROR because CatalogReader does not exist yet

  • Step 3: Write minimal implementation

src/music_server/db.py

import sqlite3


def connect_sqlite(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn
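
The architecture treats catalog_read.db as read-only, but `connect_sqlite` opens files read-write. One way to enforce the intent at the driver level is SQLite's `mode=ro` URI flag. A sketch, assuming a hypothetical second helper named `connect_sqlite_readonly`:

```python
import sqlite3
from pathlib import Path


def connect_sqlite_readonly(db_path: str) -> sqlite3.Connection:
    """Open an existing SQLite file so that any write attempt fails."""
    # as_uri() produces a percent-encoded file: URI; mode=ro asks SQLite for a read-only open.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn
```

With this in place, the catalog services would use the read-only helper while the player service keeps `connect_sqlite` for player.db.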

src/music_server/services/catalog_reader.py

from ..db import connect_sqlite


class CatalogReader:
    def __init__(self, db_path: str) -> None:
        self._db_path = db_path

    def list_playlists(self, page: int, page_size: int) -> list[dict]:
        offset = max(0, (page - 1) * page_size)
        conn = connect_sqlite(self._db_path)
        rows = conn.execute(
            """
            select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
            from catalog_playlists
            where song_count > 0
            order by play_count desc, playlist_id asc
            limit ? offset ?
            """,
            (page_size, offset),
        ).fetchall()
        conn.close()
        return [dict(row) for row in rows]

    def get_playlist(self, playlist_id: int) -> dict | None:
        conn = connect_sqlite(self._db_path)
        row = conn.execute(
            """
            select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
            from catalog_playlists
            where playlist_id = ?
            """,
            (playlist_id,),
        ).fetchone()
        conn.close()
        return dict(row) if row else None
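
Both methods call `conn.close()` only on the success path, so a failing query leaves the connection to be cleaned up by garbage collection. A hedged sketch of the same read pattern with `contextlib.closing`, using a hypothetical `fetch_all` helper rather than anything the plan defines:

```python
import sqlite3
from contextlib import closing


def fetch_all(db_path: str, sql: str, params: tuple = ()) -> list[dict]:
    """Run one read query and close the connection even if the query raises."""
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        return [dict(row) for row in conn.execute(sql, params).fetchall()]
```

`list_playlists` and `get_playlist` could be written on top of such a helper without changing their return values.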
  • Step 4: Run test to verify it passes

Run: python -m unittest tests.test_catalog_reader -v
Expected: OK

  • Step 5: Commit

git add src/music_server/db.py src/music_server/services/catalog_reader.py tests/test_catalog_reader.py
git commit -m "feat: load exported catalog snapshot"

Task 3: Implement MusicFree catalog APIs for tags, playlists, and toplists

Files:

  • Create: src/music_server/routes/mf_catalog.py

  • Modify: src/music_server/app.py

  • Test: tests/test_mf_catalog_routes.py

  • Step 1: Write the failing test

import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from fastapi.testclient import TestClient

from music_server.app import create_app


class MfCatalogRouteTests(unittest.TestCase):
    def test_recommend_sheets_returns_musicfree_shape(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "catalog_read.db"
            conn = sqlite3.connect(db_path)
            conn.executescript(
                '''
                create table catalog_playlists (
                    playlist_id integer primary key,
                    platform text not null,
                    remote_playlist_id text not null,
                    name text not null,
                    description text,
                    cover_url text,
                    play_count integer not null,
                    song_count integer not null
                );
                insert into catalog_playlists values (1, 'netease', '18165', '测试歌单', 'desc', 'https://img/1.jpg', 99, 5);
                '''
            )
            conn.commit()
            conn.close()

            with patch.dict("os.environ", {"CATALOG_DB_PATH": str(db_path)}):
                client = TestClient(create_app())
                response = client.get(
                    "/mf/v1/recommend/sheets?page=1&page_size=20",
                    headers={"Authorization": "Bearer dev-token"},
                )

            self.assertEqual(200, response.status_code)
            payload = response.json()
            # One row with page_size=20 is a short page, so the route reports the end.
            self.assertTrue(payload["isEnd"])
            self.assertEqual("catalogsync:playlist:1", payload["data"][0]["id"])
            self.assertEqual("测试歌单", payload["data"][0]["title"])


if __name__ == "__main__":
    unittest.main()

  • Step 2: Run test to verify it fails

Run: python -m unittest tests.test_mf_catalog_routes -v
Expected: FAIL with 404 Not Found for /mf/v1/recommend/sheets

  • Step 3: Write minimal implementation

src/music_server/routes/mf_catalog.py

from fastapi import APIRouter, Depends, Query

from ..auth import require_bearer_token
from ..services.catalog_reader import CatalogReader
from ..settings import get_settings

router = APIRouter(prefix="/mf/v1", dependencies=[Depends(require_bearer_token)])


def _reader() -> CatalogReader:
    return CatalogReader(db_path=get_settings().catalog_db_path)


def _to_sheet_item(row: dict) -> dict:
    return {
        "id": f"catalogsync:playlist:{row['playlist_id']}",
        "platform": "catalogsync",
        "title": row["name"],
        "coverImg": row["cover_url"] or "",
        "description": row["description"] or "",
        "worksNum": row["song_count"],
        "playCount": row["play_count"],
    }


@router.get("/recommend/tags")
def recommend_tags() -> dict:
    return {
        "pinned": [
            {"id": "all", "title": "全部"},  # "All"
            {"id": "netease", "title": "网易云"},  # NetEase Cloud Music
            {"id": "qq", "title": "QQ音乐"},  # QQ Music
            {"id": "kuwo", "title": "酷我"},  # Kuwo
        ],
        "data": [
            {
                "title": "来源",  # "Source"
                "data": [
                    {"id": "playlist_square", "title": "歌单广场"},  # "Playlist square"
                    {"id": "toplist", "title": "排行榜"},  # "Toplists"
                ],
            }
        ],
    }


@router.get("/recommend/sheets")
def recommend_sheets(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=60, ge=1, le=200),
) -> dict:
    rows = _reader().list_playlists(page=page, page_size=page_size)
    return {
        "isEnd": len(rows) < page_size,
        "data": [_to_sheet_item(row) for row in rows],
    }
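
Note that `len(rows) < page_size` leaves `isEnd` false when the catalog holds an exact multiple of `page_size`, so the client makes one more request and receives an empty page. A common alternative is to fetch one row beyond the page and trim it. The sketch below assumes a hypothetical `fetch_page` helper and a throwaway in-memory table with made-up rows; it omits the `song_count > 0` filter for brevity:

```python
import sqlite3


def fetch_page(conn: sqlite3.Connection, page: int, page_size: int) -> tuple[list[int], bool]:
    """Return (playlist_ids, is_end) for one page, ordered by play_count descending."""
    offset = max(0, (page - 1) * page_size)
    rows = conn.execute(
        "select playlist_id from catalog_playlists "
        "order by play_count desc, playlist_id asc limit ? offset ?",
        (page_size + 1, offset),  # the extra row only signals that another page exists
    ).fetchall()
    is_end = len(rows) <= page_size
    return [row[0] for row in rows[:page_size]], is_end


conn = sqlite3.connect(":memory:")
conn.execute("create table catalog_playlists (playlist_id integer primary key, play_count integer not null)")
conn.executemany("insert into catalog_playlists values (?, ?)", [(1, 10), (2, 100), (3, 50), (4, 5)])
first_page = fetch_page(conn, page=1, page_size=2)   # ([2, 3], False)
second_page = fetch_page(conn, page=2, page_size=2)  # ([1, 4], True)
conn.close()
```

Here the second page is full, yet it is correctly marked as the last one.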

src/music_server/app.py

from fastapi import FastAPI

from .routes.health import router as health_router
from .routes.mf_catalog import router as mf_catalog_router


def create_app() -> FastAPI:
    app = FastAPI(title="Public Music Service")
    app.include_router(health_router)
    app.include_router(mf_catalog_router)
    return app

  • Step 4: Run test to verify it passes

Run: python -m unittest tests.test_mf_catalog_routes -v
Expected: OK

  • Step 5: Commit

git add src/music_server/routes/mf_catalog.py src/music_server/app.py tests/test_mf_catalog_routes.py
git commit -m "feat: add musicfree catalog endpoints"

Task 4: Implement cover and media resolve endpoints with signed stream URLs

Files:

  • Create: src/music_server/routes/covers.py

  • Create: src/music_server/routes/mf_media.py

  • Create: src/music_server/services/cover_service.py

  • Create: src/music_server/services/media_resolver.py

  • Create: src/music_server/services/stream_tokens.py

  • Modify: src/music_server/app.py

  • Test: tests/test_mf_media_routes.py

  • Step 1: Write the failing test

import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from fastapi.testclient import TestClient

from music_server.app import create_app


class MfMediaRouteTests(unittest.TestCase):
    def test_media_resolve_prefers_public_url_and_returns_signed_stream(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "catalog_read.db"
            conn = sqlite3.connect(db_path)
            conn.executescript(
                '''
                create table catalog_track_files (
                    song_id integer not null,
                    quality_label text,
                    ext text,
                    file_size_bytes integer,
                    backend_type text,
                    backend_name text,
                    locator text,
                    public_url text,
                    status text,
                    is_primary integer
                );
                insert into catalog_track_files values (
                    3476, 'super', 'flac', 42345678, 'object_storage', 'main-s3',
                    'music/netease/test.flac', 'https://cdn.example/test.flac', 'active', 1
                );
                '''
            )
            conn.commit()
            conn.close()

            with patch.dict(
                "os.environ",
                {
                    "CATALOG_DB_PATH": str(db_path),
                    "PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
                },
            ):
                client = TestClient(create_app())
                response = client.post(
                    "/mf/v1/media/resolve",
                    headers={"Authorization": "Bearer dev-token"},
                    json={"song_id": "catalogsync:song:3476", "quality": "super"},
                )

            self.assertEqual(200, response.status_code)
            payload = response.json()
            self.assertEqual("object_storage", payload["selected_source"]["kind"])
            self.assertIn("/mf/v1/media/stream/", payload["stream"]["url"])


if __name__ == "__main__":
    unittest.main()

  • Step 2: Run test to verify it fails

Run: python -m unittest tests.test_mf_media_routes -v
Expected: FAIL with 404 Not Found for /mf/v1/media/resolve

  • Step 3: Write minimal implementation

src/music_server/services/stream_tokens.py

import base64
import hashlib
import hmac
import json
import time


def create_stream_token(secret: str, song_id: int, locator: str, ttl_seconds: int = 300) -> str:
    payload = {
        "song_id": int(song_id),
        "locator": locator,
        "expires_at": int(time.time()) + int(ttl_seconds),
    }
    payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
    signature = hmac.new(secret.encode("utf-8"), payload_json.encode("utf-8"), hashlib.sha256).hexdigest()
    envelope = {"payload": payload, "sig": signature}
    return base64.urlsafe_b64encode(json.dumps(envelope, separators=(",", ":"), ensure_ascii=False).encode("utf-8")).decode("ascii")
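
This task only issues tokens. A stream endpoint would also need to check them. The sketch below shows one possible counterpart, assuming a hypothetical `verify_stream_token` function. `create_stream_token` is repeated from above only so the example runs on its own:

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _sign(secret: str, payload: dict) -> str:
    payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
    return hmac.new(secret.encode("utf-8"), payload_json.encode("utf-8"), hashlib.sha256).hexdigest()


def create_stream_token(secret: str, song_id: int, locator: str, ttl_seconds: int = 300) -> str:
    payload = {
        "song_id": int(song_id),
        "locator": locator,
        "expires_at": int(time.time()) + int(ttl_seconds),
    }
    envelope = {"payload": payload, "sig": _sign(secret, payload)}
    raw = json.dumps(envelope, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def verify_stream_token(secret: str, token: str, now: Optional[int] = None) -> dict:
    """Return the payload if the signature matches and the token is unexpired.

    Hypothetical counterpart to create_stream_token; raises ValueError otherwise.
    """
    try:
        envelope = json.loads(base64.urlsafe_b64decode(token.encode("ascii")))
        payload = envelope["payload"]
        signature = str(envelope["sig"]).encode("utf-8")
        expires_at = int(payload["expires_at"])
    except (ValueError, KeyError, TypeError) as exc:
        raise ValueError("malformed stream token") from exc
    # json.loads keeps key order, so re-serializing the payload reproduces the signed bytes.
    expected = _sign(secret, payload).encode("utf-8")
    if not hmac.compare_digest(expected, signature):
        raise ValueError("bad stream token signature")
    current = int(time.time()) if now is None else now
    if expires_at < current:
        raise ValueError("stream token expired")
    return payload
```

A stream route could call `verify_stream_token` with the same secret and translate `ValueError` into a 401 or 403 response.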

src/music_server/services/media_resolver.py

from ..db import connect_sqlite


class MediaResolver:
    def __init__(self, db_path: str) -> None:
        self._db_path = db_path

    def resolve(self, song_id: int, quality: str) -> dict:
        conn = connect_sqlite(self._db_path)
        row = conn.execute(
            """
            select song_id, quality_label, ext, file_size_bytes, backend_type, backend_name, locator, public_url
            from catalog_track_files
            where song_id = ? and status = 'active'
            order by case when quality_label = ? then 0 else 1 end, is_primary desc
            limit 1
            """,
            (song_id, quality),
        ).fetchone()
        conn.close()
        if row is None:
            raise LookupError("no playable source found")
        return dict(row)
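
The `order by` clause ranks an exact quality match first and otherwise falls back to the primary file. The following sketch replays that query against a throwaway in-memory table to show both paths. The rows and the `pick` helper are made up for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(
    """
    create table catalog_track_files (
        song_id integer not null, quality_label text, locator text,
        status text, is_primary integer
    );
    insert into catalog_track_files values (1, 'standard', 'a.mp3', 'active', 1);
    insert into catalog_track_files values (1, 'super', 'a.flac', 'active', 0);
    insert into catalog_track_files values (1, 'high', 'a-old.mp3', 'retired', 0);
    """
)

QUERY = """
    select locator from catalog_track_files
    where song_id = ? and status = 'active'
    order by case when quality_label = ? then 0 else 1 end, is_primary desc
    limit 1
"""


def pick(quality: str) -> str:
    """Return the locator the resolver query would choose for song 1."""
    return conn.execute(QUERY, (1, quality)).fetchone()["locator"]


exact_match = pick("super")  # an active 'super' file exists, so it wins
fallback = pick("high")      # the only 'high' file is retired, so the primary file wins
conn.close()
```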

src/music_server/routes/mf_media.py

from fastapi import APIRouter, Depends, HTTPException

from ..auth import require_bearer_token
from ..services.media_resolver import MediaResolver
from ..services.stream_tokens import create_stream_token
from ..settings import get_settings

router = APIRouter(prefix="/mf/v1", dependencies=[Depends(require_bearer_token)])


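def _song_id_from_public_id(public_id: str) -> int:
    # Public ids look like "catalogsync:song:3476"; the numeric tail is the catalog song_id.
    return int(str(public_id).split(":")[-1])


@router.post("/media/resolve")
def media_resolve(payload: dict) -> dict:
    settings = get_settings()
    try:
        song_id = _song_id_from_public_id(payload["song_id"])
    except (KeyError, ValueError) as exc:
        # A missing or non-numeric id is a client error, not a server fault.
        raise HTTPException(status_code=422, detail="invalid song_id") from exc
    quality = payload.get("quality", "standard")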
    try:
        resolved = MediaResolver(db_path=settings.catalog_db_path).resolve(song_id=song_id, quality=quality)
    except LookupError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc

    token = create_stream_token(
        secret=settings.access_token,
        song_id=song_id,
        locator=resolved["locator"],
    )
    return {
        "song_id": payload["song_id"],
        "selected_source": {
            "kind": resolved["backend_type"],
            "backend": resolved["backend_name"],
            "quality": resolved["quality_label"] or quality,
            "ext": resolved["ext"],
            "size_bytes": resolved["file_size_bytes"],
        },
        "stream": {
            "url": f"/mf/v1/media/stream/{token}",
            "headers": {},
            "range_supported": True,
        },
    }

src/music_server/services/cover_service.py

from fastapi import HTTPException

from ..db import connect_sqlite


class CoverService:
    def __init__(self, db_path: str) -> None:
        self._db_path = db_path

    def playlist_cover_url(self, playlist_id: int) -> str:
        conn = connect_sqlite(self._db_path)
        row = conn.execute(
            "select cover_url from catalog_playlists where playlist_id = ?",
            (playlist_id,),
        ).fetchone()
        conn.close()
        if row is None or not row["cover_url"]:
            raise HTTPException(status_code=404, detail="playlist cover not found")
        return str(row["cover_url"])

    def song_cover_url(self, song_id: int) -> str:
        conn = connect_sqlite(self._db_path)
        row = conn.execute(
            "select cover_url from catalog_tracks where song_id = ?",
            (song_id,),
        ).fetchone()
        conn.close()
        if row is None or not row["cover_url"]:
            raise HTTPException(status_code=404, detail="song cover not found")
        return str(row["cover_url"])

src/music_server/routes/covers.py

from fastapi import APIRouter
from fastapi.responses import RedirectResponse

from ..services.cover_service import CoverService
from ..settings import get_settings

router = APIRouter(prefix="/mf/v1/covers")


def _cover_service() -> CoverService:
    return CoverService(db_path=get_settings().catalog_db_path)


@router.get("/playlists/{playlist_id}")
def playlist_cover(playlist_id: int) -> RedirectResponse:
    return RedirectResponse(_cover_service().playlist_cover_url(playlist_id=playlist_id), status_code=307)


@router.get("/songs/{song_id}")
def song_cover(song_id: int) -> RedirectResponse:
    return RedirectResponse(_cover_service().song_cover_url(song_id=song_id), status_code=307)

src/music_server/app.py

from fastapi import FastAPI

from .routes.covers import router as covers_router
from .routes.health import router as health_router
from .routes.mf_catalog import router as mf_catalog_router
from .routes.mf_media import router as mf_media_router


def create_app() -> FastAPI:
    app = FastAPI(title="Public Music Service")
    app.include_router(health_router)
    app.include_router(mf_catalog_router)
    app.include_router(mf_media_router)
    app.include_router(covers_router)
    return app

  • Step 4: Run test to verify it passes

Run: python -m unittest tests.test_mf_media_routes -v
Expected: OK

  • Step 5: Commit

git add src/music_server/routes/covers.py src/music_server/routes/mf_media.py src/music_server/services/cover_service.py src/music_server/services/media_resolver.py src/music_server/services/stream_tokens.py src/music_server/app.py tests/test_mf_media_routes.py
git commit -m "feat: add media resolve and cover endpoints"

Task 5: Add player persistence and /player/v1/* endpoints

Files:

  • Create: src/music_server/services/player_service.py

  • Create: src/music_server/routes/player.py

  • Modify: src/music_server/app.py

  • Test: tests/test_player_routes.py

  • Step 1: Write the failing test

import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from fastapi.testclient import TestClient

from music_server.app import create_app


class PlayerRouteTests(unittest.TestCase):
    def test_put_and_get_favorite_tracks(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            player_db = Path(tmpdir) / "player.db"
            conn = sqlite3.connect(player_db)
            conn.executescript(
                '''
                create table favorite_tracks (
                    track_id integer primary key,
                    added_at text not null
                );
                '''
            )
            conn.commit()
            conn.close()

            with patch.dict(
                "os.environ",
                {
                    "PLAYER_DB_PATH": str(player_db),
                    "PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
                },
            ):
                client = TestClient(create_app())
                put_response = client.put(
                    "/player/v1/me/favorites/tracks/3476",
                    headers={"Authorization": "Bearer dev-token"},
                )
                get_response = client.get(
                    "/player/v1/me/favorites/tracks",
                    headers={"Authorization": "Bearer dev-token"},
                )

            self.assertEqual(204, put_response.status_code)
            self.assertEqual([{"track_id": 3476}], get_response.json()["items"])


if __name__ == "__main__":
    unittest.main()

  • Step 2: Run test to verify it fails

Run: python -m unittest tests.test_player_routes -v
Expected: FAIL with 404 Not Found for /player/v1/me/favorites/tracks/3476

  • Step 3: Write minimal implementation

src/music_server/services/player_service.py

from datetime import datetime, timezone

from ..db import connect_sqlite


class PlayerService:
    def __init__(self, db_path: str) -> None:
        self._db_path = db_path

    def add_favorite_track(self, track_id: int) -> None:
        conn = connect_sqlite(self._db_path)
        conn.execute(
            """
            insert into favorite_tracks (track_id, added_at)
            values (?, ?)
            on conflict(track_id) do update set added_at = excluded.added_at
            """,
            (track_id, datetime.now(timezone.utc).isoformat()),
        )
        conn.commit()
        conn.close()

    def list_favorite_tracks(self) -> list[dict]:
        conn = connect_sqlite(self._db_path)
        rows = conn.execute(
            """
            select track_id
            from favorite_tracks
            order by added_at desc
            """
        ).fetchall()
        conn.close()
        return [dict(row) for row in rows]
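
The `on conflict ... do update` clause is what makes the PUT idempotent: favoriting the same track twice keeps one row and refreshes its timestamp. A small sketch against an in-memory table with made-up timestamps, assuming SQLite 3.24 or newer, where upsert syntax is available:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table favorite_tracks (track_id integer primary key, added_at text not null)")

UPSERT = """
    insert into favorite_tracks (track_id, added_at)
    values (?, ?)
    on conflict(track_id) do update set added_at = excluded.added_at
"""
conn.execute(UPSERT, (3476, "2026-04-19T10:00:00+00:00"))
conn.execute(UPSERT, (3476, "2026-04-19T11:00:00+00:00"))  # same track, later timestamp
rows = conn.execute("select track_id, added_at from favorite_tracks").fetchall()
conn.close()
```

After both statements, `rows` holds a single entry for track 3476 carrying the later timestamp.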

src/music_server/routes/player.py

from fastapi import APIRouter, Depends, Response

from ..auth import require_bearer_token
from ..services.player_service import PlayerService
from ..settings import get_settings

router = APIRouter(prefix="/player/v1", dependencies=[Depends(require_bearer_token)])


def _player_service() -> PlayerService:
    return PlayerService(db_path=get_settings().player_db_path)


@router.put("/me/favorites/tracks/{track_id}", status_code=204)
def add_favorite_track(track_id: int) -> Response:
    _player_service().add_favorite_track(track_id=track_id)
    return Response(status_code=204)


@router.get("/me/favorites/tracks")
def list_favorite_tracks() -> dict:
    return {"items": _player_service().list_favorite_tracks()}

src/music_server/app.py

from fastapi import FastAPI

from .routes.covers import router as covers_router
from .routes.health import router as health_router
from .routes.mf_catalog import router as mf_catalog_router
from .routes.mf_media import router as mf_media_router
from .routes.player import router as player_router


def create_app() -> FastAPI:
    app = FastAPI(title="Public Music Service")
    app.include_router(health_router)
    app.include_router(mf_catalog_router)
    app.include_router(mf_media_router)
    app.include_router(covers_router)
    app.include_router(player_router)
    return app

  • Step 4: Run test to verify it passes

Run: python -m unittest tests.test_player_routes -v
Expected: OK

  • Step 5: Commit

git add src/music_server/services/player_service.py src/music_server/routes/player.py src/music_server/app.py tests/test_player_routes.py
git commit -m "feat: add player favorite endpoints"

Task 6: Complete playlist detail, toplist detail, home, and history endpoints

Files:

  • Modify: src/music_server/services/catalog_reader.py

  • Modify: src/music_server/routes/mf_catalog.py

  • Modify: src/music_server/services/player_service.py

  • Modify: src/music_server/routes/player.py

  • Test: tests/test_mf_detail_routes.py

  • Test: tests/test_player_history_routes.py

  • Step 1: Write the failing tests

tests/test_mf_detail_routes.py

import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from fastapi.testclient import TestClient

from music_server.app import create_app


class MfDetailRouteTests(unittest.TestCase):
    def test_playlist_tracks_and_toplists_return_expected_shapes(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "catalog_read.db"
            conn = sqlite3.connect(db_path)
            conn.executescript(
                '''
                create table catalog_playlists (
                    playlist_id integer primary key,
                    platform text not null,
                    remote_playlist_id text not null,
                    name text not null,
                    description text,
                    cover_url text,
                    play_count integer not null,
                    song_count integer not null
                );
                create table catalog_tracks (
                    song_id integer primary key,
                    platform text not null,
                    remote_song_id text not null,
                    name text not null,
                    singers text,
                    album text,
                    cover_url text,
                    duration_ms integer,
                    metadata_json text
                );
                create table catalog_playlist_tracks (
                    playlist_id integer not null,
                    song_id integer not null,
                    position integer not null
                );
                create table catalog_toplists (
                    toplist_id text primary key,
                    platform text not null,
                    name text not null,
                    description text,
                    cover_url text,
                    play_count integer not null,
                    song_count integer not null,
                    group_name text not null
                );
                create table catalog_toplist_tracks (
                    toplist_id text not null,
                    song_id integer not null,
                    position integer not null
                );
                insert into catalog_playlists values (1, 'netease', '18165', '测试歌单', 'desc', 'https://img/p.jpg', 99, 1);
                insert into catalog_tracks values (3476, 'netease', '65800', '海屿你', '马也 / Crabbit', '', 'https://img/s.jpg', 0, '{}');
                insert into catalog_playlist_tracks values (1, 3476, 1);
                insert into catalog_toplists values ('kuwo_top_16', 'kuwo', '酷我飙升榜', 'desc', 'https://img/t.jpg', 1000, 1, '酷我');
                insert into catalog_toplist_tracks values ('kuwo_top_16', 3476, 1);
                '''
            )
            conn.commit()
            conn.close()

            with patch.dict("os.environ", {"CATALOG_DB_PATH": str(db_path)}):
                client = TestClient(create_app())
                playlist_response = client.get(
                    "/mf/v1/playlists/1/tracks?page=1&page_size=20",
                    headers={"Authorization": "Bearer dev-token"},
                )
                toplist_response = client.get(
                    "/mf/v1/toplists",
                    headers={"Authorization": "Bearer dev-token"},
                )

            self.assertEqual(200, playlist_response.status_code)
            self.assertEqual("catalogsync:song:3476", playlist_response.json()["musicList"][0]["id"])
            self.assertEqual(200, toplist_response.status_code)
            self.assertEqual("酷我", toplist_response.json()[0]["title"])


if __name__ == "__main__":
    unittest.main()

tests/test_player_history_routes.py

import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from fastapi.testclient import TestClient

from music_server.app import create_app


class PlayerHistoryRouteTests(unittest.TestCase):
    def test_home_and_history_routes_return_items(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            player_db = Path(tmpdir) / "player.db"
            catalog_db = Path(tmpdir) / "catalog_read.db"

            player_conn = sqlite3.connect(player_db)
            player_conn.executescript(
                '''
                create table play_history (
                    id integer primary key autoincrement,
                    track_id integer not null,
                    played_at text not null,
                    progress_seconds integer not null
                );
                create table favorite_playlists (
                    playlist_id integer primary key,
                    added_at text not null
                );
                '''
            )
            player_conn.commit()
            player_conn.close()

            catalog_conn = sqlite3.connect(catalog_db)
            catalog_conn.executescript(
                '''
                create table catalog_playlists (
                    playlist_id integer primary key,
                    platform text not null,
                    remote_playlist_id text not null,
                    name text not null,
                    description text,
                    cover_url text,
                    play_count integer not null,
                    song_count integer not null
                );
                insert into catalog_playlists values (1, 'netease', '18165', '测试歌单', 'desc', 'https://img/p.jpg', 99, 1);
                '''
            )
            catalog_conn.commit()
            catalog_conn.close()

            with patch.dict(
                "os.environ",
                {
                    "PLAYER_DB_PATH": str(player_db),
                    "CATALOG_DB_PATH": str(catalog_db),
                    "PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
                },
            ):
                client = TestClient(create_app())
                post_response = client.post(
                    "/player/v1/me/history",
                    headers={"Authorization": "Bearer dev-token"},
                    json={"track_id": 3476, "progress_seconds": 12},
                )
                home_response = client.get(
                    "/player/v1/home",
                    headers={"Authorization": "Bearer dev-token"},
                )
                history_response = client.get(
                    "/player/v1/me/history",
                    headers={"Authorization": "Bearer dev-token"},
                )

            self.assertEqual(201, post_response.status_code)
            self.assertEqual(200, home_response.status_code)
            self.assertEqual(3476, history_response.json()["items"][0]["track_id"])


if __name__ == "__main__":
    unittest.main()

  • Step 2: Run tests to verify they fail

Run: python -m unittest tests.test_mf_detail_routes tests.test_player_history_routes -v
Expected: FAIL with 404 Not Found for /mf/v1/playlists/1/tracks and /player/v1/home

  • Step 3: Write minimal implementation

src/music_server/services/catalog_reader.py

from contextlib import closing

from ..db import connect_sqlite


class CatalogReader:
    def __init__(self, db_path: str) -> None:
        self._db_path = db_path

    def list_playlists(self, page: int, page_size: int) -> list[dict]:
        offset = max(0, (page - 1) * page_size)
        # closing() releases the connection even if the query raises.
        with closing(connect_sqlite(self._db_path)) as conn:
            rows = conn.execute(
                """
                select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
                from catalog_playlists
                where song_count > 0
                order by play_count desc, playlist_id asc
                limit ? offset ?
                """,
                (page_size, offset),
            ).fetchall()
        return [dict(row) for row in rows]

    def get_playlist(self, playlist_id: int) -> dict | None:
        with closing(connect_sqlite(self._db_path)) as conn:
            row = conn.execute(
                """
                select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
                from catalog_playlists
                where playlist_id = ?
                """,
                (playlist_id,),
            ).fetchone()
        # None signals an unknown playlist id to the caller.
        return dict(row) if row else None

    def list_playlist_tracks(self, playlist_id: int, page: int, page_size: int) -> list[dict]:
        offset = max(0, (page - 1) * page_size)
        with closing(connect_sqlite(self._db_path)) as conn:
            rows = conn.execute(
                """
                select t.song_id, t.name, t.singers, t.album, t.cover_url, t.duration_ms
                from catalog_playlist_tracks pt
                join catalog_tracks t on t.song_id = pt.song_id
                where pt.playlist_id = ?
                order by pt.position asc
                limit ? offset ?
                """,
                (playlist_id, page_size, offset),
            ).fetchall()
        return [dict(row) for row in rows]

    def list_toplists(self) -> list[dict]:
        with closing(connect_sqlite(self._db_path)) as conn:
            rows = conn.execute(
                """
                select toplist_id, platform, name, description, cover_url, play_count, song_count, group_name
                from catalog_toplists
                order by group_name asc, play_count desc, name asc
                """
            ).fetchall()
        # Rows arrive sorted by group_name, so dict insertion order keeps the groups sorted too.
        grouped: dict[str, list[dict]] = {}
        for row in rows:
            grouped.setdefault(row["group_name"], []).append(dict(row))
        return [{"title": group_name, "data": items} for group_name, items in grouped.items()]
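
The grouping step in list_toplists depends on two things: the rows arrive already sorted by group_name, and a Python dict preserves insertion order. The sketch below reproduces that step against an in-memory SQLite table. The trimmed schema, the sample rows, and the helper names group_toplists and build_demo_db are illustrative assumptions, not part of the plan.

```python
import sqlite3


def group_toplists(conn: sqlite3.Connection) -> list[dict]:
    """Group toplist rows by group_name, preserving the SQL ordering."""
    rows = conn.execute(
        """
        select toplist_id, name, play_count, group_name
        from catalog_toplists
        order by group_name asc, play_count desc, name asc
        """
    ).fetchall()
    grouped: dict[str, list[dict]] = {}
    for row in rows:
        grouped.setdefault(row["group_name"], []).append(dict(row))
    return [{"title": title, "data": items} for title, items in grouped.items()]


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory table with made-up rows (assumed, trimmed schema)."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # lets rows be read by column name
    conn.executescript(
        """
        create table catalog_toplists (
            toplist_id integer primary key,
            name text not null,
            play_count integer not null,
            group_name text not null
        );
        insert into catalog_toplists values (1, 'Hot Songs', 50, 'netease');
        insert into catalog_toplists values (2, 'New Songs', 90, 'kuwo');
        insert into catalog_toplists values (3, 'Rising', 70, 'netease');
        """
    )
    return conn


demo_groups = group_toplists(build_demo_db())
print([(g["title"], [item["name"] for item in g["data"]]) for g in demo_groups])
# → [('kuwo', ['New Songs']), ('netease', ['Rising', 'Hot Songs'])]
```

Within each group the rows keep the play_count ordering from the query, so no second sort is needed in Python.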

src/music_server/routes/mf_catalog.py

from fastapi import APIRouter, Depends, HTTPException, Query

from ..auth import require_bearer_token
from ..services.catalog_reader import CatalogReader
from ..settings import get_settings

router = APIRouter(prefix="/mf/v1", dependencies=[Depends(require_bearer_token)])


def _reader() -> CatalogReader:
    return CatalogReader(db_path=get_settings().catalog_db_path)


def _to_sheet_item(row: dict) -> dict:
    # Playlists and toplists share one sheet shape; the id prefix records which table the row came from.
    item_id = row.get("playlist_id", row.get("toplist_id"))
    item_prefix = "playlist" if "playlist_id" in row else "toplist"
    return {
        "id": f"catalogsync:{item_prefix}:{item_id}",
        "platform": "catalogsync",
        "title": row["name"],
        "coverImg": row.get("cover_url") or "",
        "description": row.get("description") or "",
        "worksNum": row.get("song_count", 0),
        "playCount": row.get("play_count", 0),
    }


def _to_music_item(row: dict) -> dict:
    return {
        "id": f"catalogsync:song:{row['song_id']}",
        "platform": "catalogsync",
        "title": row["name"],
        "artist": row.get("singers") or "",
        "album": row.get("album") or "",
        "artwork": row.get("cover_url") or "",
        # duration_ms is in milliseconds; convert to whole seconds.
        "duration": int((row.get("duration_ms") or 0) / 1000),
    }


@router.get("/recommend/tags")
def recommend_tags() -> dict:
    # Titles are user-facing Chinese labels: 全部 = All, 网易云 = NetEase Cloud Music, QQ音乐 = QQ Music,
    # 酷我 = Kuwo, 来源 = Source, 歌单广场 = Playlist Square, 排行榜 = Toplists.
    return {
        "pinned": [
            {"id": "all", "title": "全部"},
            {"id": "netease", "title": "网易云"},
            {"id": "qq", "title": "QQ音乐"},
            {"id": "kuwo", "title": "酷我"},
        ],
        "data": [{"title": "来源", "data": [{"id": "playlist_square", "title": "歌单广场"}, {"id": "toplist", "title": "排行榜"}]}],
    }


@router.get("/recommend/sheets")
def recommend_sheets(page: int = Query(default=1, ge=1), page_size: int = Query(default=60, ge=1, le=200)) -> dict:
    rows = _reader().list_playlists(page=page, page_size=page_size)
    # A short page means there is nothing left to fetch.
    return {"isEnd": len(rows) < page_size, "data": [_to_sheet_item(row) for row in rows]}


@router.get("/playlists/{playlist_id}")
def playlist_detail(playlist_id: int) -> dict:
    row = _reader().get_playlist(playlist_id=playlist_id)
    if row is None:
        # get_playlist returns None for unknown ids; answer 404 instead of failing with a 500.
        raise HTTPException(status_code=404, detail="playlist not found")
    return _to_sheet_item(row)


@router.get("/playlists/{playlist_id}/tracks")
def playlist_tracks(playlist_id: int, page: int = Query(default=1, ge=1), page_size: int = Query(default=100, ge=1, le=200)) -> dict:
    rows = _reader().list_playlist_tracks(playlist_id=playlist_id, page=page, page_size=page_size)
    return {"isEnd": len(rows) < page_size, "musicList": [_to_music_item(row) for row in rows]}


@router.get("/toplists")
def toplists() -> list[dict]:
    groups = _reader().list_toplists()
    normalized = []
    for group in groups:
        normalized.append({"title": group["title"], "data": [_to_sheet_item(row) for row in group["data"]]})
    return normalized
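
The paged routes above report isEnd as len(rows) < page_size. One consequence worth keeping in mind: when the total row count is an exact multiple of page_size, the last full page still reports isEnd as false, and the client learns it is done only from one further, empty page. The sketch below imitates that behaviour with an in-memory list. fetch_page and fetch_all are hypothetical stand-ins for the HTTP route and a client loop, not code from the plan.

```python
def fetch_page(items: list[int], page: int, page_size: int) -> dict:
    """Hypothetical stand-in for GET /mf/v1/playlists/{id}/tracks."""
    offset = max(0, (page - 1) * page_size)
    rows = items[offset : offset + page_size]
    return {"isEnd": len(rows) < page_size, "musicList": rows}


def fetch_all(items: list[int], page_size: int) -> tuple[list[int], int]:
    """Page until isEnd is true; return the collected rows and the request count."""
    collected: list[int] = []
    page = 1
    while True:
        body = fetch_page(items, page, page_size)
        collected.extend(body["musicList"])
        if body["isEnd"]:
            return collected, page
        page += 1


# Five tracks: the third page is short, so paging stops after 3 requests.
tracks_5, requests_5 = fetch_all(list(range(5)), page_size=2)
# Four tracks: both pages are full, so a third, empty page is needed to see isEnd.
tracks_4, requests_4 = fetch_all(list(range(4)), page_size=2)
print(requests_5, requests_4)
# → 3 3
```

If the extra request matters, one alternative is to query page_size + 1 rows and return only the first page_size, which is a design change the plan does not make.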

src/music_server/services/player_service.py

from contextlib import closing
from datetime import datetime, timezone

from ..db import connect_sqlite


class PlayerService:
    def __init__(self, db_path: str) -> None:
        self._db_path = db_path

    def add_favorite_track(self, track_id: int) -> None:
        # closing() releases the connection even if the statement raises.
        with closing(connect_sqlite(self._db_path)) as conn:
            # Upsert: favoriting the same track again only refreshes added_at.
            conn.execute(
                """
                insert into favorite_tracks (track_id, added_at)
                values (?, ?)
                on conflict(track_id) do update set added_at = excluded.added_at
                """,
                (track_id, datetime.now(timezone.utc).isoformat()),
            )
            conn.commit()

    def list_favorite_tracks(self) -> list[dict]:
        with closing(connect_sqlite(self._db_path)) as conn:
            rows = conn.execute("select track_id from favorite_tracks order by added_at desc").fetchall()
        return [dict(row) for row in rows]

    def add_favorite_playlist(self, playlist_id: int) -> None:
        with closing(connect_sqlite(self._db_path)) as conn:
            conn.execute(
                """
                insert into favorite_playlists (playlist_id, added_at)
                values (?, ?)
                on conflict(playlist_id) do update set added_at = excluded.added_at
                """,
                (playlist_id, datetime.now(timezone.utc).isoformat()),
            )
            conn.commit()

    def list_favorite_playlists(self) -> list[dict]:
        with closing(connect_sqlite(self._db_path)) as conn:
            rows = conn.execute("select playlist_id from favorite_playlists order by added_at desc").fetchall()
        return [dict(row) for row in rows]

    def record_history(self, track_id: int, progress_seconds: int) -> None:
        with closing(connect_sqlite(self._db_path)) as conn:
            # History is append-only: every call adds a new row with a UTC timestamp.
            conn.execute(
                """
                insert into play_history (track_id, played_at, progress_seconds)
                values (?, ?, ?)
                """,
                (track_id, datetime.now(timezone.utc).isoformat(), progress_seconds),
            )
            conn.commit()

    def list_history(self) -> list[dict]:
        with closing(connect_sqlite(self._db_path)) as conn:
            # Newest first; id breaks ties between rows with the same timestamp.
            rows = conn.execute(
                """
                select track_id, played_at, progress_seconds
                from play_history
                order by played_at desc, id desc
                limit 100
                """
            ).fetchall()
        return [dict(row) for row in rows]
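
The favorite methods rely on SQLite's upsert clause (available since SQLite 3.24), so favoriting the same item twice refreshes added_at instead of failing on the primary key. The sketch below shows that behaviour in isolation. The favorite_tracks schema is an assumption modelled on the favorite_playlists table from the test fixture, and the fixed timestamps are made up so the output is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
# Assumed schema: mirrors favorite_playlists from the test fixture.
conn.execute("create table favorite_tracks (track_id integer primary key, added_at text not null)")

UPSERT = """
insert into favorite_tracks (track_id, added_at)
values (?, ?)
on conflict(track_id) do update set added_at = excluded.added_at
"""

conn.execute(UPSERT, (3476, "2026-04-19T10:00:00+00:00"))
conn.execute(UPSERT, (42, "2026-04-19T11:00:00+00:00"))
# Same track again: the existing row is updated, no second row is created.
conn.execute(UPSERT, (3476, "2026-04-19T12:00:00+00:00"))
conn.commit()

favorites = [
    dict(row)
    for row in conn.execute("select track_id, added_at from favorite_tracks order by added_at desc")
]
conn.close()
print(favorites)
```

After the third statement the table still holds two rows, and track 3476 moves to the front of the list because its added_at is now the most recent.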

src/music_server/routes/player.py

from fastapi import APIRouter, Depends, Response
from pydantic import BaseModel

from ..auth import require_bearer_token
from ..services.catalog_reader import CatalogReader
from ..services.player_service import PlayerService
from ..settings import get_settings

router = APIRouter(prefix="/player/v1", dependencies=[Depends(require_bearer_token)])


class HistoryEntry(BaseModel):
    # Request body for POST /me/history; a missing or non-integer track_id is rejected with 422.
    track_id: int
    progress_seconds: int = 0


def _player_service() -> PlayerService:
    return PlayerService(db_path=get_settings().player_db_path)


def _catalog_reader() -> CatalogReader:
    return CatalogReader(db_path=get_settings().catalog_db_path)


@router.get("/home")
def home() -> dict:
    player = _player_service()
    return {
        "recommend_playlists": _catalog_reader().list_playlists(page=1, page_size=12),
        "favorite_playlists": player.list_favorite_playlists(),
        "recent_history": player.list_history(),
    }


@router.put("/me/favorites/tracks/{track_id}", status_code=204)
def add_favorite_track(track_id: int) -> Response:
    _player_service().add_favorite_track(track_id=track_id)
    return Response(status_code=204)


@router.get("/me/favorites/tracks")
def list_favorite_tracks() -> dict:
    return {"items": _player_service().list_favorite_tracks()}


@router.put("/me/favorites/playlists/{playlist_id}", status_code=204)
def add_favorite_playlist(playlist_id: int) -> Response:
    _player_service().add_favorite_playlist(playlist_id=playlist_id)
    return Response(status_code=204)


@router.get("/me/favorites/playlists")
def list_favorite_playlists() -> dict:
    return {"items": _player_service().list_favorite_playlists()}


@router.post("/me/history", status_code=201)
def record_history(payload: HistoryEntry) -> dict:
    _player_service().record_history(track_id=payload.track_id, progress_seconds=payload.progress_seconds)
    return {"status": "created"}


@router.get("/me/history")
def list_history() -> dict:
    return {"items": _player_service().list_history()}

  • Step 4: Run tests to verify they pass

Run: python -m unittest tests.test_mf_detail_routes tests.test_player_history_routes -v
Expected: OK

  • Step 5: Commit
git add src/music_server/services/catalog_reader.py src/music_server/routes/mf_catalog.py src/music_server/services/player_service.py src/music_server/routes/player.py tests/test_mf_detail_routes.py tests/test_player_history_routes.py
git commit -m "feat: add musicfree detail routes and player history endpoints"