MusicFree Pure Music_Server Plugin Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
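Goal: Add the missing toplist detail endpoints to Music_Server and deliver a MusicFree plugin that depends only on Music_Server, while keeping netease_17000.js as a compatibility entry point.
Architecture: First, add toplists/{id} and toplists/{id}/tracks to the Music_Server read-model access layer and the /mf/v1/* route layer, reusing the existing catalog_toplists / catalog_toplist_tracks data. On the plugin side, follow the single-file CommonJS style already used in the Music_Free directory: create music_server.js as the official plugin, with a lazily loaded axios and a test-injected client so that local Node tests do not require an installed node_modules. Finally, turn netease_17000.js into a compatibility shell.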
Tech Stack: Python 3.11, FastAPI, sqlite3, unittest, JavaScript CommonJS, Node.js built-in test runner
Repository root: D:\source\musicdl-catalog-sync-worktrees\Music_Server
External plugin root: D:\source\MusicFree\keep-alive-master\Music_Free
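The lazy-loading and client-injection idea from the Architecture note can be shown in isolation. This is a minimal sketch, not the plugin itself: `createClientProvider` and `loadHttpLibrary` are hypothetical names, and the loader stands in for a call such as `() => require("axios")`.

```javascript
// Sketch only: `createClientProvider` and `loadHttpLibrary` are hypothetical names.
function createClientProvider(loadHttpLibrary) {
  let injectedClient = null;
  return {
    // Tests call this to swap in a fake client.
    setClientForTests(client) {
      injectedClient = client;
    },
    // The loader runs only when no fake client has been injected,
    // so a test never touches the real HTTP library.
    getClient() {
      if (injectedClient) {
        return injectedClient;
      }
      return loadHttpLibrary();
    },
  };
}

let loadCount = 0;
const provider = createClientProvider(() => {
  loadCount += 1;
  return { get: async () => ({ data: "real" }) };
});

const fakeClient = { get: async () => ({ data: "fake" }) };
provider.setClientForTests(fakeClient);

const lazyLoadDemo = {
  usesFake: provider.getClient() === fakeClient,
  loadCountAfterFake: loadCount,
};
```

Because the loader is only invoked on the non-test path, the test file can be run with `node --test` on a machine where the HTTP library has never been installed.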
File Structure
- Modify: src/music_server/services/catalog_reader.py
- Modify: src/music_server/routes/mf_catalog.py
- Modify: tests/test_catalog_reader.py
- Modify: tests/test_mf_detail_routes.py
- Create: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.js
- Create: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
- Modify: D:\source\MusicFree\keep-alive-master\Music_Free\netease_17000.js
Task 1: Add toplist detail and toplist track reads to CatalogReader
Files:
- Modify: src/music_server/services/catalog_reader.py
- Modify: tests/test_catalog_reader.py
- Step 1: Write the failing test
Append this test to CatalogReaderTests in tests/test_catalog_reader.py:
    def test_get_toplist_and_list_toplist_tracks(self):
        conn = sqlite3.connect(self._db_path)
        conn.executescript(
            """
            create table catalog_toplists (
                toplist_id text primary key,
                platform text not null,
                name text not null,
                description text,
                cover_url text,
                play_count integer not null,
                song_count integer not null,
                group_name text not null
            );
            create table catalog_toplist_tracks (
                toplist_id text not null,
                song_id integer not null,
                position integer not null
            );
            """
        )
        conn.executemany(
            """
            insert into catalog_tracks (
                song_id, platform, remote_song_id, name, singers, album, cover_url, duration_ms, metadata_json
            ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            [
                (
                    2001,
                    "kuwo",
                    "kw-2001",
                    "Top Song A",
                    "Singer A",
                    "Album A",
                    "https://img/a.jpg",
                    210000,
                    "{}",
                ),
                (
                    2002,
                    "kuwo",
                    "kw-2002",
                    "Top Song B",
                    "Singer B",
                    "Album B",
                    "https://img/b.jpg",
                    220000,
                    "{}",
                ),
            ],
        )
        conn.execute(
            """
            insert into catalog_toplists (
                toplist_id, platform, name, description, cover_url, play_count, song_count, group_name
            ) values (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                "kuwo_top_16",
                "kuwo",
                "Cool Toplist",
                "desc",
                "https://img/top.jpg",
                999,
                2,
                "Kuwo",
            ),
        )
        conn.executemany(
            """
            insert into catalog_toplist_tracks (toplist_id, song_id, position)
            values (?, ?, ?)
            """,
            [
                ("kuwo_top_16", 2001, 1),
                ("kuwo_top_16", 2002, 2),
            ],
        )
        conn.commit()
        conn.close()
        reader = CatalogReader(db_path=str(self._db_path))
        toplist = reader.get_toplist("kuwo_top_16")
        first_page = reader.list_toplist_tracks("kuwo_top_16", page=1, page_size=1)
        second_page = reader.list_toplist_tracks("kuwo_top_16", page=2, page_size=1)
        self.assertIsNotNone(toplist)
        assert toplist is not None
        self.assertEqual("kuwo", toplist["platform"])
        self.assertEqual("Cool Toplist", toplist["name"])
        self.assertEqual([2001], [row["song_id"] for row in first_page])
        self.assertEqual([2002], [row["song_id"] for row in second_page])
- Step 2: Run test to verify it fails
Run:
python -m unittest tests.test_catalog_reader.CatalogReaderTests.test_get_toplist_and_list_toplist_tracks -v
Expected:
- ERROR
- message includes AttributeError: 'CatalogReader' object has no attribute 'get_toplist'
- Step 3: Write minimal implementation
Add these two methods to src/music_server/services/catalog_reader.py:
    def get_toplist(self, toplist_id: str) -> ToplistRow | None:
        with closing(connect_sqlite(self._db_path)) as conn:
            row = conn.execute(
                """
                select toplist_id, platform, name, description, cover_url, play_count, song_count, group_name
                from catalog_toplists
                where toplist_id = ?
                """,
                (toplist_id,),
            ).fetchone()
        return cast(ToplistRow, dict(row)) if row else None

    def list_toplist_tracks(
        self, toplist_id: str, page: int, page_size: int
    ) -> list[PlaylistTrackRow]:
        page, page_size = self._normalize_pagination(page, page_size)
        offset = (page - 1) * page_size
        with closing(connect_sqlite(self._db_path)) as conn:
            rows = conn.execute(
                """
                select t.song_id, t.name, t.singers, t.album, t.cover_url, t.duration_ms
                from catalog_toplist_tracks tt
                join catalog_tracks t on t.song_id = tt.song_id
                where tt.toplist_id = ?
                order by tt.position asc
                limit ? offset ?
                """,
                (toplist_id, page_size, offset),
            ).fetchall()
        return [cast(PlaylistTrackRow, dict(row)) for row in rows]
- Step 4: Run test to verify it passes
Run:
python -m unittest tests.test_catalog_reader.CatalogReaderTests.test_get_toplist_and_list_toplist_tracks -v
Expected:
- OK
- Step 5: Commit
git add src/music_server/services/catalog_reader.py tests/test_catalog_reader.py
git commit -m "feat: add catalog toplist reader methods"
Task 2: Expose GET /mf/v1/toplists/{id} and GET /mf/v1/toplists/{id}/tracks
Files:
- Modify: src/music_server/routes/mf_catalog.py
- Modify: tests/test_mf_detail_routes.py
- Step 1: Write the failing tests
Append these two tests to tests/test_mf_detail_routes.py:
    def test_toplist_detail_and_tracks_return_expected_shape(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "catalog_read.db"
            self._prepare_catalog_db(db_path)
            conn = sqlite3.connect(db_path)
            conn.execute(
                """
                insert into catalog_tracks (
                    song_id, platform, remote_song_id, name, singers, album, cover_url, duration_ms, source_meta
                ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    4001,
                    "kuwo",
                    "kw-4001",
                    "Top Song 2",
                    "Singer 2",
                    "Album 2",
                    "https://img/s2.jpg",
                    180000,
                    "{}",
                ),
            )
            conn.execute(
                "update catalog_toplists set song_count = 2 where toplist_id = ?",
                ("kuwo_top_16",),
            )
            conn.execute(
                """
                insert into catalog_toplist_tracks (toplist_id, song_id, position)
                values (?, ?, ?)
                """,
                ("kuwo_top_16", 4001, 2),
            )
            conn.commit()
            conn.close()
            with patch.dict(
                "os.environ",
                {
                    "PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
                    "CATALOG_DB_PATH": str(db_path),
                },
                clear=False,
            ):
                client = TestClient(create_app())
                detail_response = client.get(
                    "/mf/v1/toplists/kuwo_top_16",
                    headers={"Authorization": "Bearer dev-token"},
                )
                first_page_response = client.get(
                    "/mf/v1/toplists/kuwo_top_16/tracks?page=1&page_size=1",
                    headers={"Authorization": "Bearer dev-token"},
                )
                second_page_response = client.get(
                    "/mf/v1/toplists/kuwo_top_16/tracks?page=2&page_size=1",
                    headers={"Authorization": "Bearer dev-token"},
                )
            self.assertEqual(200, detail_response.status_code)
            self.assertEqual("catalogsync:toplist:kuwo_top_16", detail_response.json()["id"])
            self.assertEqual("kuwo", detail_response.json()["platform"])
            self.assertEqual("酷我飙升榜", detail_response.json()["title"])
            self.assertEqual(200, first_page_response.status_code)
            self.assertFalse(first_page_response.json()["isEnd"])
            self.assertEqual(
                "catalogsync:song:3476",
                first_page_response.json()["musicList"][0]["id"],
            )
            self.assertEqual(200, second_page_response.status_code)
            self.assertTrue(second_page_response.json()["isEnd"])
            self.assertEqual(
                "catalogsync:song:4001",
                second_page_response.json()["musicList"][0]["id"],
            )

    def test_toplist_detail_returns_404_and_requires_token(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "catalog_read.db"
            self._prepare_catalog_db(db_path)
            with patch.dict(
                "os.environ",
                {
                    "PUBLIC_MUSIC_ACCESS_TOKEN": "dev-token",
                    "CATALOG_DB_PATH": str(db_path),
                },
                clear=False,
            ):
                client = TestClient(create_app())
                missing_response = client.get(
                    "/mf/v1/toplists/missing_toplist",
                    headers={"Authorization": "Bearer dev-token"},
                )
                unauthorized_response = client.get("/mf/v1/toplists/kuwo_top_16")
            self.assertEqual(404, missing_response.status_code)
            self.assertEqual(401, unauthorized_response.status_code)
- Step 2: Run test to verify it fails
Run:
python -m unittest tests.test_mf_detail_routes.MfDetailRouteTests.test_toplist_detail_and_tracks_return_expected_shape tests.test_mf_detail_routes.MfDetailRouteTests.test_toplist_detail_returns_404_and_requires_token -v
Expected:
- first test returns 404 Not Found for /mf/v1/toplists/kuwo_top_16
- Step 3: Write minimal implementation
Add these two routes to src/music_server/routes/mf_catalog.py:
@router.get("/toplists/{toplist_id}")
def get_toplist(toplist_id: str) -> dict:
    row = _reader().get_toplist(toplist_id=toplist_id)
    if row is None:
        raise HTTPException(status_code=404, detail="toplist not found")
    return _to_sheet_item(row)


@router.get("/toplists/{toplist_id}/tracks")
def list_toplist_tracks(
    toplist_id: str,
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=60, ge=1, le=200),
) -> dict:
    reader = _reader()
    if reader.get_toplist(toplist_id=toplist_id) is None:
        raise HTTPException(status_code=404, detail="toplist not found")
    rows = reader.list_toplist_tracks(
        toplist_id=toplist_id,
        page=page,
        page_size=page_size,
    )
    if len(rows) < page_size:
        is_end = True
    else:
        # A full page may or may not be the last one, so probe the next page.
        next_rows = reader.list_toplist_tracks(
            toplist_id=toplist_id,
            page=page + 1,
            page_size=page_size,
        )
        is_end = len(next_rows) == 0
    return {
        "isEnd": is_end,
        "musicList": [_to_music_item(row) for row in rows],
    }
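The isEnd rule in the tracks route can be checked against a tiny in-memory model. The sketch below is an illustration in JavaScript rather than the route's Python; `listTracks` and `computeIsEnd` are hypothetical names that stand in for reader.list_toplist_tracks and the route body.

```javascript
// Sketch only: an in-memory stand-in for reader.list_toplist_tracks.
function listTracks(allTracks, page, pageSize) {
  const offset = (page - 1) * pageSize; // same offset arithmetic as the SQL query
  return allTracks.slice(offset, offset + pageSize);
}

// Mirrors the route's rule: a short page is final; a full page needs a look-ahead.
function computeIsEnd(allTracks, page, pageSize) {
  const rows = listTracks(allTracks, page, pageSize);
  if (rows.length < pageSize) {
    return true;
  }
  return listTracks(allTracks, page + 1, pageSize).length === 0;
}

const twoTracks = [2001, 2002];
const isEndResults = [
  computeIsEnd(twoTracks, 1, 1), // false: full page, and page 2 still has a row
  computeIsEnd(twoTracks, 2, 1), // true: full page, but page 3 is empty
  computeIsEnd(twoTracks, 1, 60), // true: short page
];
```

The second case is the one the look-ahead exists for: when the track count is an exact multiple of page_size, the last page is full, and only the extra query can tell the client to stop paging. The cost is one additional read per full page.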
- Step 4: Run test to verify it passes
Run:
python -m unittest tests.test_mf_detail_routes.MfDetailRouteTests.test_toplist_detail_and_tracks_return_expected_shape tests.test_mf_detail_routes.MfDetailRouteTests.test_toplist_detail_returns_404_and_requires_token -v
Expected:
- OK
- Step 5: Commit
git add src/music_server/routes/mf_catalog.py tests/test_mf_detail_routes.py
git commit -m "feat: expose musicfree toplist detail endpoints"
Task 3: Create the base skeleton of the official plugin music_server.js
Files:
- Create: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.js
- Create: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
- Step 1: Write the failing test
Create D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs:
"use strict";
const test = require("node:test");
const assert = require("node:assert/strict");
const plugin = require("./music_server");
test("music_server plugin exposes pure Music_Server metadata", () => {
assert.equal(plugin.platform, "Music_Server");
assert.deepEqual(plugin.supportedSearchType, ["music"]);
assert.deepEqual(
plugin.userVariables.map((item) => item.key),
["baseUrl", "accessToken"],
);
});
- Step 2: Run test to verify it fails
Run:
node --test D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
Expected:
- Cannot find module './music_server'
- Step 3: Write minimal implementation
Create D:\source\MusicFree\keep-alive-master\Music_Free\music_server.js:
"use strict";
let testClient = null;
let testConfig = null;
function normalizeBaseUrl(url) {
return String(url || "").replace(/\/+$/, "");
}
function readConfigValue(key) {
if (testConfig && Object.prototype.hasOwnProperty.call(testConfig, key)) {
return String(testConfig[key] || "").trim();
}
const item = plugin.userVariables.find((entry) => entry.key === key);
if (item && typeof item.value === "string") {
return item.value.trim();
}
return "";
}
function createDefaultClient() {
const axios = require("axios");
return axios.create({
baseURL: normalizeBaseUrl(readConfigValue("baseUrl")),
timeout: 10000,
headers: {
Authorization: `Bearer ${readConfigValue("accessToken")}`,
},
});
}
function getClient() {
if (testClient) {
return testClient;
}
return createDefaultClient();
}
const plugin = {
platform: "Music_Server",
version: "0.1.0",
author: "Codex",
appVersion: ">0.1.0-alpha.0",
srcUrl: "",
cacheControl: "no-cache",
primaryKey: ["id"],
userVariables: [
{ key: "baseUrl", name: "Base URL", value: "" },
{ key: "accessToken", name: "Access Token", value: "" },
],
supportedSearchType: ["music"],
__setHttpClientForTests(client) {
testClient = client;
},
__setConfigForTests(config) {
testConfig = config;
},
__clearTestState() {
testClient = null;
testConfig = null;
},
__getClientForTests() {
return getClient();
},
};
module.exports = plugin;
- Step 4: Run test to verify it passes
Run:
node --test D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
Expected:
- pass 1
- Step 5: Commit
git -C D:\source\MusicFree\keep-alive-master add Music_Free\music_server.js Music_Free\music_server.test.cjs
git -C D:\source\MusicFree\keep-alive-master commit -m "feat: scaffold music server plugin"
Task 4: Implement the search, recommendation, playlist detail, and toplist detail methods
Files:
- Modify: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.js
- Modify: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
- Step 1: Write the failing tests
Append these tests to D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs:
test("search and recommend methods call Music_Server endpoints", async () => {
plugin.__clearTestState();
const calls = [];
plugin.__setHttpClientForTests({
get: async (path, options = {}) => {
calls.push({ path, params: options.params || null });
if (path === "/mf/v1/search/songs") {
return {
data: {
isEnd: false,
data: [
{
id: "catalogsync:song:3476",
platform: "catalogsync",
title: "Moonlight",
artist: "Artist A",
album: "Album A",
artwork: "https://img/song.jpg",
duration: 245,
},
],
},
};
}
if (path === "/mf/v1/recommend/tags") {
return {
data: {
pinned: [{ id: "all", title: "all" }],
data: [],
},
};
}
if (path === "/mf/v1/recommend/sheets") {
return {
data: {
isEnd: true,
data: [
{
id: "catalogsync:playlist:18165",
platform: "catalogsync",
title: "Sheet A",
coverImg: "https://img/p.jpg",
description: "desc",
worksNum: 3,
playCount: 99,
},
],
},
};
}
throw new Error(`unexpected GET ${path}`);
},
});
const searchResult = await plugin.search("moon", 1, "music");
const tagsResult = await plugin.getRecommendSheetTags();
const sheetsResult = await plugin.getRecommendSheetsByTag({ id: "all" }, 2);
assert.equal(searchResult.isEnd, false);
assert.equal(searchResult.data[0].id, "catalogsync:song:3476");
assert.equal(tagsResult.pinned[0].id, "all");
assert.equal(sheetsResult.isEnd, true);
assert.equal(sheetsResult.data[0].id, "catalogsync:playlist:18165");
assert.deepEqual(calls, [
{
path: "/mf/v1/search/songs",
params: { q: "moon", page: 1, page_size: 20 },
},
{
path: "/mf/v1/recommend/tags",
params: null,
},
{
path: "/mf/v1/recommend/sheets",
params: { tag: "all", page: 2, page_size: 60 },
},
]);
});
test("playlist detail and toplist detail methods map header plus tracks", async () => {
plugin.__clearTestState();
const calls = [];
plugin.__setHttpClientForTests({
get: async (path, options = {}) => {
calls.push({ path, params: options.params || null });
if (path === "/mf/v1/playlists/18165") {
return {
data: {
id: "catalogsync:playlist:18165",
platform: "catalogsync",
title: "Playlist One",
coverImg: "https://img/p1.jpg",
description: "desc1",
worksNum: 2,
playCount: 200,
},
};
}
if (path === "/mf/v1/playlists/18165/tracks") {
return {
data: {
isEnd: false,
musicList: [
{
id: "catalogsync:song:1",
platform: "catalogsync",
title: "Track 1",
artist: "Singer 1",
album: "Album 1",
artwork: "https://img/s1.jpg",
duration: 210,
},
],
},
};
}
if (path === "/mf/v1/toplists/kuwo_top_16") {
return {
data: {
id: "catalogsync:toplist:kuwo_top_16",
platform: "catalogsync",
title: "Toplist One",
coverImg: "https://img/t1.jpg",
description: "desc2",
worksNum: 2,
playCount: 999,
},
};
}
if (path === "/mf/v1/toplists/kuwo_top_16/tracks") {
return {
data: {
isEnd: true,
musicList: [
{
id: "catalogsync:song:2",
platform: "catalogsync",
title: "Top Song",
artist: "Singer 2",
album: "Album 2",
artwork: "https://img/s2.jpg",
duration: 220,
},
],
},
};
}
throw new Error(`unexpected GET ${path}`);
},
});
const playlistResult = await plugin.getMusicSheetInfo(
{ id: "catalogsync:playlist:18165" },
1,
);
const toplistResult = await plugin.getTopListDetail(
{ id: "catalogsync:toplist:kuwo_top_16" },
1,
);
assert.equal(playlistResult.sheetItem.title, "Playlist One");
assert.equal(playlistResult.musicList[0].id, "catalogsync:song:1");
assert.equal(toplistResult.topListItem.title, "Toplist One");
assert.equal(toplistResult.musicList[0].id, "catalogsync:song:2");
assert.deepEqual(calls, [
{
path: "/mf/v1/playlists/18165",
params: null,
},
{
path: "/mf/v1/playlists/18165/tracks",
params: { page: 1, page_size: 60 },
},
{
path: "/mf/v1/toplists/kuwo_top_16",
params: null,
},
{
path: "/mf/v1/toplists/kuwo_top_16/tracks",
params: { page: 1, page_size: 60 },
},
]);
});
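Both tests above build a recording fake client inline. If that duplication becomes a burden, one option is a small factory like the sketch below; `createRecordingClient` is a hypothetical helper that is not part of this plan, and it only covers GET.

```javascript
// Sketch only: `createRecordingClient` is a hypothetical test helper.
function createRecordingClient(routes) {
  const calls = [];
  return {
    calls,
    // Records every call, then answers from the route table or rejects.
    async get(path, options = {}) {
      calls.push({ path, params: options.params || null });
      if (!Object.prototype.hasOwnProperty.call(routes, path)) {
        throw new Error(`unexpected GET ${path}`);
      }
      return { data: routes[path] };
    },
  };
}

const recordingClient = createRecordingClient({
  "/mf/v1/recommend/tags": { pinned: [{ id: "all", title: "all" }], data: [] },
});
const tagsPromise = recordingClient.get("/mf/v1/recommend/tags");
```

A test would pass the returned object to plugin.__setHttpClientForTests and compare `recordingClient.calls` with the expected sequence, as the inline versions do with `calls`.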
- Step 2: Run test to verify it fails
Run:
node --test D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
Expected:
- TypeError for missing search, getRecommendSheetTags, getMusicSheetInfo, or getTopListDetail
- Step 3: Write minimal implementation
Update D:\source\MusicFree\keep-alive-master\Music_Free\music_server.js to:
"use strict";
let testClient = null;
let testConfig = null;
function normalizeBaseUrl(url) {
return String(url || "").replace(/\/+$/, "");
}
function parsePublicId(publicId) {
return String(publicId || "").split(":").pop();
}
function readConfigValue(key) {
if (testConfig && Object.prototype.hasOwnProperty.call(testConfig, key)) {
return String(testConfig[key] || "").trim();
}
const item = plugin.userVariables.find((entry) => entry.key === key);
if (item && typeof item.value === "string") {
return item.value.trim();
}
return "";
}
function createDefaultClient() {
const axios = require("axios");
return axios.create({
baseURL: normalizeBaseUrl(readConfigValue("baseUrl")),
timeout: 10000,
headers: {
Authorization: `Bearer ${readConfigValue("accessToken")}`,
},
});
}
function getClient() {
if (testClient) {
return testClient;
}
return createDefaultClient();
}
function mapSheetItem(item) {
return {
id: item.id,
platform: item.platform || "catalogsync",
title: item.title || "",
coverImg: item.coverImg || "",
description: item.description || "",
worksNum: item.worksNum || 0,
playCount: item.playCount || 0,
};
}
function mapMusicItem(item) {
return {
id: item.id,
platform: item.platform || "catalogsync",
title: item.title || "",
artist: item.artist || "",
album: item.album || "",
artwork: item.artwork || "",
duration: item.duration || 0,
};
}
const plugin = {
platform: "Music_Server",
version: "0.1.0",
author: "Codex",
appVersion: ">0.1.0-alpha.0",
srcUrl: "",
cacheControl: "no-cache",
primaryKey: ["id"],
userVariables: [
{ key: "baseUrl", name: "Base URL", value: "" },
{ key: "accessToken", name: "Access Token", value: "" },
],
supportedSearchType: ["music"],
__setHttpClientForTests(client) {
testClient = client;
},
__setConfigForTests(config) {
testConfig = config;
},
__clearTestState() {
testClient = null;
testConfig = null;
},
async search(query, page, type) {
if (type !== "music") {
return { isEnd: true, data: [] };
}
const response = await getClient().get("/mf/v1/search/songs", {
params: {
q: query,
page,
page_size: 20,
},
});
return {
isEnd: Boolean(response.data.isEnd),
data: (response.data.data || []).map(mapMusicItem),
};
},
async getRecommendSheetTags() {
const response = await getClient().get("/mf/v1/recommend/tags");
return response.data;
},
async getRecommendSheetsByTag(tag, page) {
const response = await getClient().get("/mf/v1/recommend/sheets", {
params: {
tag: (tag && tag.id) || "all",
page: page || 1,
page_size: 60,
},
});
return {
isEnd: Boolean(response.data.isEnd),
data: (response.data.data || []).map(mapSheetItem),
};
},
async getMusicSheetInfo(sheetItem, page) {
const playlistId = parsePublicId(sheetItem && sheetItem.id);
let resolvedSheetItem;
if ((page || 1) === 1) {
const detailResponse = await getClient().get(`/mf/v1/playlists/${playlistId}`);
resolvedSheetItem = mapSheetItem(detailResponse.data);
}
const tracksResponse = await getClient().get(`/mf/v1/playlists/${playlistId}/tracks`, {
params: {
page: page || 1,
page_size: 60,
},
});
return {
isEnd: Boolean(tracksResponse.data.isEnd),
sheetItem: resolvedSheetItem,
musicList: (tracksResponse.data.musicList || []).map(mapMusicItem),
};
},
async getTopLists() {
const response = await getClient().get("/mf/v1/toplists");
return response.data;
},
async getTopListDetail(topListItem, page) {
const toplistId = parsePublicId(topListItem && topListItem.id);
let resolvedTopListItem = topListItem;
if ((page || 1) === 1) {
const detailResponse = await getClient().get(`/mf/v1/toplists/${toplistId}`);
resolvedTopListItem = mapSheetItem(detailResponse.data);
}
const tracksResponse = await getClient().get(`/mf/v1/toplists/${toplistId}/tracks`, {
params: {
page: page || 1,
page_size: 60,
},
});
return {
isEnd: Boolean(tracksResponse.data.isEnd),
topListItem: resolvedTopListItem,
musicList: (tracksResponse.data.musicList || []).map(mapMusicItem),
};
},
};
module.exports = plugin;
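The detail methods rely on parsePublicId to turn a public id into the raw id used in the URL path. The sketch below copies that function from the listing above and runs it on a few inputs; the last two inputs are hypothetical edge cases, not ids the plan says the server produces.

```javascript
// Copied from the plan's music_server.js so the sketch is self-contained.
function parsePublicId(publicId) {
  return String(publicId || "").split(":").pop();
}

const parsedIds = [
  parsePublicId("catalogsync:playlist:18165"), // "18165"
  parsePublicId("catalogsync:toplist:kuwo_top_16"), // "kuwo_top_16"
  parsePublicId("18165"), // "18165" (no prefix: returned unchanged)
  parsePublicId(undefined), // "" (missing id)
  parsePublicId("catalogsync:toplist:a:b"), // "b" (hypothetical raw id containing a colon)
];
```

Two consequences follow from taking only the last colon-separated segment: a missing id yields an empty string, which would produce a request path such as /mf/v1/playlists/, and a raw id that itself contains a colon would be truncated. Neither case appears in the tests above, so guarding against them is left as a judgment call.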
- Step 4: Run test to verify it passes
Run:
node --test D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
Expected:
- pass 3
- Step 5: Commit
git -C D:\source\MusicFree\keep-alive-master add Music_Free\music_server.js Music_Free\music_server.test.cjs
git -C D:\source\MusicFree\keep-alive-master commit -m "feat: add pure music server plugin methods"
Task 5: Add media resolution, relative stream URL joining, failure fallback, and the compatibility shell
Files:
- Modify: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.js
- Modify: D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
- Modify: D:\source\MusicFree\keep-alive-master\Music_Free\netease_17000.js
- Step 1: Write the failing tests
Append these tests to D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs:
test("getMediaSource joins relative stream urls and keeps absolute urls", async () => {
plugin.__clearTestState();
let callCount = 0;
plugin.__setConfigForTests({
baseUrl: "http://127.0.0.1:18081/",
accessToken: "dev-token",
});
plugin.__setHttpClientForTests({
post: async () => {
callCount += 1;
if (callCount === 1) {
return {
data: {
stream: {
url: "/mf/v1/media/stream/token-1",
headers: {},
},
selected_source: {
quality: "super",
},
},
};
}
return {
data: {
stream: {
url: "https://cdn.example/file.flac",
headers: { Range: "bytes=0-" },
},
selected_source: {
quality: "standard",
},
},
};
},
});
const relativeResult = await plugin.getMediaSource(
{ id: "catalogsync:song:3476" },
"super",
);
const absoluteResult = await plugin.getMediaSource(
{ id: "catalogsync:song:3476" },
"standard",
);
assert.equal(relativeResult.url, "http://127.0.0.1:18081/mf/v1/media/stream/token-1");
assert.equal(relativeResult.quality, "super");
assert.equal(absoluteResult.url, "https://cdn.example/file.flac");
assert.deepEqual(absoluteResult.headers, { Range: "bytes=0-" });
});
test("browse methods fail closed and media resolve returns null on request error", async () => {
plugin.__clearTestState();
plugin.__setConfigForTests({
baseUrl: "http://127.0.0.1:18081",
accessToken: "dev-token",
});
plugin.__setHttpClientForTests({
get: async () => {
throw new Error("boom");
},
post: async () => {
throw new Error("boom");
},
});
const searchResult = await plugin.search("moon", 1, "music");
const sheetsResult = await plugin.getRecommendSheetsByTag({ id: "all" }, 1);
const playlistResult = await plugin.getMusicSheetInfo(
{ id: "catalogsync:playlist:18165" },
1,
);
const toplistResult = await plugin.getTopListDetail(
{ id: "catalogsync:toplist:kuwo_top_16" },
1,
);
const mediaResult = await plugin.getMediaSource(
{ id: "catalogsync:song:3476" },
"super",
);
assert.deepEqual(searchResult, { isEnd: true, data: [] });
assert.deepEqual(sheetsResult, { isEnd: true, data: [] });
assert.deepEqual(playlistResult, { isEnd: true, sheetItem: undefined, musicList: [] });
assert.deepEqual(
toplistResult,
{
isEnd: true,
topListItem: undefined,
musicList: [],
},
);
assert.equal(mediaResult, null);
});
test("netease_17000 compatibility shell re-exports the new plugin", () => {
const compatPlugin = require("./netease_17000");
assert.strictEqual(compatPlugin, plugin);
});
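The fail-closed behavior these tests describe (log a warning, return an empty result) can also be factored into one wrapper instead of a try/catch per method. This is a sketch of that alternative under the assumption that every method has a fixed fallback value; `withFallback` is a hypothetical helper and is not part of the plan's plugin.

```javascript
// Sketch only: `withFallback` is a hypothetical helper.
async function withFallback(label, fallback, task) {
  try {
    return await task();
  } catch (error) {
    // Same message shape as the plugin's warn() helper.
    const detail = error && error.message ? error.message : error;
    console.warn(`[Music_Server plugin] ${label}: ${detail}`);
    return fallback;
  }
}

// A failing request resolves to the fallback instead of rejecting.
const failedSearch = withFallback("search", { isEnd: true, data: [] }, async () => {
  throw new Error("boom");
});

// A successful request passes its value through untouched.
const okSearch = withFallback("search", { isEnd: true, data: [] }, async () => {
  return { isEnd: false, data: [{ id: "catalogsync:song:3476" }] };
});
```

Methods such as getMusicSheetInfo, where the detail call and the tracks call fail independently, would need two wrapped calls rather than one.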
- Step 2: Run test to verify it fails
Run:
node --test D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
Expected:
- relative URL assertion fails because getMediaSource does not exist or does not join base URL
- Step 3: Write minimal implementation
Update D:\source\MusicFree\keep-alive-master\Music_Free\music_server.js to:
"use strict";
let testClient = null;
let testConfig = null;
function normalizeBaseUrl(url) {
return String(url || "").replace(/\/+$/, "");
}
function parsePublicId(publicId) {
return String(publicId || "").split(":").pop();
}
function readConfigValue(key) {
if (testConfig && Object.prototype.hasOwnProperty.call(testConfig, key)) {
return String(testConfig[key] || "").trim();
}
const item = plugin.userVariables.find((entry) => entry.key === key);
if (item && typeof item.value === "string") {
return item.value.trim();
}
return "";
}
function getRuntimeConfig() {
return {
baseUrl: normalizeBaseUrl(readConfigValue("baseUrl")),
accessToken: readConfigValue("accessToken"),
};
}
function warn(label, error) {
console.warn(`[Music_Server plugin] ${label}: ${error && error.message ? error.message : error}`);
}
function createDefaultClient() {
const config = getRuntimeConfig();
if (!config.baseUrl) {
throw new Error("Music_Server plugin requires baseUrl");
}
if (!config.accessToken) {
throw new Error("Music_Server plugin requires accessToken");
}
const axios = require("axios");
return axios.create({
baseURL: config.baseUrl,
timeout: 10000,
headers: {
Authorization: `Bearer ${config.accessToken}`,
},
});
}
function getClient() {
if (testClient) {
return testClient;
}
return createDefaultClient();
}
function mapSheetItem(item) {
return {
id: item.id,
platform: item.platform || "catalogsync",
title: item.title || "",
coverImg: item.coverImg || "",
description: item.description || "",
worksNum: item.worksNum || 0,
playCount: item.playCount || 0,
};
}
function mapMusicItem(item) {
return {
id: item.id,
platform: item.platform || "catalogsync",
title: item.title || "",
artist: item.artist || "",
album: item.album || "",
artwork: item.artwork || "",
duration: item.duration || 0,
};
}
function joinStreamUrl(baseUrl, url) {
const raw = String(url || "").trim();
if (!raw) {
return "";
}
if (/^https?:\/\//i.test(raw)) {
return raw;
}
const prefix = normalizeBaseUrl(baseUrl);
if (!prefix) {
return raw;
}
return raw.startsWith("/") ? `${prefix}${raw}` : `${prefix}/${raw}`;
}
const plugin = {
platform: "Music_Server",
version: "0.1.0",
author: "Codex",
appVersion: ">0.1.0-alpha.0",
srcUrl: "",
cacheControl: "no-cache",
primaryKey: ["id"],
userVariables: [
{ key: "baseUrl", name: "Base URL", value: "" },
{ key: "accessToken", name: "Access Token", value: "" },
],
supportedSearchType: ["music"],
__setHttpClientForTests(client) {
testClient = client;
},
__setConfigForTests(config) {
testConfig = config;
},
__clearTestState() {
testClient = null;
testConfig = null;
},
async search(query, page, type) {
if (type !== "music") {
return { isEnd: true, data: [] };
}
const client = getClient();
try {
const response = await client.get("/mf/v1/search/songs", {
params: {
q: query,
page,
page_size: 20,
},
});
return {
isEnd: Boolean(response.data.isEnd),
data: (response.data.data || []).map(mapMusicItem),
};
} catch (error) {
warn("search", error);
return { isEnd: true, data: [] };
}
},
async getRecommendSheetTags() {
const client = getClient();
try {
const response = await client.get("/mf/v1/recommend/tags");
return response.data;
} catch (error) {
warn("getRecommendSheetTags", error);
return { pinned: [], data: [] };
}
},
async getRecommendSheetsByTag(tag, page) {
const client = getClient();
try {
const response = await client.get("/mf/v1/recommend/sheets", {
params: {
tag: (tag && tag.id) || "all",
page: page || 1,
page_size: 60,
},
});
return {
isEnd: Boolean(response.data.isEnd),
data: (response.data.data || []).map(mapSheetItem),
};
} catch (error) {
warn("getRecommendSheetsByTag", error);
return { isEnd: true, data: [] };
}
},
async getMusicSheetInfo(sheetItem, page) {
const client = getClient();
const playlistId = parsePublicId(sheetItem && sheetItem.id);
let resolvedSheetItem;
if ((page || 1) === 1) {
try {
const detailResponse = await client.get(`/mf/v1/playlists/${playlistId}`);
resolvedSheetItem = mapSheetItem(detailResponse.data);
} catch (error) {
warn("getMusicSheetInfo.detail", error);
}
}
try {
const tracksResponse = await client.get(`/mf/v1/playlists/${playlistId}/tracks`, {
params: {
page: page || 1,
page_size: 60,
},
});
return {
isEnd: Boolean(tracksResponse.data.isEnd),
sheetItem: resolvedSheetItem,
musicList: (tracksResponse.data.musicList || []).map(mapMusicItem),
};
} catch (error) {
warn("getMusicSheetInfo.tracks", error);
return {
isEnd: true,
sheetItem: resolvedSheetItem,
musicList: [],
};
}
},
async getTopLists() {
const client = getClient();
try {
const response = await client.get("/mf/v1/toplists");
return response.data;
} catch (error) {
warn("getTopLists", error);
return [];
}
},
async getTopListDetail(topListItem, page) {
const client = getClient();
const toplistId = parsePublicId(topListItem && topListItem.id);
let resolvedTopListItem;
if ((page || 1) === 1) {
try {
const detailResponse = await client.get(`/mf/v1/toplists/${toplistId}`);
resolvedTopListItem = mapSheetItem(detailResponse.data);
} catch (error) {
warn("getTopListDetail.detail", error);
}
}
try {
const tracksResponse = await client.get(`/mf/v1/toplists/${toplistId}/tracks`, {
params: {
page: page || 1,
page_size: 60,
},
});
return {
isEnd: Boolean(tracksResponse.data.isEnd),
topListItem: resolvedTopListItem,
musicList: (tracksResponse.data.musicList || []).map(mapMusicItem),
};
} catch (error) {
warn("getTopListDetail.tracks", error);
return {
isEnd: true,
topListItem: resolvedTopListItem,
musicList: [],
};
}
},
async getMediaSource(musicItem, quality) {
const client = getClient();
const config = getRuntimeConfig();
try {
const response = await client.post("/mf/v1/media/resolve", {
song_id: musicItem.id,
quality,
});
const rawUrl =
response.data &&
response.data.stream &&
response.data.stream.url;
if (!rawUrl) {
return null;
}
return {
url: joinStreamUrl(config.baseUrl, rawUrl),
headers:
(response.data && response.data.stream && response.data.stream.headers) || {},
quality:
(response.data &&
response.data.selected_source &&
response.data.selected_source.quality) ||
quality,
};
} catch (error) {
warn("getMediaSource", error);
return null;
}
},
};
module.exports = plugin;
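joinStreamUrl concatenates strings instead of resolving the stream path with the WHATWG URL class, and the two approaches differ when baseUrl carries a path prefix. The sketch below copies the two helpers from the listing above; the `/music/` prefix is a hypothetical deployment detail used only to show the difference.

```javascript
// Copied from the plan's music_server.js so the sketch is self-contained.
function normalizeBaseUrl(url) {
  return String(url || "").replace(/\/+$/, "");
}

function joinStreamUrl(baseUrl, url) {
  const raw = String(url || "").trim();
  if (!raw) return "";
  if (/^https?:\/\//i.test(raw)) return raw;
  const prefix = normalizeBaseUrl(baseUrl);
  if (!prefix) return raw;
  return raw.startsWith("/") ? `${prefix}${raw}` : `${prefix}/${raw}`;
}

// Hypothetical base URL with a path prefix, for illustration only.
const prefixedBase = "http://127.0.0.1:18081/music/";
const streamPath = "/mf/v1/media/stream/token-1";

// String concatenation keeps the prefix:
// "http://127.0.0.1:18081/music/mf/v1/media/stream/token-1"
const joined = joinStreamUrl(prefixedBase, streamPath);

// URL resolution treats a leading "/" as host-relative and drops the prefix:
// "http://127.0.0.1:18081/mf/v1/media/stream/token-1"
const resolved = new URL(streamPath, prefixedBase).href;

// Absolute URLs are returned unchanged by joinStreamUrl.
const absolute = joinStreamUrl(prefixedBase, "https://cdn.example/file.flac");
```

If Music_Server is ever served under a sub-path, the concatenating version keeps stream URLs under that sub-path, which is presumably the desired behavior; with a bare host as baseUrl, both approaches give the same result.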
Replace D:\source\MusicFree\keep-alive-master\Music_Free\netease_17000.js with:
"use strict";
module.exports = require("./music_server");
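The strictEqual assertion in the compatibility test holds because CommonJS caches each module after its first load, so the shell hands back the very object the real plugin exported. The sketch below reproduces that with two throwaway files in a temporary directory; the file names `real_plugin.js` and `compat_shell.js` are placeholders, not files from this plan.

```javascript
// Sketch only: file names are placeholders, written to a temp directory.
function demonstrateSharedExport() {
  const fs = require("node:fs");
  const os = require("node:os");
  const path = require("node:path");

  const dir = fs.mkdtempSync(path.join(os.tmpdir(), "compat-shell-"));
  try {
    fs.writeFileSync(
      path.join(dir, "real_plugin.js"),
      'module.exports = { platform: "Music_Server" };\n',
    );
    fs.writeFileSync(
      path.join(dir, "compat_shell.js"),
      'module.exports = require("./real_plugin");\n',
    );
    const realPlugin = require(path.join(dir, "real_plugin.js"));
    const compatShell = require(path.join(dir, "compat_shell.js"));
    // Same object identity, not just equal contents.
    return realPlugin === compatShell && compatShell.platform === "Music_Server";
  } finally {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}

const sharesOneObject = demonstrateSharedExport();
```

One side effect worth keeping in mind: because both entry points share a single object, test state set through the hooks on one is visible through the other, and the shell reports the platform name "Music_Server" rather than a NetEase-specific one.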
- Step 4: Run test to verify it passes
Run:
node --test D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
Expected:
- all plugin tests pass
- Step 5: Commit
git -C D:\source\MusicFree\keep-alive-master add Music_Free\music_server.js Music_Free\music_server.test.cjs Music_Free\netease_17000.js
git -C D:\source\MusicFree\keep-alive-master commit -m "feat: switch netease plugin to music server backend"
Task 6: Run focused regression tests
Files:
- Test only
- Step 1: Run Music_Server reader and route regressions
Run:
python -m unittest tests.test_catalog_reader tests.test_mf_catalog_routes tests.test_mf_detail_routes -v
Expected:
- all related Music_Server tests pass
- Step 2: Run MusicFree plugin tests
Run:
node --test D:\source\MusicFree\keep-alive-master\Music_Free\music_server.test.cjs
Expected:
- all plugin tests pass
- Step 3: Manual smoke check after implementation
Verify in MusicFree:
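1. Install or overwrite `music_server.js`
2. Configure `baseUrl` and `accessToken`
3. Search for a song and confirm the results come from `/mf/v1/search/songs`
4. Open a recommended playlist and view its track list
5. Open a toplist and view its track list
6. Play any song and confirm the request goes to `/mf/v1/media/resolve`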
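Before opening MusicFree, the same endpoints can be probed from a script. The sketch below only builds the request descriptions; `buildSmokeRequests` is a hypothetical helper, the paths and query parameter names are taken from this plan, and the query "moon", the toplist id, and the local address are sample values.

```javascript
// Sketch only: `buildSmokeRequests` is a hypothetical helper; sample values are placeholders.
function buildSmokeRequests(baseUrl, accessToken) {
  const root = String(baseUrl || "").replace(/\/+$/, "");
  const headers = { Authorization: `Bearer ${accessToken}` };
  const searchQuery = new URLSearchParams({ q: "moon", page: "1", page_size: "20" });
  const pageQuery = new URLSearchParams({ page: "1", page_size: "60" });
  return [
    { method: "GET", url: `${root}/mf/v1/search/songs?${searchQuery}`, headers },
    { method: "GET", url: `${root}/mf/v1/toplists/kuwo_top_16`, headers },
    { method: "GET", url: `${root}/mf/v1/toplists/kuwo_top_16/tracks?${pageQuery}`, headers },
  ];
}

const smokeRequests = buildSmokeRequests("http://127.0.0.1:18081/", "dev-token");
```

Each entry can be passed to the global fetch available in recent Node.js versions, for example `fetch(request.url, { method: request.method, headers: request.headers })`, and a 200 status on all three suggests the server side is ready for the manual steps.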