# Build catalog_read.db (a denormalised, read-only catalog consumed by
# Music_Server) from catalogsync.db.
from __future__ import annotations

import argparse
import json
import os
import sqlite3
import tempfile
from pathlib import Path

# Values of playlists.parse_strategy that mark a playlist row as a toplist.
TOPLIST_PARSE_STRATEGIES = {"netease_toplist", "qq_toplist", "kuwo_toplist"}
# Group label stored with each toplist, keyed by platform; unknown platforms
# fall back to the raw platform string.
TOPLIST_GROUP_NAMES = {
    "qq": "QQ音乐",
    "netease": "网易云",
    "kuwo": "酷我",
}

# Per-playlist summary (song count and playable-song count) shared by the
# playlist and toplist exports. ``{operator}`` is "in" / "not in" and
# ``{placeholders}`` is a list of "?" markers; neither is ever user input.
# coalesce() keeps rows whose parse_strategy is NULL on the "not in" side:
# a bare ``NULL not in (...)`` evaluates to NULL and would drop the row.
_PLAYLIST_SUMMARY_SQL = """
    select
        p.id,
        p.platform,
        p.remote_playlist_id,
        p.name,
        p.creator_name,
        p.cover_url,
        p.play_count,
        coalesce(ps.song_count, p.collected_song_count, 0) as song_count,
        coalesce(pps.playable_song_count, 0) as playable_song_count,
        p.parse_strategy
    from playlists p
    left join (
        select playlist_id, count(*) as song_count
        from playlist_songs
        group by playlist_id
    ) ps on ps.playlist_id = p.id
    left join (
        select playlist_song_keys.playlist_id, count(*) as playable_song_count
        from (
            select distinct playlist_id, song_id from playlist_songs
        ) playlist_song_keys
        where exists (
            select 1
            from file_assets fa
            join file_locations fl on fl.file_asset_id = fa.id
            where fa.song_id = playlist_song_keys.song_id
              and fl.status = 'active'
        )
        group by playlist_song_keys.playlist_id
    ) pps on pps.playlist_id = p.id
    where coalesce(p.parse_strategy, '') {operator} ({placeholders})
"""


def parse_args() -> argparse.Namespace:
    """Parse the command line: ``--source-db`` and ``--target-db`` (both required)."""
    parser = argparse.ArgumentParser(
        description="Build catalog_read.db from catalogsync.db for Music_Server."
    )
    parser.add_argument("--source-db", required=True, help="Path to catalogsync.db")
    parser.add_argument("--target-db", required=True, help="Path to catalog_read.db")
    return parser.parse_args()


def connect(path: str) -> sqlite3.Connection:
    """Open *path* as a SQLite database whose rows are ``sqlite3.Row`` objects."""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    return conn


def create_schema(conn: sqlite3.Connection) -> None:
    """Create every ``catalog_*`` table and index on *conn*.

    The statements are plain ``create table`` / ``create index`` (no
    ``if not exists``), so *conn* must not already contain these objects.
    """
    conn.executescript(
        """
        create table catalog_playlists (
            playlist_id integer primary key,
            platform text not null,
            remote_playlist_id text not null,
            name text not null,
            description text,
            cover_url text,
            play_count integer not null,
            song_count integer not null,
            playable_song_count integer not null
        );
        create table catalog_tracks (
            song_id integer primary key,
            platform text not null,
            remote_song_id text not null,
            name text not null,
            singers text,
            album text,
            cover_url text,
            duration_ms integer,
            metadata_json text
        );
        create table catalog_playlist_tracks (
            playlist_id integer not null,
            song_id integer not null,
            position integer not null
        );
        create table catalog_toplists (
            toplist_id text primary key,
            platform text not null,
            name text not null,
            description text,
            cover_url text,
            play_count integer not null,
            song_count integer not null,
            playable_song_count integer not null,
            group_name text not null
        );
        create table catalog_toplist_tracks (
            toplist_id text not null,
            song_id integer not null,
            position integer not null
        );
        create table catalog_track_files (
            song_id integer not null,
            quality_label text,
            ext text,
            file_size_bytes integer,
            backend_type text,
            backend_name text,
            locator text,
            public_url text,
            status text,
            is_primary integer
        );
        create table catalog_artists (
            artist_id integer primary key,
            artist_key text not null unique,
            platform text not null,
            remote_artist_id text,
            name text not null,
            normalized_name text not null,
            avatar_url text,
            description text,
            playable_song_count integer not null
        );
        create table catalog_artist_tracks (
            artist_id integer not null,
            song_id integer not null,
            position integer not null
        );
        create index idx_catalog_playlist_tracks_playlist
            on catalog_playlist_tracks (playlist_id, position);
        create index idx_catalog_toplist_tracks_toplist
            on catalog_toplist_tracks (toplist_id, position);
        create index idx_catalog_track_files_song
            on catalog_track_files (song_id, status, is_primary);
        create index idx_catalog_artist_tracks_artist
            on catalog_artist_tracks (artist_id, position);
        """
    )


def _load_json_object(raw: str | None) -> dict | None:
    """Decode *raw* and return it only when it is a JSON object.

    Empty input, undecodable text and any non-object document (list, string,
    number, ``null``) all yield ``None`` so callers can use ``.get`` safely.
    """
    if not raw:
        return None
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError
        return None
    return payload if isinstance(payload, dict) else None


def _dict_at(mapping: dict, key: str) -> dict:
    """Return ``mapping[key]`` when it is a dict, otherwise an empty dict."""
    value = mapping.get(key)
    return value if isinstance(value, dict) else {}


def _coerce_int(value: object) -> int | None:
    """Return ``int(value)``, or ``None`` when *value* is missing or not numeric."""
    if value is None:
        return None
    try:
        return int(value)  # type: ignore[call-overload]
    except (TypeError, ValueError, OverflowError):
        return None


def _extract_song_cover(metadata_json: str | None) -> str | None:
    """Return a cover URL from song metadata, or ``None`` when there is none.

    Looks at ``snapshot.cover_url`` first, then
    ``snapshot.raw_data.search.al.picUrl``; only non-empty strings count.
    """
    payload = _load_json_object(metadata_json)
    if payload is None:
        return None
    snapshot = _dict_at(payload, "snapshot")
    cover_url = snapshot.get("cover_url")
    if isinstance(cover_url, str) and cover_url:
        return cover_url
    album = _dict_at(_dict_at(_dict_at(snapshot, "raw_data"), "search"), "al")
    pic_url = album.get("picUrl")
    if isinstance(pic_url, str) and pic_url:
        return pic_url
    return None


def _extract_duration_ms(duration_seconds: int | None, metadata_json: str | None) -> int | None:
    """Return the track duration in milliseconds, or ``None`` when unknown.

    Precedence: the ``duration_seconds`` argument, then
    ``snapshot.raw_data.search.dt`` (used as milliseconds), then
    ``snapshot.duration_s`` (seconds). Non-numeric metadata values are skipped
    rather than raised.
    """
    if duration_seconds is not None:
        return int(duration_seconds) * 1000
    payload = _load_json_object(metadata_json)
    if payload is None:
        return None
    snapshot = _dict_at(payload, "snapshot")
    search = _dict_at(_dict_at(snapshot, "raw_data"), "search")
    duration_ms = _coerce_int(search.get("dt"))
    if duration_ms is not None:
        return duration_ms
    seconds = _coerce_int(snapshot.get("duration_s"))
    return seconds * 1000 if seconds is not None else None


def _extract_artist_avatar(metadata_json: str | None) -> str | None:
    """Return the first truthy of ``avatar`` / ``avatar_url`` / ``cover_url`` if it is a string."""
    payload = _load_json_object(metadata_json)
    if payload is None:
        return None
    avatar = payload.get("avatar") or payload.get("avatar_url") or payload.get("cover_url")
    return avatar if isinstance(avatar, str) and avatar else None


def _extract_artist_description(metadata_json: str | None) -> str | None:
    """Return the first truthy of ``description`` / ``desc`` if it is a string."""
    payload = _load_json_object(metadata_json)
    if payload is None:
        return None
    description = payload.get("description") or payload.get("desc")
    return description if isinstance(description, str) and description else None


def _toplist_strategy_params() -> tuple[str, tuple[str, ...]]:
    """Return the ``?`` placeholder list and bound values for the toplist strategies."""
    strategies = tuple(sorted(TOPLIST_PARSE_STRATEGIES))
    return ",".join("?" for _ in strategies), strategies


def _select_playlist_summaries(source: sqlite3.Connection, *, toplists: bool) -> list[sqlite3.Row]:
    """Fetch playlist summary rows: toplists only, or everything that is not a toplist."""
    placeholders, strategies = _toplist_strategy_params()
    sql = _PLAYLIST_SUMMARY_SQL.format(
        operator="in" if toplists else "not in",
        placeholders=placeholders,
    )
    return source.execute(sql, strategies).fetchall()


def export_playlists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy every non-toplist playlist into ``catalog_playlists``.

    ``description`` is filled from the source ``creator_name`` column.
    ``playable_song_count`` counts distinct songs with at least one file
    location whose status is ``'active'``. Playlists whose ``parse_strategy``
    is NULL are treated as ordinary playlists.
    """
    rows = _select_playlist_summaries(source, toplists=False)
    target.executemany(
        """
        insert into catalog_playlists (
            playlist_id, platform, remote_playlist_id, name, description, cover_url,
            play_count, song_count, playable_song_count
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        [
            (
                int(row["id"]),
                row["platform"],
                row["remote_playlist_id"],
                row["name"],
                row["creator_name"],
                row["cover_url"],
                int(row["play_count"] or 0),
                int(row["song_count"] or 0),
                int(row["playable_song_count"] or 0),
            )
            for row in rows
        ],
    )


def export_tracks(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy every row of ``songs`` into ``catalog_tracks``.

    ``cover_url`` and ``duration_ms`` are derived from the song's metadata
    JSON (and ``duration_seconds``); the raw JSON is copied through unchanged.
    """
    rows = source.execute(
        """
        select id, platform, remote_song_id, name, singers, album,
               duration_seconds, metadata_json
        from songs
        """
    ).fetchall()
    target.executemany(
        """
        insert into catalog_tracks (
            song_id, platform, remote_song_id, name, singers, album,
            cover_url, duration_ms, metadata_json
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        [
            (
                int(row["id"]),
                row["platform"],
                row["remote_song_id"],
                row["name"],
                row["singers"],
                row["album"],
                _extract_song_cover(row["metadata_json"]),
                _extract_duration_ms(row["duration_seconds"], row["metadata_json"]),
                row["metadata_json"],
            )
            for row in rows
        ],
    )


def export_artists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Export artists that have at least one playable song, plus their songs.

    Only songs with an ``'active'`` file location are exported. Within an
    artist, songs are numbered from 1 in the query's order (case-insensitive
    song name, then song id); the final number is the artist's
    ``playable_song_count``.
    """
    exportable_rows = source.execute(
        """
        select distinct
            a.id as artist_id,
            a.artist_key,
            a.platform,
            a.remote_artist_id,
            a.name,
            a.normalized_name,
            a.metadata_json,
            songs.id as song_id,
            songs.name as song_name
        from artists a
        join artist_songs s on s.artist_id = a.id
        join songs songs on songs.id = s.song_id
        where exists (
            select 1
            from file_assets fa
            join file_locations fl on fl.file_asset_id = fa.id
            where fa.song_id = s.song_id
              and fl.status = 'active'
        )
        order by a.id asc, lower(songs.name) asc, songs.id asc
        """
    ).fetchall()

    artist_rows: dict[int, sqlite3.Row] = {}
    # One counter serves both purposes: the running value is the track's
    # position and the final value is the artist's playable-song count.
    song_counts: dict[int, int] = {}
    track_payload: list[tuple[int, int, int]] = []
    for row in exportable_rows:
        artist_id = int(row["artist_id"])
        artist_rows.setdefault(artist_id, row)
        position = song_counts.get(artist_id, 0) + 1
        song_counts[artist_id] = position
        track_payload.append((artist_id, int(row["song_id"]), position))

    artist_payload = []
    for artist_id in sorted(artist_rows):
        artist = artist_rows[artist_id]
        artist_payload.append(
            (
                artist_id,
                artist["artist_key"],
                artist["platform"],
                artist["remote_artist_id"],
                artist["name"],
                artist["normalized_name"],
                _extract_artist_avatar(artist["metadata_json"]),
                _extract_artist_description(artist["metadata_json"]),
                song_counts[artist_id],
            )
        )

    target.executemany(
        """
        insert into catalog_artists (
            artist_id, artist_key, platform, remote_artist_id, name, normalized_name,
            avatar_url, description, playable_song_count
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        artist_payload,
    )
    target.executemany(
        """
        insert into catalog_artist_tracks (artist_id, song_id, position)
        values (?, ?, ?)
        """,
        track_payload,
    )


def export_playlist_tracks(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy the song membership of every non-toplist playlist.

    A NULL source position is stored as 0. The playlist filter is the same
    null-safe filter used by ``export_playlists`` so both tables stay in step.
    """
    placeholders, strategies = _toplist_strategy_params()
    rows = source.execute(
        """
        select ps.playlist_id, ps.song_id, coalesce(ps.position, 0) as position
        from playlist_songs ps
        join playlists p on p.id = ps.playlist_id
        where coalesce(p.parse_strategy, '') not in ({})
        """.format(placeholders),
        strategies,
    ).fetchall()
    target.executemany(
        """
        insert into catalog_playlist_tracks (playlist_id, song_id, position)
        values (?, ?, ?)
        """,
        [(int(row["playlist_id"]), int(row["song_id"]), int(row["position"])) for row in rows],
    )


def export_toplists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Export toplist playlists and their tracks.

    The toplist id is ``"<platform>_top_<remote_playlist_id>"``. Tracks are
    read per toplist ordered by position; a NULL position is stored as 0.
    """
    toplist_rows = _select_playlist_summaries(source, toplists=True)

    toplist_payload = []
    track_payload = []
    for row in toplist_rows:
        toplist_id = f"{row['platform']}_top_{row['remote_playlist_id']}"
        toplist_payload.append(
            (
                toplist_id,
                row["platform"],
                row["name"],
                row["creator_name"],
                row["cover_url"],
                int(row["play_count"] or 0),
                int(row["song_count"] or 0),
                int(row["playable_song_count"] or 0),
                TOPLIST_GROUP_NAMES.get(row["platform"], row["platform"]),
            )
        )
        tracks = source.execute(
            """
            select song_id, position
            from playlist_songs
            where playlist_id = ?
            order by position asc
            """,
            (row["id"],),
        ).fetchall()
        track_payload.extend(
            (toplist_id, int(track["song_id"]), int(track["position"] or 0))
            for track in tracks
        )

    target.executemany(
        """
        insert into catalog_toplists (
            toplist_id, platform, name, description, cover_url,
            play_count, song_count, playable_song_count, group_name
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        toplist_payload,
    )
    target.executemany(
        """
        insert into catalog_toplist_tracks (toplist_id, song_id, position)
        values (?, ?, ?)
        """,
        track_payload,
    )


def export_track_files(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy every file location, joined with its asset and storage backend.

    ``public_url`` falls back to the location's ``download_url`` when NULL.
    Locations of every status are exported, not only active ones.
    """
    rows = source.execute(
        """
        select
            fa.song_id,
            fa.quality_label,
            fa.ext,
            fa.file_size_bytes,
            sb.backend_type,
            sb.name as backend_name,
            fl.locator,
            coalesce(fl.public_url, fl.download_url) as public_url,
            fl.status,
            fl.is_primary
        from file_locations fl
        join file_assets fa on fa.id = fl.file_asset_id
        join storage_backends sb on sb.id = fl.backend_id
        """
    ).fetchall()
    target.executemany(
        """
        insert into catalog_track_files (
            song_id, quality_label, ext, file_size_bytes, backend_type, backend_name,
            locator, public_url, status, is_primary
        ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        [
            (
                int(row["song_id"]),
                row["quality_label"],
                row["ext"],
                row["file_size_bytes"],
                row["backend_type"],
                row["backend_name"],
                row["locator"],
                row["public_url"],
                row["status"],
                int(row["is_primary"] or 0),
            )
            for row in rows
        ],
    )


def build_catalog_read(source_db: str, target_db: str) -> None:
    """Build the read catalog at *target_db* from *source_db*.

    The catalog is written to a temporary file next to the target and moved
    into place with ``os.replace`` only after every export has succeeded, so
    an existing target is left untouched when the build fails.

    Raises:
        FileNotFoundError: *source_db* does not exist. Checked up front
            because ``sqlite3.connect`` would otherwise create an empty
            database file at that path.
        ValueError: *source_db* and *target_db* resolve to the same file,
            which would overwrite the source with the derived catalog.
        sqlite3.Error: a query against the source or target failed.
    """
    source_path = Path(source_db).resolve()
    target_path = Path(target_db).resolve()
    if not source_path.is_file():
        raise FileNotFoundError(f"source database not found: {source_path}")
    if source_path == target_path:
        raise ValueError(f"source and target database must differ: {source_path}")

    target_path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so os.replace stays on one filesystem.
    # NOTE(review): mkstemp creates the file with owner-only permissions and
    # the final catalog inherits them; confirm that suits the reader process.
    fd, temp_path_str = tempfile.mkstemp(
        prefix=target_path.stem + ".",
        suffix=".tmp",
        dir=str(target_path.parent),
    )
    os.close(fd)
    temp_path = Path(temp_path_str)

    source: sqlite3.Connection | None = None
    target: sqlite3.Connection | None = None
    try:
        source = connect(str(source_path))
        target = connect(str(temp_path))
        create_schema(target)
        export_playlists(source, target)
        export_tracks(source, target)
        export_artists(source, target)
        export_playlist_tracks(source, target)
        export_toplists(source, target)
        export_track_files(source, target)
        target.commit()
        # Close both handles before the rename; an open handle on the temp
        # file can block os.replace on some platforms.
        source.close()
        source = None
        target.close()
        target = None
        os.replace(temp_path, target_path)
    finally:
        if source is not None:
            source.close()
        if target is not None:
            target.close()
        # Still present only when the build failed before os.replace.
        if temp_path.exists():
            temp_path.unlink()


def main() -> None:
    """Command-line entry point: build the catalog and report where it was written."""
    args = parse_args()
    build_catalog_read(source_db=args.source_db, target_db=args.target_db)
    print(f"catalog_read.db written to {args.target_db}")


if __name__ == "__main__":
    main()