# File-viewer header (extraction residue): 598 lines, 18 KiB, Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
|
|
# playlists.parse_strategy values that the exporters route to the toplist
# tables instead of the regular playlist tables.
TOPLIST_PARSE_STRATEGIES = {
    "netease_toplist",
    "qq_toplist",
    "kuwo_toplist",
}

# Label written to catalog_toplists.group_name, keyed by platform code.
# Platforms missing from this mapping fall back to the platform code itself.
TOPLIST_GROUP_NAMES = dict(
    qq="QQ音乐",
    netease="网易云",
    kuwo="酷我",
)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the export script.

    Both ``--source-db`` and ``--target-db`` are mandatory.

    Returns:
        Namespace exposing ``source_db`` and ``target_db``.
    """
    cli = argparse.ArgumentParser(
        description="Build catalog_read.db from catalogsync.db for Music_Server."
    )
    required_options = (
        ("--source-db", "Path to catalogsync.db"),
        ("--target-db", "Path to catalog_read.db"),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)
    return cli.parse_args()
|
|
|
|
|
|
def connect(path: str) -> sqlite3.Connection:
    """Open the SQLite database at *path*.

    Returns:
        A connection whose ``row_factory`` is ``sqlite3.Row``, so result
        columns can be read by name.
    """
    connection = sqlite3.connect(path)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def create_schema(conn: sqlite3.Connection) -> None:
    """Create every ``catalog_*`` table and index on *conn*.

    The statements are plain ``create table`` / ``create index`` (no
    ``if not exists``), so running this against a database that already
    holds these objects raises an error.
    """
    schema_ddl = """
        create table catalog_playlists (
            playlist_id integer primary key,
            platform text not null,
            remote_playlist_id text not null,
            name text not null,
            description text,
            cover_url text,
            play_count integer not null,
            song_count integer not null,
            playable_song_count integer not null
        );

        create table catalog_tracks (
            song_id integer primary key,
            platform text not null,
            remote_song_id text not null,
            name text not null,
            singers text,
            album text,
            cover_url text,
            duration_ms integer,
            metadata_json text
        );

        create table catalog_playlist_tracks (
            playlist_id integer not null,
            song_id integer not null,
            position integer not null
        );

        create table catalog_toplists (
            toplist_id text primary key,
            platform text not null,
            name text not null,
            description text,
            cover_url text,
            play_count integer not null,
            song_count integer not null,
            playable_song_count integer not null,
            group_name text not null
        );

        create table catalog_toplist_tracks (
            toplist_id text not null,
            song_id integer not null,
            position integer not null
        );

        create table catalog_track_files (
            song_id integer not null,
            quality_label text,
            ext text,
            file_size_bytes integer,
            backend_type text,
            backend_name text,
            locator text,
            public_url text,
            status text,
            is_primary integer
        );

        create table catalog_artists (
            artist_id integer primary key,
            artist_key text not null unique,
            platform text not null,
            remote_artist_id text,
            name text not null,
            normalized_name text not null,
            avatar_url text,
            description text,
            playable_song_count integer not null
        );

        create table catalog_artist_tracks (
            artist_id integer not null,
            song_id integer not null,
            position integer not null
        );

        create index idx_catalog_playlist_tracks_playlist on catalog_playlist_tracks (playlist_id, position);
        create index idx_catalog_toplist_tracks_toplist on catalog_toplist_tracks (toplist_id, position);
        create index idx_catalog_track_files_song on catalog_track_files (song_id, status, is_primary);
        create index idx_catalog_artist_tracks_artist on catalog_artist_tracks (artist_id, position);
        """
    conn.executescript(schema_ddl)
|
|
|
|
|
|
def _extract_song_cover(metadata_json: str | None) -> str | None:
    """Return a cover URL found in a song's metadata JSON, if any.

    Lookup order:
      1. ``snapshot.cover_url``
      2. ``snapshot.raw_data.search.al.picUrl``

    Args:
        metadata_json: JSON text (or None / empty) holding the song metadata.

    Returns:
        The first non-empty string found at the locations above, or None
        when the metadata is missing, is not valid JSON, or does not have
        the expected object structure.
    """
    if not metadata_json:
        return None
    try:
        payload = json.loads(metadata_json)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers a
        # non-text column value.
        return None

    def as_mapping(value: object) -> dict:
        # Valid JSON may hold a list/str/number where an object is expected;
        # treat anything that is not a dict as "nothing there" instead of
        # crashing on .get().
        return value if isinstance(value, dict) else {}

    snapshot = as_mapping(as_mapping(payload).get("snapshot"))
    snapshot_cover = snapshot.get("cover_url")
    if isinstance(snapshot_cover, str) and snapshot_cover:
        return snapshot_cover

    search = as_mapping(as_mapping(snapshot.get("raw_data")).get("search"))
    album_cover = as_mapping(search.get("al")).get("picUrl")
    if isinstance(album_cover, str) and album_cover:
        return album_cover
    return None
|
|
|
|
|
|
def _extract_duration_ms(duration_seconds: int | None, metadata_json: str | None) -> int | None:
    """Return a track duration in milliseconds.

    Sources, in priority order:
      1. ``duration_seconds`` (seconds), when it is usable.
      2. ``snapshot.raw_data.search.dt`` from the metadata JSON
         (taken as milliseconds).
      3. ``snapshot.duration_s`` from the metadata JSON (seconds).

    A source that is missing or cannot be interpreted as a number is
    skipped and the next one is tried.

    Args:
        duration_seconds: Duration in seconds, or None.
        metadata_json: JSON text (or None / empty) holding the song metadata.

    Returns:
        The duration in milliseconds, or None when no source is usable.
    """

    def to_ms(value: object, factor: int) -> int | None:
        if value is None:
            return None
        if isinstance(value, int):
            # Exact integer arithmetic; no float round-trip.
            return value * factor
        try:
            # Go through float so fractional seconds keep their
            # sub-second part instead of being truncated.
            return round(float(value) * factor)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric text, nested JSON values, NaN or infinity.
            return None

    direct = to_ms(duration_seconds, 1000)
    if direct is not None:
        return direct
    if not metadata_json:
        return None
    try:
        payload = json.loads(metadata_json)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers a
        # non-text column value.
        return None

    # Valid JSON is not necessarily an object at every level; bail out
    # quietly rather than failing on .get().
    snapshot = payload.get("snapshot") if isinstance(payload, dict) else None
    if not isinstance(snapshot, dict):
        return None
    raw_data = snapshot.get("raw_data")
    search = raw_data.get("search") if isinstance(raw_data, dict) else None
    dt_value = search.get("dt") if isinstance(search, dict) else None

    from_dt = to_ms(dt_value, 1)
    if from_dt is not None:
        return from_dt
    return to_ms(snapshot.get("duration_s"), 1000)
|
|
|
|
|
|
def _extract_artist_avatar(metadata_json: str | None) -> str | None:
    """Return an avatar URL found in an artist's metadata JSON, if any.

    The top-level keys ``avatar``, ``avatar_url`` and ``cover_url`` are
    checked in that order; the first one holding a non-empty string wins.

    Args:
        metadata_json: JSON text (or None / empty) holding the artist metadata.

    Returns:
        The URL string, or None when the metadata is missing, is not valid
        JSON, is not a JSON object, or has no usable value.
    """
    if not metadata_json:
        return None
    try:
        payload = json.loads(metadata_json)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers a
        # non-text column value.
        return None
    if not isinstance(payload, dict):
        # e.g. "[]" or "null" parse fine but carry no keys to inspect.
        return None

    # Test each key on its own so a non-string value under an earlier key
    # does not hide a valid URL under a later one.
    for key in ("avatar", "avatar_url", "cover_url"):
        candidate = payload.get(key)
        if isinstance(candidate, str) and candidate:
            return candidate
    return None
|
|
|
|
|
|
def _extract_artist_description(metadata_json: str | None) -> str | None:
    """Return a description found in an artist's metadata JSON, if any.

    The top-level keys ``description`` and ``desc`` are checked in that
    order; the first one holding a non-empty string wins.

    Args:
        metadata_json: JSON text (or None / empty) holding the artist metadata.

    Returns:
        The description string, or None when the metadata is missing, is
        not valid JSON, is not a JSON object, or has no usable value.
    """
    if not metadata_json:
        return None
    try:
        payload = json.loads(metadata_json)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers a
        # non-text column value.
        return None
    if not isinstance(payload, dict):
        # e.g. "[]" or "null" parse fine but carry no keys to inspect.
        return None

    # Test each key on its own so a non-string ``description`` does not
    # hide a valid ``desc``.
    for key in ("description", "desc"):
        candidate = payload.get(key)
        if isinstance(candidate, str) and candidate:
            return candidate
    return None
|
|
|
|
|
|
def export_playlists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy non-toplist playlists from *source* into ``catalog_playlists``.

    A playlist is exported when its ``parse_strategy`` is not one of
    ``TOPLIST_PARSE_STRATEGIES``. ``song_count`` is the number of
    ``playlist_songs`` rows (falling back to ``collected_song_count``, then
    0); ``playable_song_count`` counts distinct songs that have at least one
    file location with status ``'active'``.

    *source* must return rows addressable by column name.
    """
    # NOTE(review): creator_name is what ends up in the description column —
    # confirm this is intended. Also, in SQL a NULL parse_strategy satisfies
    # neither "in" nor "not in", so such rows are not exported here.
    excluded = tuple(sorted(TOPLIST_PARSE_STRATEGIES))
    placeholders = ",".join("?" for _ in excluded)
    select_sql = """
        select
            p.id,
            p.platform,
            p.remote_playlist_id,
            p.name,
            p.creator_name,
            p.cover_url,
            p.play_count,
            coalesce(ps.song_count, p.collected_song_count, 0) as song_count,
            coalesce(pps.playable_song_count, 0) as playable_song_count
        from playlists p
        left join (
            select playlist_id, count(*) as song_count
            from playlist_songs
            group by playlist_id
        ) ps on ps.playlist_id = p.id
        left join (
            select
                playlist_song_keys.playlist_id,
                count(*) as playable_song_count
            from (
                select distinct playlist_id, song_id
                from playlist_songs
            ) playlist_song_keys
            where exists (
                select 1
                from file_assets fa
                join file_locations fl on fl.file_asset_id = fa.id
                where fa.song_id = playlist_song_keys.song_id
                  and fl.status = 'active'
            )
            group by playlist_song_keys.playlist_id
        ) pps on pps.playlist_id = p.id
        where p.parse_strategy not in ({})
        """.format(placeholders)
    insert_sql = """
        insert into catalog_playlists (
            playlist_id,
            platform,
            remote_playlist_id,
            name,
            description,
            cover_url,
            play_count,
            song_count,
            playable_song_count
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

    payload = []
    for record in source.execute(select_sql, excluded).fetchall():
        payload.append(
            (
                int(record["id"]),
                record["platform"],
                record["remote_playlist_id"],
                record["name"],
                record["creator_name"],
                record["cover_url"],
                int(record["play_count"] or 0),
                int(record["song_count"] or 0),
                int(record["playable_song_count"] or 0),
            )
        )
    target.executemany(insert_sql, payload)
|
|
|
|
|
|
def export_tracks(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy every row of ``songs`` from *source* into ``catalog_tracks``.

    ``cover_url`` and ``duration_ms`` are derived per song through
    ``_extract_song_cover`` and ``_extract_duration_ms``; the raw
    ``metadata_json`` is carried over unchanged.

    *source* must return rows addressable by column name.
    """
    select_sql = """
        select
            id,
            platform,
            remote_song_id,
            name,
            singers,
            album,
            duration_seconds,
            metadata_json
        from songs
        """
    insert_sql = """
        insert into catalog_tracks (
            song_id,
            platform,
            remote_song_id,
            name,
            singers,
            album,
            cover_url,
            duration_ms,
            metadata_json
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

    payload = []
    for song in source.execute(select_sql).fetchall():
        metadata = song["metadata_json"]
        payload.append(
            (
                int(song["id"]),
                song["platform"],
                song["remote_song_id"],
                song["name"],
                song["singers"],
                song["album"],
                _extract_song_cover(metadata),
                _extract_duration_ms(song["duration_seconds"], metadata),
                metadata,
            )
        )
    target.executemany(insert_sql, payload)
|
|
|
|
|
|
def export_artists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Export artists and their playable songs.

    Only (artist, song) pairs whose song has at least one file location with
    status ``'active'`` are selected, so an artist with no such song is not
    written at all. Per artist, songs are ordered by lower-cased name and
    then by song id, and numbered from 1; that number is stored as
    ``catalog_artist_tracks.position`` and the final number is the artist's
    ``playable_song_count``.

    *source* must return rows addressable by column name.
    """
    pair_rows = source.execute(
        """
        select distinct
            a.id as artist_id,
            a.artist_key,
            a.platform,
            a.remote_artist_id,
            a.name,
            a.normalized_name,
            a.metadata_json,
            songs.id as song_id,
            songs.name as song_name
        from artists a
        join artist_songs s on s.artist_id = a.id
        join songs songs on songs.id = s.song_id
        where exists (
            select 1
            from file_assets fa
            join file_locations fl on fl.file_asset_id = fa.id
            where fa.song_id = s.song_id
              and fl.status = 'active'
        )
        order by a.id asc, lower(songs.name) asc, songs.id asc
        """
    ).fetchall()

    first_row_by_artist: dict[int, sqlite3.Row] = {}
    # Running per-artist counter: gives each song its 1-based position and,
    # once the loop is done, holds the artist's playable song total.
    song_totals: dict[int, int] = {}
    link_payload: list[tuple[int, int, int]] = []
    for pair in pair_rows:
        key = int(pair["artist_id"])
        first_row_by_artist.setdefault(key, pair)
        song_totals[key] = song_totals.get(key, 0) + 1
        link_payload.append((key, int(pair["song_id"]), song_totals[key]))

    artist_payload = []
    for key in sorted(first_row_by_artist):
        info = first_row_by_artist[key]
        artist_payload.append(
            (
                key,
                info["artist_key"],
                info["platform"],
                info["remote_artist_id"],
                info["name"],
                info["normalized_name"],
                _extract_artist_avatar(info["metadata_json"]),
                _extract_artist_description(info["metadata_json"]),
                song_totals[key],
            )
        )

    target.executemany(
        """
        insert into catalog_artists (
            artist_id,
            artist_key,
            platform,
            remote_artist_id,
            name,
            normalized_name,
            avatar_url,
            description,
            playable_song_count
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        artist_payload,
    )
    target.executemany(
        """
        insert into catalog_artist_tracks (artist_id, song_id, position)
        values (?, ?, ?)
        """,
        link_payload,
    )
|
|
|
|
|
|
def export_playlist_tracks(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy playlist membership rows into ``catalog_playlist_tracks``.

    Only rows belonging to playlists whose ``parse_strategy`` is not one of
    ``TOPLIST_PARSE_STRATEGIES`` are exported. A NULL ``position`` is
    written as 0.

    *source* must return rows addressable by column name.
    """
    excluded = tuple(sorted(TOPLIST_PARSE_STRATEGIES))
    placeholders = ",".join("?" for _ in excluded)
    select_sql = """
        select ps.playlist_id, ps.song_id, coalesce(ps.position, 0) as position
        from playlist_songs ps
        join playlists p on p.id = ps.playlist_id
        where p.parse_strategy not in ({})
        """.format(placeholders)

    payload = []
    for link in source.execute(select_sql, excluded).fetchall():
        payload.append(
            (int(link["playlist_id"]), int(link["song_id"]), int(link["position"]))
        )
    target.executemany(
        """
        insert into catalog_playlist_tracks (playlist_id, song_id, position)
        values (?, ?, ?)
        """,
        payload,
    )
|
|
|
|
|
|
def export_toplists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Export toplists and their tracks.

    Source playlists whose ``parse_strategy`` is one of
    ``TOPLIST_PARSE_STRATEGIES`` are written to ``catalog_toplists`` under
    the id ``"<platform>_top_<remote_playlist_id>"``. ``group_name`` comes
    from ``TOPLIST_GROUP_NAMES`` and falls back to the platform code. The
    songs of each toplist are then read from ``playlist_songs`` (one query
    per toplist, ordered by position) and written to
    ``catalog_toplist_tracks``; a NULL position is written as 0.

    *source* must return rows addressable by column name.
    """
    strategies = tuple(sorted(TOPLIST_PARSE_STRATEGIES))
    placeholders = ",".join("?" for _ in strategies)
    select_sql = """
        select
            p.id,
            p.platform,
            p.remote_playlist_id,
            p.name,
            p.creator_name,
            p.cover_url,
            p.play_count,
            coalesce(ps.song_count, p.collected_song_count, 0) as song_count,
            coalesce(pps.playable_song_count, 0) as playable_song_count,
            p.parse_strategy
        from playlists p
        left join (
            select playlist_id, count(*) as song_count
            from playlist_songs
            group by playlist_id
        ) ps on ps.playlist_id = p.id
        left join (
            select
                playlist_song_keys.playlist_id,
                count(*) as playable_song_count
            from (
                select distinct playlist_id, song_id
                from playlist_songs
            ) playlist_song_keys
            where exists (
                select 1
                from file_assets fa
                join file_locations fl on fl.file_asset_id = fa.id
                where fa.song_id = playlist_song_keys.song_id
                  and fl.status = 'active'
            )
            group by playlist_song_keys.playlist_id
        ) pps on pps.playlist_id = p.id
        where p.parse_strategy in ({})
        """.format(placeholders)
    members_sql = """
        select song_id, position
        from playlist_songs
        where playlist_id = ?
        order by position asc
        """

    charts = source.execute(select_sql, strategies).fetchall()
    # Pair every chart row with its exported id once; both inserts use it.
    keyed_charts = [
        (f"{chart['platform']}_top_{chart['remote_playlist_id']}", chart)
        for chart in charts
    ]

    chart_payload = []
    for chart_id, chart in keyed_charts:
        platform = chart["platform"]
        chart_payload.append(
            (
                chart_id,
                platform,
                chart["name"],
                chart["creator_name"],
                chart["cover_url"],
                int(chart["play_count"] or 0),
                int(chart["song_count"] or 0),
                int(chart["playable_song_count"] or 0),
                TOPLIST_GROUP_NAMES.get(platform, platform),
            )
        )
    target.executemany(
        """
        insert into catalog_toplists (
            toplist_id,
            platform,
            name,
            description,
            cover_url,
            play_count,
            song_count,
            playable_song_count,
            group_name
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        chart_payload,
    )

    member_payload = []
    for chart_id, chart in keyed_charts:
        members = source.execute(members_sql, (chart["id"],)).fetchall()
        for member in members:
            member_payload.append(
                (chart_id, int(member["song_id"]), int(member["position"] or 0))
            )
    target.executemany(
        """
        insert into catalog_toplist_tracks (toplist_id, song_id, position)
        values (?, ?, ?)
        """,
        member_payload,
    )
|
|
|
|
|
|
def export_track_files(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy file locations from *source* into ``catalog_track_files``.

    One row is written per ``file_locations`` row that joins to both a
    ``file_assets`` row and a ``storage_backends`` row; no status filter is
    applied. ``public_url`` falls back to ``download_url`` when NULL, and a
    NULL ``is_primary`` is written as 0.

    *source* must return rows addressable by column name.
    """
    select_sql = """
        select
            fa.song_id,
            fa.quality_label,
            fa.ext,
            fa.file_size_bytes,
            sb.backend_type,
            sb.name as backend_name,
            fl.locator,
            coalesce(fl.public_url, fl.download_url) as public_url,
            fl.status,
            fl.is_primary
        from file_locations fl
        join file_assets fa on fa.id = fl.file_asset_id
        join storage_backends sb on sb.id = fl.backend_id
        """
    insert_sql = """
        insert into catalog_track_files (
            song_id,
            quality_label,
            ext,
            file_size_bytes,
            backend_type,
            backend_name,
            locator,
            public_url,
            status,
            is_primary
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    # Columns copied through untouched, in insert order between song_id
    # and is_primary.
    passthrough = (
        "quality_label",
        "ext",
        "file_size_bytes",
        "backend_type",
        "backend_name",
        "locator",
        "public_url",
        "status",
    )

    payload = []
    for location in source.execute(select_sql).fetchall():
        copied = tuple(location[column] for column in passthrough)
        payload.append(
            (int(location["song_id"]),) + copied + (int(location["is_primary"] or 0),)
        )
    target.executemany(insert_sql, payload)
|
|
|
|
|
|
def build_catalog_read(source_db: str, target_db: str) -> None:
    """Build the catalog read database at *target_db* from *source_db*.

    The export is written to a temporary file created in the target's
    directory and moved over the target with ``os.replace`` only after every
    table has been exported and committed, so a failed run leaves an
    existing target file untouched. The temporary file is removed on
    failure.

    Args:
        source_db: Path to the existing source database (catalogsync.db).
        target_db: Path of the database to create or replace; missing parent
            directories are created.

    Raises:
        FileNotFoundError: If *source_db* is not an existing file.
        sqlite3.Error: If reading the source or writing the export fails.
    """
    source_path = Path(source_db).resolve()
    target_path = Path(target_db).resolve()

    # sqlite3.connect() silently creates a missing database file. Without
    # this check a mistyped source path would leave an empty stray database
    # behind and only fail later with "no such table".
    if not source_path.is_file():
        raise FileNotFoundError(f"source database not found: {source_path}")

    target_path.parent.mkdir(parents=True, exist_ok=True)

    # Create the temp file next to the target so os.replace() stays on one
    # filesystem.
    # NOTE(review): mkstemp() creates the file readable/writable by the
    # creating user only, and that mode carries over to the target after the
    # replace — confirm the consuming server runs as the same user.
    fd, temp_path_str = tempfile.mkstemp(
        prefix=target_path.stem + ".",
        suffix=".tmp",
        dir=str(target_path.parent),
    )
    # Only the reserved name is needed; sqlite opens the file itself.
    os.close(fd)
    temp_path = Path(temp_path_str)
    source: sqlite3.Connection | None = None
    target: sqlite3.Connection | None = None

    try:
        source = connect(str(source_path))
        target = connect(str(temp_path))
        create_schema(target)
        export_playlists(source, target)
        export_tracks(source, target)
        export_artists(source, target)
        export_playlist_tracks(source, target)
        export_toplists(source, target)
        export_track_files(source, target)
        target.commit()
        # Close both connections before the rename; the handles are reset to
        # None so the finally block does not close them a second time.
        source.close()
        source = None
        target.close()
        target = None
        os.replace(temp_path, target_path)
    finally:
        if source is not None:
            source.close()
        if target is not None:
            target.close()
        # After a successful replace the temp path no longer exists; on any
        # failure this removes the partial export.
        if temp_path.exists():
            temp_path.unlink()
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: parse the options, build the database, report the path."""
    options = parse_args()
    build_catalog_read(source_db=options.source_db, target_db=options.target_db)
    print(f"catalog_read.db written to {options.target_db}")


# Run the export only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|