44 KiB
Public Music Service Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Build a standalone public service that loads the exported snapshot, serves MusicFree catalog APIs, resolves playable media with signed stream URLs, and exposes the first version of player-backend endpoints.
Architecture: Use one Python web service for both /mf/v1/* and /player/v1/*, backed by a read-only catalog_read.db plus a writable player.db. Keep catalog reading, auth, covers, media resolution, streaming, and player-user features in separate modules so the same process can serve both MusicFree and future first-party clients cleanly.
Tech Stack: Python 3.11+, FastAPI, Uvicorn, sqlite3, httpx, pydantic, unittest
Repository root: D:\source\musicdl-catalog-sync-worktrees\Music_Server
File Structure
- Create:
pyproject.toml - Create:
requirements.txt - Create:
src/music_server/__init__.py - Create:
src/music_server/settings.py - Create:
src/music_server/auth.py - Create:
src/music_server/app.py - Create:
src/music_server/db.py - Create:
src/music_server/routes/health.py - Create:
src/music_server/routes/mf_catalog.py - Create:
src/music_server/routes/mf_media.py - Create:
src/music_server/routes/player.py - Create:
src/music_server/routes/covers.py - Create:
src/music_server/services/catalog_reader.py - Create:
src/music_server/services/cover_service.py - Create:
src/music_server/services/media_resolver.py - Create:
src/music_server/services/stream_tokens.py - Create:
src/music_server/services/player_service.py - Create:
tests/__init__.py - Create:
tests/test_health.py - Create:
tests/test_catalog_reader.py - Create:
tests/test_mf_catalog_routes.py - Create:
tests/test_mf_media_routes.py - Create:
tests/test_player_routes.py
Task 1: Scaffold the service repo, settings, auth, and health endpoint
Files:
-
Create:
pyproject.toml -
Create:
requirements.txt -
Create:
src/music_server/__init__.py -
Create:
src/music_server/settings.py -
Create:
src/music_server/auth.py -
Create:
src/music_server/app.py -
Create:
src/music_server/routes/health.py -
Create:
tests/__init__.py -
Test:
tests/test_health.py -
Step 1: Write the failing test
import unittest
from fastapi.testclient import TestClient
from music_server.app import create_app
class HealthRouteTests(unittest.TestCase):
def test_healthz_returns_ok(self):
client = TestClient(create_app())
response = client.get("/healthz")
self.assertEqual(200, response.status_code)
self.assertEqual({"status": "ok"}, response.json())
if __name__ == "__main__":
unittest.main()
- Step 2: Run test to verify it fails
Run: python -m unittest tests.test_health -v
Expected: ERROR with ModuleNotFoundError: No module named 'music_server'
- Step 3: Write minimal implementation
requirements.txt
fastapi==0.115.0
uvicorn==0.32.0
httpx==0.27.2
pyproject.toml
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "music-server"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"fastapi==0.115.0",
"uvicorn==0.32.0",
"httpx==0.27.2",
]
[tool.setuptools.packages.find]
where = ["src"]
tests/__init__.py
import sys
from pathlib import Path
SRC_DIR = Path(__file__).resolve().parents[1] / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
src/music_server/settings.py
import os
from dataclasses import dataclass
@dataclass(frozen=True)
class Settings:
access_token: str = os.getenv("PUBLIC_MUSIC_ACCESS_TOKEN", "dev-token")
catalog_db_path: str = os.getenv("CATALOG_DB_PATH", "./data/catalog_read.db")
player_db_path: str = os.getenv("PLAYER_DB_PATH", "./data/player.db")
def get_settings() -> Settings:
return Settings()
src/music_server/auth.py
from fastapi import Header, HTTPException
from .settings import get_settings
def require_bearer_token(authorization: str | None = Header(default=None)) -> None:
expected = f"Bearer {get_settings().access_token}"
if authorization != expected:
raise HTTPException(status_code=401, detail="unauthorized")
src/music_server/routes/health.py
from fastapi import APIRouter
router = APIRouter()
@router.get("/healthz")
def healthz() -> dict:
return {"status": "ok"}
src/music_server/app.py
from fastapi import FastAPI
from .routes.health import router as health_router
def create_app() -> FastAPI:
app = FastAPI(title="Public Music Service")
app.include_router(health_router)
return app
- Step 4: Run test to verify it passes
Run: python -m unittest tests.test_health -v
Expected: OK
Note: tests/__init__.py enables importing the src-layout package from repo-root unittest runs.
- Step 5: Commit
git add pyproject.toml requirements.txt src/music_server tests/__init__.py tests/test_health.py
git commit -m "feat: scaffold public music service"
Task 2: Load the snapshot database and expose catalog reading primitives
Files:
-
Create:
src/music_server/db.py -
Create:
src/music_server/services/catalog_reader.py -
Test:
tests/test_catalog_reader.py -
Step 1: Write the failing test
import sqlite3
import tempfile
import unittest
from pathlib import Path
from music_server.services.catalog_reader import CatalogReader
class CatalogReaderTests(unittest.TestCase):
def test_list_playlists_orders_by_play_count_desc(self):
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "catalog_read.db"
conn = sqlite3.connect(db_path)
conn.executescript(
'''
create table catalog_playlists (
playlist_id integer primary key,
platform text not null,
remote_playlist_id text not null,
name text not null,
description text,
cover_url text,
play_count integer not null,
song_count integer not null
);
insert into catalog_playlists values (1, 'netease', '18165', 'A', 'desc', 'https://img/a.jpg', 10, 3);
insert into catalog_playlists values (2, 'qq', '75', 'B', 'desc', 'https://img/b.jpg', 100, 2);
'''
)
conn.commit()
conn.close()
reader = CatalogReader(db_path=str(db_path))
rows = reader.list_playlists(page=1, page_size=10)
self.assertEqual([2, 1], [row["playlist_id"] for row in rows])
if __name__ == "__main__":
unittest.main()
- Step 2: Run test to verify it fails
Run: python -m unittest tests.test_catalog_reader -v
Expected: ERROR because CatalogReader does not exist yet
- Step 3: Write minimal implementation
src/music_server/db.py
import sqlite3
def connect_sqlite(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
src/music_server/services/catalog_reader.py
from ..db import connect_sqlite
class CatalogReader:
def __init__(self, db_path: str) -> None:
self._db_path = db_path
def list_playlists(self, page: int, page_size: int) -> list[dict]:
offset = max(0, (page - 1) * page_size)
conn = connect_sqlite(self._db_path)
rows = conn.execute(
"""
select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
from catalog_playlists
where song_count > 0
order by play_count desc, playlist_id asc
limit ? offset ?
""",
(page_size, offset),
).fetchall()
conn.close()
return [dict(row) for row in rows]
def get_playlist(self, playlist_id: int) -> dict | None:
conn = connect_sqlite(self._db_path)
row = conn.execute(
"""
select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
from catalog_playlists
where playlist_id = ?
""",
(playlist_id,),
).fetchone()
conn.close()
return dict(row) if row else None
- Step 4: Run test to verify it passes
Run: python -m unittest tests.test_catalog_reader -v
Expected: OK
- Step 5: Commit
git add src/music_server/db.py src/music_server/services/catalog_reader.py tests/test_catalog_reader.py
git commit -m "feat: load exported catalog snapshot"
Task 3: Implement MusicFree catalog APIs for tags, playlists, and toplists
Files:
-
Create:
src/music_server/routes/mf_catalog.py -
Modify:
src/music_server/app.py -
Test:
tests/test_mf_catalog_routes.py -
Step 1: Write the failing test
import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from fastapi.testclient import TestClient
from music_server.app import create_app
class MfCatalogRouteTests(unittest.TestCase):
def test_recommend_sheets_returns_musicfree_shape(self):
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "catalog_read.db"
conn = sqlite3.connect(db_path)
conn.executescript(
'''
create table catalog_playlists (
playlist_id integer primary key,
platform text not null,
remote_playlist_id text not null,
name text not null,
description text,
cover_url text,
play_count integer not null,
song_count integer not null
);
insert into catalog_playlists values (1, 'netease', '18165', '娴嬭瘯姝屽崟', 'desc', 'https://img/1.jpg', 99, 5);
'''
)
conn.commit()
conn.close()
with patch.dict("os.environ", {"CATALOG_DB_PATH": str(db_path)}):
client = TestClient(create_app())
response = client.get(
"/mf/v1/recommend/sheets?page=1&page_size=20",
headers={"Authorization": "Bearer dev-token"},
)
self.assertEqual(200, response.status_code)
payload = response.json()
self.assertFalse(payload["isEnd"])
self.assertEqual("catalogsync:playlist:1", payload["data"][0]["id"])
self.assertEqual("娴嬭瘯姝屽崟", payload["data"][0]["title"])
if __name__ == "__main__":
unittest.main()
- Step 2: Run test to verify it fails
Run: python -m unittest tests.test_mf_catalog_routes -v
Expected: FAIL with 404 Not Found for /mf/v1/recommend/sheets
- Step 3: Write minimal implementation
src/music_server/routes/mf_catalog.py
from fastapi import APIRouter, Depends, Query
from ..auth import require_bearer_token
from ..services.catalog_reader import CatalogReader
from ..settings import get_settings
router = APIRouter(prefix="/mf/v1", dependencies=[Depends(require_bearer_token)])
def _reader() -> CatalogReader:
return CatalogReader(db_path=get_settings().catalog_db_path)
def _to_sheet_item(row: dict) -> dict:
return {
"id": f"catalogsync:playlist:{row['playlist_id']}",
"platform": "catalogsync",
"title": row["name"],
"coverImg": row["cover_url"] or "",
"description": row["description"] or "",
"worksNum": row["song_count"],
"playCount": row["play_count"],
}
@router.get("/recommend/tags")
def recommend_tags() -> dict:
return {
"pinned": [
{"id": "all", "title": "鍏ㄩ儴"},
{"id": "netease", "title": "缃戞槗浜?},
{"id": "qq", "title": "QQ闊充箰"},
{"id": "kuwo", "title": "閰锋垜"},
],
"data": [
{
"title": "鏉ユ簮",
"data": [
{"id": "playlist_square", "title": "姝屽崟骞垮満"},
{"id": "toplist", "title": "鎺掕姒?},
],
}
],
}
@router.get("/recommend/sheets")
def recommend_sheets(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=60, ge=1, le=200),
) -> dict:
rows = _reader().list_playlists(page=page, page_size=page_size)
return {
"isEnd": len(rows) < page_size,
"data": [_to_sheet_item(row) for row in rows],
}
src/music_server/app.py
from fastapi import FastAPI
from .routes.health import router as health_router
from .routes.mf_catalog import router as mf_catalog_router
def create_app() -> FastAPI:
app = FastAPI(title="Public Music Service")
app.include_router(health_router)
app.include_router(mf_catalog_router)
return app
- Step 4: Run test to verify it passes
Run: python -m unittest tests.test_mf_catalog_routes -v
Expected: OK
- Step 5: Commit
git add src/music_server/routes/mf_catalog.py src/music_server/app.py tests/test_mf_catalog_routes.py
git commit -m "feat: add musicfree catalog endpoints"
Task 4: Implement cover and media resolve endpoints with signed stream URLs
Files:
-
Create:
src/music_server/routes/covers.py -
Create:
src/music_server/routes/mf_media.py -
Create:
src/music_server/services/cover_service.py -
Create:
src/music_server/services/media_resolver.py -
Create:
src/music_server/services/stream_tokens.py -
Modify:
src/music_server/app.py -
Test:
tests/test_mf_media_routes.py -
Step 1: Write the failing test
import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from fastapi.testclient import TestClient
from music_server.app import create_app
class MfMediaRouteTests(unittest.TestCase):
def test_media_resolve_prefers_public_url_and_returns_signed_stream(self):
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "catalog_read.db"
conn = sqlite3.connect(db_path)
conn.executescript(
'''
create table catalog_track_files (
song_id integer not null,
quality_label text,
ext text,
file_size_bytes integer,
backend_type text,
backend_name text,
locator text,
public_url text,
status text,
is_primary integer
);
insert into catalog_track_files values (
3476, 'super', 'flac', 42345678, 'object_storage', 'main-s3',
'music/netease/test.flac', 'https://cdn.example/test.flac', 'active', 1
);
'''
)
conn.commit()
conn.close()
with patch.dict(
"os.environ",
{
"CATALOG_DB_PATH": str(db_path),
"PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
},
):
client = TestClient(create_app())
response = client.post(
"/mf/v1/media/resolve",
headers={"Authorization": "Bearer dev-token"},
json={"song_id": "catalogsync:song:3476", "quality": "super"},
)
self.assertEqual(200, response.status_code)
payload = response.json()
self.assertEqual("object_storage", payload["selected_source"]["kind"])
self.assertIn("/mf/v1/media/stream/", payload["stream"]["url"])
if __name__ == "__main__":
unittest.main()
- Step 2: Run test to verify it fails
Run: python -m unittest tests.test_mf_media_routes -v
Expected: FAIL with 404 Not Found for /mf/v1/media/resolve
- Step 3: Write minimal implementation
src/music_server/services/stream_tokens.py
import base64
import hashlib
import hmac
import json
import time
def create_stream_token(secret: str, song_id: int, locator: str, ttl_seconds: int = 300) -> str:
payload = {
"song_id": int(song_id),
"locator": locator,
"expires_at": int(time.time()) + int(ttl_seconds),
}
payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
signature = hmac.new(secret.encode("utf-8"), payload_json.encode("utf-8"), hashlib.sha256).hexdigest()
envelope = {"payload": payload, "sig": signature}
return base64.urlsafe_b64encode(json.dumps(envelope, separators=(",", ":"), ensure_ascii=False).encode("utf-8")).decode("ascii")
src/music_server/services/media_resolver.py
from ..db import connect_sqlite
class MediaResolver:
def __init__(self, db_path: str) -> None:
self._db_path = db_path
def resolve(self, song_id: int, quality: str) -> dict:
conn = connect_sqlite(self._db_path)
row = conn.execute(
"""
select song_id, quality_label, ext, file_size_bytes, backend_type, backend_name, locator, public_url
from catalog_track_files
where song_id = ? and status = 'active'
order by case when quality_label = ? then 0 else 1 end, is_primary desc
limit 1
""",
(song_id, quality),
).fetchone()
conn.close()
if row is None:
raise LookupError("no playable source found")
return dict(row)
src/music_server/routes/mf_media.py
from fastapi import APIRouter, Depends, HTTPException
from ..auth import require_bearer_token
from ..services.media_resolver import MediaResolver
from ..services.stream_tokens import create_stream_token
from ..settings import get_settings
router = APIRouter(prefix="/mf/v1", dependencies=[Depends(require_bearer_token)])
def _song_id_from_public_id(public_id: str) -> int:
return int(str(public_id).split(":")[-1])
@router.post("/media/resolve")
def media_resolve(payload: dict) -> dict:
settings = get_settings()
song_id = _song_id_from_public_id(payload["song_id"])
quality = payload.get("quality", "standard")
try:
resolved = MediaResolver(db_path=settings.catalog_db_path).resolve(song_id=song_id, quality=quality)
except LookupError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
token = create_stream_token(
secret=settings.access_token,
song_id=song_id,
locator=resolved["locator"],
)
return {
"song_id": payload["song_id"],
"selected_source": {
"kind": resolved["backend_type"],
"backend": resolved["backend_name"],
"quality": resolved["quality_label"] or quality,
"ext": resolved["ext"],
"size_bytes": resolved["file_size_bytes"],
},
"stream": {
"url": f"/mf/v1/media/stream/{token}",
"headers": {},
"range_supported": True,
},
}
src/music_server/services/cover_service.py
from fastapi import HTTPException
from ..db import connect_sqlite
class CoverService:
def __init__(self, db_path: str) -> None:
self._db_path = db_path
def playlist_cover_url(self, playlist_id: int) -> str:
conn = connect_sqlite(self._db_path)
row = conn.execute(
"select cover_url from catalog_playlists where playlist_id = ?",
(playlist_id,),
).fetchone()
conn.close()
if row is None or not row["cover_url"]:
raise HTTPException(status_code=404, detail="playlist cover not found")
return str(row["cover_url"])
def song_cover_url(self, song_id: int) -> str:
conn = connect_sqlite(self._db_path)
row = conn.execute(
"select cover_url from catalog_tracks where song_id = ?",
(song_id,),
).fetchone()
conn.close()
if row is None or not row["cover_url"]:
raise HTTPException(status_code=404, detail="song cover not found")
return str(row["cover_url"])
src/music_server/routes/covers.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from ..services.cover_service import CoverService
from ..settings import get_settings
router = APIRouter(prefix="/mf/v1/covers")
def _cover_service() -> CoverService:
return CoverService(db_path=get_settings().catalog_db_path)
@router.get("/playlists/{playlist_id}")
def playlist_cover(playlist_id: int) -> RedirectResponse:
return RedirectResponse(_cover_service().playlist_cover_url(playlist_id=playlist_id), status_code=307)
@router.get("/songs/{song_id}")
def song_cover(song_id: int) -> RedirectResponse:
return RedirectResponse(_cover_service().song_cover_url(song_id=song_id), status_code=307)
src/music_server/app.py
from fastapi import FastAPI
from .routes.covers import router as covers_router
from .routes.health import router as health_router
from .routes.mf_catalog import router as mf_catalog_router
from .routes.mf_media import router as mf_media_router
def create_app() -> FastAPI:
app = FastAPI(title="Public Music Service")
app.include_router(health_router)
app.include_router(mf_catalog_router)
app.include_router(mf_media_router)
app.include_router(covers_router)
return app
- Step 4: Run test to verify it passes
Run: python -m unittest tests.test_mf_media_routes -v
Expected: OK
- Step 5: Commit
git add src/music_server/routes/covers.py src/music_server/routes/mf_media.py src/music_server/services/media_resolver.py src/music_server/services/stream_tokens.py src/music_server/app.py tests/test_mf_media_routes.py
git commit -m "feat: add media resolve and cover endpoints"
Task 5: Add player persistence and /player/v1/* endpoints
Files:
-
Create:
src/music_server/services/player_service.py -
Create:
src/music_server/routes/player.py -
Modify:
src/music_server/app.py -
Test:
tests/test_player_routes.py -
Step 1: Write the failing test
import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from fastapi.testclient import TestClient
from music_server.app import create_app
class PlayerRouteTests(unittest.TestCase):
def test_put_and_get_favorite_tracks(self):
with tempfile.TemporaryDirectory() as tmpdir:
player_db = Path(tmpdir) / "player.db"
conn = sqlite3.connect(player_db)
conn.executescript(
'''
create table favorite_tracks (
track_id integer primary key,
added_at text not null
);
'''
)
conn.commit()
conn.close()
with patch.dict(
"os.environ",
{
"PLAYER_DB_PATH": str(player_db),
"PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
},
):
client = TestClient(create_app())
put_response = client.put(
"/player/v1/me/favorites/tracks/3476",
headers={"Authorization": "Bearer dev-token"},
)
get_response = client.get(
"/player/v1/me/favorites/tracks",
headers={"Authorization": "Bearer dev-token"},
)
self.assertEqual(204, put_response.status_code)
self.assertEqual([{"track_id": 3476}], get_response.json()["items"])
if __name__ == "__main__":
unittest.main()
- Step 2: Run test to verify it fails
Run: python -m unittest tests.test_player_routes -v
Expected: FAIL with 404 Not Found for /player/v1/me/favorites/tracks/3476
- Step 3: Write minimal implementation
src/music_server/services/player_service.py
from datetime import datetime, timezone
from ..db import connect_sqlite
class PlayerService:
def __init__(self, db_path: str) -> None:
self._db_path = db_path
def add_favorite_track(self, track_id: int) -> None:
conn = connect_sqlite(self._db_path)
conn.execute(
"""
insert into favorite_tracks (track_id, added_at)
values (?, ?)
on conflict(track_id) do update set added_at = excluded.added_at
""",
(track_id, datetime.now(timezone.utc).isoformat()),
)
conn.commit()
conn.close()
def list_favorite_tracks(self) -> list[dict]:
conn = connect_sqlite(self._db_path)
rows = conn.execute(
"""
select track_id
from favorite_tracks
order by added_at desc
"""
).fetchall()
conn.close()
return [dict(row) for row in rows]
src/music_server/routes/player.py
from fastapi import APIRouter, Depends, Response
from ..auth import require_bearer_token
from ..services.player_service import PlayerService
from ..settings import get_settings
router = APIRouter(prefix="/player/v1", dependencies=[Depends(require_bearer_token)])
def _player_service() -> PlayerService:
return PlayerService(db_path=get_settings().player_db_path)
@router.put("/me/favorites/tracks/{track_id}", status_code=204)
def add_favorite_track(track_id: int) -> Response:
_player_service().add_favorite_track(track_id=track_id)
return Response(status_code=204)
@router.get("/me/favorites/tracks")
def list_favorite_tracks() -> dict:
return {"items": _player_service().list_favorite_tracks()}
src/music_server/app.py
from fastapi import FastAPI
from .routes.covers import router as covers_router
from .routes.health import router as health_router
from .routes.mf_catalog import router as mf_catalog_router
from .routes.mf_media import router as mf_media_router
from .routes.player import router as player_router
def create_app() -> FastAPI:
app = FastAPI(title="Public Music Service")
app.include_router(health_router)
app.include_router(mf_catalog_router)
app.include_router(mf_media_router)
app.include_router(covers_router)
app.include_router(player_router)
return app
- Step 4: Run test to verify it passes
Run: python -m unittest tests.test_player_routes -v
Expected: OK
- Step 5: Commit
git add src/music_server/services/player_service.py src/music_server/routes/player.py src/music_server/app.py tests/test_player_routes.py
git commit -m "feat: add player favorite endpoints"
Task 6: Complete playlist detail, toplist detail, home, and history endpoints
Files:
-
Modify:
src/music_server/services/catalog_reader.py -
Modify:
src/music_server/routes/mf_catalog.py -
Modify:
src/music_server/services/player_service.py -
Modify:
src/music_server/routes/player.py -
Test:
tests/test_mf_detail_routes.py -
Test:
tests/test_player_history_routes.py -
Step 1: Write the failing tests
import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from fastapi.testclient import TestClient
from music_server.app import create_app
class MfDetailRouteTests(unittest.TestCase):
def test_playlist_tracks_and_toplists_return_expected_shapes(self):
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "catalog_read.db"
conn = sqlite3.connect(db_path)
conn.executescript(
'''
create table catalog_playlists (
playlist_id integer primary key,
platform text not null,
remote_playlist_id text not null,
name text not null,
description text,
cover_url text,
play_count integer not null,
song_count integer not null
);
create table catalog_tracks (
song_id integer primary key,
platform text not null,
remote_song_id text not null,
name text not null,
singers text,
album text,
cover_url text,
duration_ms integer,
metadata_json text
);
create table catalog_playlist_tracks (
playlist_id integer not null,
song_id integer not null,
position integer not null
);
create table catalog_toplists (
toplist_id text primary key,
platform text not null,
name text not null,
description text,
cover_url text,
play_count integer not null,
song_count integer not null,
group_name text not null
);
create table catalog_toplist_tracks (
toplist_id text not null,
song_id integer not null,
position integer not null
);
insert into catalog_playlists values (1, 'netease', '18165', '测试歌单', 'desc', 'https://img/p.jpg', 99, 1);
insert into catalog_tracks values (3476, 'netease', '65800', '海屿你', '马也 / Crabbit', '', 'https://img/s.jpg', 0, '{}');
insert into catalog_playlist_tracks values (1, 3476, 1);
insert into catalog_toplists values ('kuwo_top_16', 'kuwo', '酷我飙升榜', 'desc', 'https://img/t.jpg', 1000, 1, '酷我');
insert into catalog_toplist_tracks values ('kuwo_top_16', 3476, 1);
'''
)
conn.commit()
conn.close()
with patch.dict("os.environ", {"CATALOG_DB_PATH": str(db_path)}):
client = TestClient(create_app())
playlist_response = client.get(
"/mf/v1/playlists/1/tracks?page=1&page_size=20",
headers={"Authorization": "Bearer dev-token"},
)
toplist_response = client.get(
"/mf/v1/toplists",
headers={"Authorization": "Bearer dev-token"},
)
self.assertEqual(200, playlist_response.status_code)
self.assertEqual("catalogsync:song:3476", playlist_response.json()["musicList"][0]["id"])
self.assertEqual(200, toplist_response.status_code)
self.assertEqual("酷我", toplist_response.json()[0]["title"])
if __name__ == "__main__":
unittest.main()
import sqlite3
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from fastapi.testclient import TestClient
from music_server.app import create_app
class PlayerHistoryRouteTests(unittest.TestCase):
def test_home_and_history_routes_return_items(self):
with tempfile.TemporaryDirectory() as tmpdir:
player_db = Path(tmpdir) / "player.db"
catalog_db = Path(tmpdir) / "catalog_read.db"
player_conn = sqlite3.connect(player_db)
player_conn.executescript(
'''
create table play_history (
id integer primary key autoincrement,
track_id integer not null,
played_at text not null,
progress_seconds integer not null
);
create table favorite_playlists (
playlist_id integer primary key,
added_at text not null
);
'''
)
player_conn.commit()
player_conn.close()
catalog_conn = sqlite3.connect(catalog_db)
catalog_conn.executescript(
'''
create table catalog_playlists (
playlist_id integer primary key,
platform text not null,
remote_playlist_id text not null,
name text not null,
description text,
cover_url text,
play_count integer not null,
song_count integer not null
);
insert into catalog_playlists values (1, 'netease', '18165', '测试歌单', 'desc', 'https://img/p.jpg', 99, 1);
'''
)
catalog_conn.commit()
catalog_conn.close()
with patch.dict(
"os.environ",
{
"PLAYER_DB_PATH": str(player_db),
"CATALOG_DB_PATH": str(catalog_db),
"PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
},
):
client = TestClient(create_app())
post_response = client.post(
"/player/v1/me/history",
headers={"Authorization": "Bearer dev-token"},
json={"track_id": 3476, "progress_seconds": 12},
)
home_response = client.get(
"/player/v1/home",
headers={"Authorization": "Bearer dev-token"},
)
history_response = client.get(
"/player/v1/me/history",
headers={"Authorization": "Bearer dev-token"},
)
self.assertEqual(201, post_response.status_code)
self.assertEqual(200, home_response.status_code)
self.assertEqual(3476, history_response.json()["items"][0]["track_id"])
if __name__ == "__main__":
unittest.main()
- Step 2: Run tests to verify they fail
Run: python -m unittest tests.test_mf_detail_routes tests.test_player_history_routes -v
Expected: FAIL with 404 Not Found for /mf/v1/playlists/1/tracks and /player/v1/home
- Step 3: Write minimal implementation
src/music_server/services/catalog_reader.py
from ..db import connect_sqlite
class CatalogReader:
def __init__(self, db_path: str) -> None:
self._db_path = db_path
def list_playlists(self, page: int, page_size: int) -> list[dict]:
offset = max(0, (page - 1) * page_size)
conn = connect_sqlite(self._db_path)
rows = conn.execute(
"""
select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
from catalog_playlists
where song_count > 0
order by play_count desc, playlist_id asc
limit ? offset ?
""",
(page_size, offset),
).fetchall()
conn.close()
return [dict(row) for row in rows]
def get_playlist(self, playlist_id: int) -> dict | None:
conn = connect_sqlite(self._db_path)
row = conn.execute(
"""
select playlist_id, platform, remote_playlist_id, name, description, cover_url, play_count, song_count
from catalog_playlists
where playlist_id = ?
""",
(playlist_id,),
).fetchone()
conn.close()
return dict(row) if row else None
def list_playlist_tracks(self, playlist_id: int, page: int, page_size: int) -> list[dict]:
offset = max(0, (page - 1) * page_size)
conn = connect_sqlite(self._db_path)
rows = conn.execute(
"""
select t.song_id, t.name, t.singers, t.album, t.cover_url, t.duration_ms
from catalog_playlist_tracks pt
join catalog_tracks t on t.song_id = pt.song_id
where pt.playlist_id = ?
order by pt.position asc
limit ? offset ?
""",
(playlist_id, page_size, offset),
).fetchall()
conn.close()
return [dict(row) for row in rows]
def list_toplists(self) -> list[dict]:
conn = connect_sqlite(self._db_path)
rows = conn.execute(
"""
select toplist_id, platform, name, description, cover_url, play_count, song_count, group_name
from catalog_toplists
order by group_name asc, play_count desc, name asc
"""
).fetchall()
conn.close()
grouped: dict[str, list[dict]] = {}
for row in rows:
grouped.setdefault(row["group_name"], []).append(dict(row))
return [{"title": group_name, "data": items} for group_name, items in grouped.items()]
src/music_server/routes/mf_catalog.py
from fastapi import APIRouter, Depends, Query
from ..auth import require_bearer_token
from ..services.catalog_reader import CatalogReader
from ..settings import get_settings
router = APIRouter(prefix="/mf/v1", dependencies=[Depends(require_bearer_token)])
def _reader() -> CatalogReader:
return CatalogReader(db_path=get_settings().catalog_db_path)
def _to_sheet_item(row: dict) -> dict:
item_id = row.get("playlist_id", row.get("toplist_id"))
item_prefix = "playlist" if "playlist_id" in row else "toplist"
return {
"id": f"catalogsync:{item_prefix}:{item_id}",
"platform": "catalogsync",
"title": row["name"],
"coverImg": row.get("cover_url") or "",
"description": row.get("description") or "",
"worksNum": row.get("song_count", 0),
"playCount": row.get("play_count", 0),
}
def _to_music_item(row: dict) -> dict:
return {
"id": f"catalogsync:song:{row['song_id']}",
"platform": "catalogsync",
"title": row["name"],
"artist": row.get("singers") or "",
"album": row.get("album") or "",
"artwork": row.get("cover_url") or "",
"duration": int((row.get("duration_ms") or 0) / 1000),
}
@router.get("/recommend/tags")
def recommend_tags() -> dict:
return {
"pinned": [
{"id": "all", "title": "全部"},
{"id": "netease", "title": "网易云"},
{"id": "qq", "title": "QQ音乐"},
{"id": "kuwo", "title": "酷我"},
],
"data": [{"title": "来源", "data": [{"id": "playlist_square", "title": "歌单广场"}, {"id": "toplist", "title": "排行榜"}]}],
}
@router.get("/recommend/sheets")
def recommend_sheets(page: int = Query(default=1, ge=1), page_size: int = Query(default=60, ge=1, le=200)) -> dict:
rows = _reader().list_playlists(page=page, page_size=page_size)
return {"isEnd": len(rows) < page_size, "data": [_to_sheet_item(row) for row in rows]}
@router.get("/playlists/{playlist_id}")
def playlist_detail(playlist_id: int) -> dict:
row = _reader().get_playlist(playlist_id=playlist_id)
return _to_sheet_item(row)
@router.get("/playlists/{playlist_id}/tracks")
def playlist_tracks(playlist_id: int, page: int = Query(default=1, ge=1), page_size: int = Query(default=100, ge=1, le=200)) -> dict:
rows = _reader().list_playlist_tracks(playlist_id=playlist_id, page=page, page_size=page_size)
return {"isEnd": len(rows) < page_size, "musicList": [_to_music_item(row) for row in rows]}
@router.get("/toplists")
def toplists() -> list[dict]:
groups = _reader().list_toplists()
normalized = []
for group in groups:
normalized.append({"title": group["title"], "data": [_to_sheet_item(row) for row in group["data"]]})
return normalized
src/music_server/services/player_service.py
from datetime import datetime, timezone
from ..db import connect_sqlite
class PlayerService:
def __init__(self, db_path: str) -> None:
self._db_path = db_path
def add_favorite_track(self, track_id: int) -> None:
conn = connect_sqlite(self._db_path)
conn.execute(
"""
insert into favorite_tracks (track_id, added_at)
values (?, ?)
on conflict(track_id) do update set added_at = excluded.added_at
""",
(track_id, datetime.now(timezone.utc).isoformat()),
)
conn.commit()
conn.close()
def list_favorite_tracks(self) -> list[dict]:
conn = connect_sqlite(self._db_path)
rows = conn.execute("select track_id from favorite_tracks order by added_at desc").fetchall()
conn.close()
return [dict(row) for row in rows]
def add_favorite_playlist(self, playlist_id: int) -> None:
conn = connect_sqlite(self._db_path)
conn.execute(
"""
insert into favorite_playlists (playlist_id, added_at)
values (?, ?)
on conflict(playlist_id) do update set added_at = excluded.added_at
""",
(playlist_id, datetime.now(timezone.utc).isoformat()),
)
conn.commit()
conn.close()
def list_favorite_playlists(self) -> list[dict]:
conn = connect_sqlite(self._db_path)
rows = conn.execute("select playlist_id from favorite_playlists order by added_at desc").fetchall()
conn.close()
return [dict(row) for row in rows]
def record_history(self, track_id: int, progress_seconds: int) -> None:
conn = connect_sqlite(self._db_path)
conn.execute(
"""
insert into play_history (track_id, played_at, progress_seconds)
values (?, ?, ?)
""",
(track_id, datetime.now(timezone.utc).isoformat(), progress_seconds),
)
conn.commit()
conn.close()
def list_history(self) -> list[dict]:
conn = connect_sqlite(self._db_path)
rows = conn.execute(
"""
select track_id, played_at, progress_seconds
from play_history
order by played_at desc, id desc
limit 100
"""
).fetchall()
conn.close()
return [dict(row) for row in rows]
src/music_server/routes/player.py
from fastapi import APIRouter, Depends, Response
from ..auth import require_bearer_token
from ..services.catalog_reader import CatalogReader
from ..services.player_service import PlayerService
from ..settings import get_settings
router = APIRouter(prefix="/player/v1", dependencies=[Depends(require_bearer_token)])
def _player_service() -> PlayerService:
return PlayerService(db_path=get_settings().player_db_path)
def _catalog_reader() -> CatalogReader:
return CatalogReader(db_path=get_settings().catalog_db_path)
@router.get("/home")
def home() -> dict:
return {
"recommend_playlists": _catalog_reader().list_playlists(page=1, page_size=12),
"favorite_playlists": _player_service().list_favorite_playlists(),
"recent_history": _player_service().list_history(),
}
@router.put("/me/favorites/tracks/{track_id}", status_code=204)
def add_favorite_track(track_id: int) -> Response:
_player_service().add_favorite_track(track_id=track_id)
return Response(status_code=204)
@router.get("/me/favorites/tracks")
def list_favorite_tracks() -> dict:
return {"items": _player_service().list_favorite_tracks()}
@router.put("/me/favorites/playlists/{playlist_id}", status_code=204)
def add_favorite_playlist(playlist_id: int) -> Response:
_player_service().add_favorite_playlist(playlist_id=playlist_id)
return Response(status_code=204)
@router.get("/me/favorites/playlists")
def list_favorite_playlists() -> dict:
return {"items": _player_service().list_favorite_playlists()}
@router.post("/me/history", status_code=201)
def record_history(payload: dict) -> dict:
_player_service().record_history(track_id=int(payload["track_id"]), progress_seconds=int(payload.get("progress_seconds", 0)))
return {"status": "created"}
@router.get("/me/history")
def list_history() -> dict:
return {"items": _player_service().list_history()}
- Step 4: Run tests to verify they pass
Run: python -m unittest tests.test_mf_detail_routes tests.test_player_history_routes -v
Expected: OK
- Step 5: Commit
git add src/music_server/services/catalog_reader.py src/music_server/routes/mf_catalog.py src/music_server/services/player_service.py src/music_server/routes/player.py tests/test_mf_detail_routes.py tests/test_player_history_routes.py
git commit -m "feat: add musicfree detail routes and player history endpoints"