Files

598 lines
18 KiB
Python

from __future__ import annotations
import argparse
import json
import os
import sqlite3
import tempfile
from pathlib import Path
# Values of ``playlists.parse_strategy`` that mark a source playlist as a
# chart ("toplist"). export_toplists selects rows whose strategy is in this
# set; export_playlists and export_playlist_tracks select the rows that are not.
TOPLIST_PARSE_STRATEGIES = {
    "netease_toplist",
    "qq_toplist",
    "kuwo_toplist",
}

# Value written to ``catalog_toplists.group_name`` for each platform key.
# export_toplists falls back to the raw platform string for unlisted platforms.
TOPLIST_GROUP_NAMES = {
    "qq": "QQ音乐",
    "netease": "网易云",
    "kuwo": "酷我",
}
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the catalog export script.

    Returns:
        A namespace with ``source_db`` and ``target_db``. Both options are
        required, so argparse exits with a usage error when one is missing.
    """
    cli = argparse.ArgumentParser(
        description="Build catalog_read.db from catalogsync.db for Music_Server."
    )
    required_options = (
        ("--source-db", "Path to catalogsync.db"),
        ("--target-db", "Path to catalog_read.db"),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)
    return cli.parse_args()
def connect(path: str) -> sqlite3.Connection:
    """Open the SQLite database at *path* with rows addressable by column name.

    Args:
        path: Filesystem path (or any string accepted by ``sqlite3.connect``).

    Returns:
        An open connection whose ``row_factory`` is ``sqlite3.Row``; the
        export functions in this file index result rows by column name.
    """
    db = sqlite3.connect(path)
    db.row_factory = sqlite3.Row
    return db
def create_schema(conn: sqlite3.Connection) -> None:
    """Create every ``catalog_*`` table and index of the read catalog.

    The statements are plain ``create table`` / ``create index`` (no
    ``if not exists``), so *conn* must point at a database that does not
    already contain these objects.

    Args:
        conn: Connection to the (empty) target database.
    """
    # Eight tables: playlists, tracks, toplists, artists, their three
    # "<owner>_tracks" link tables and the per-track file list; followed by
    # the lookup indexes on the link/file tables.
    ddl = """
create table catalog_playlists (
playlist_id integer primary key,
platform text not null,
remote_playlist_id text not null,
name text not null,
description text,
cover_url text,
play_count integer not null,
song_count integer not null,
playable_song_count integer not null
);
create table catalog_tracks (
song_id integer primary key,
platform text not null,
remote_song_id text not null,
name text not null,
singers text,
album text,
cover_url text,
duration_ms integer,
metadata_json text
);
create table catalog_playlist_tracks (
playlist_id integer not null,
song_id integer not null,
position integer not null
);
create table catalog_toplists (
toplist_id text primary key,
platform text not null,
name text not null,
description text,
cover_url text,
play_count integer not null,
song_count integer not null,
playable_song_count integer not null,
group_name text not null
);
create table catalog_toplist_tracks (
toplist_id text not null,
song_id integer not null,
position integer not null
);
create table catalog_track_files (
song_id integer not null,
quality_label text,
ext text,
file_size_bytes integer,
backend_type text,
backend_name text,
locator text,
public_url text,
status text,
is_primary integer
);
create table catalog_artists (
artist_id integer primary key,
artist_key text not null unique,
platform text not null,
remote_artist_id text,
name text not null,
normalized_name text not null,
avatar_url text,
description text,
playable_song_count integer not null
);
create table catalog_artist_tracks (
artist_id integer not null,
song_id integer not null,
position integer not null
);
create index idx_catalog_playlist_tracks_playlist on catalog_playlist_tracks (playlist_id, position);
create index idx_catalog_toplist_tracks_toplist on catalog_toplist_tracks (toplist_id, position);
create index idx_catalog_track_files_song on catalog_track_files (song_id, status, is_primary);
create index idx_catalog_artist_tracks_artist on catalog_artist_tracks (artist_id, position);
"""
    conn.executescript(ddl)
def _extract_song_cover(metadata_json: str | None) -> str | None:
if not metadata_json:
return None
try:
payload = json.loads(metadata_json)
except json.JSONDecodeError:
return None
snapshot = payload.get("snapshot") or {}
raw_data = snapshot.get("raw_data") or {}
if isinstance(snapshot.get("cover_url"), str) and snapshot.get("cover_url"):
return snapshot["cover_url"]
search = raw_data.get("search") or {}
album = search.get("al") or {}
if isinstance(album.get("picUrl"), str) and album.get("picUrl"):
return album["picUrl"]
return None
def _extract_duration_ms(duration_seconds: int | None, metadata_json: str | None) -> int | None:
if duration_seconds is not None:
return int(duration_seconds) * 1000
if not metadata_json:
return None
try:
payload = json.loads(metadata_json)
except json.JSONDecodeError:
return None
snapshot = payload.get("snapshot") or {}
raw_data = snapshot.get("raw_data") or {}
search = raw_data.get("search") or {}
duration_ms = search.get("dt")
if duration_ms is None:
duration_s = snapshot.get("duration_s")
return int(duration_s) * 1000 if duration_s is not None else None
return int(duration_ms)
def _extract_artist_avatar(metadata_json: str | None) -> str | None:
if not metadata_json:
return None
try:
payload = json.loads(metadata_json)
except json.JSONDecodeError:
return None
avatar = payload.get("avatar") or payload.get("avatar_url") or payload.get("cover_url")
return avatar if isinstance(avatar, str) and avatar else None
def _extract_artist_description(metadata_json: str | None) -> str | None:
if not metadata_json:
return None
try:
payload = json.loads(metadata_json)
except json.JSONDecodeError:
return None
description = payload.get("description") or payload.get("desc")
return description if isinstance(description, str) and description else None
def export_playlists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy every non-toplist playlist from *source* into ``catalog_playlists``.

    A playlist is exported when its ``parse_strategy`` is not one of
    ``TOPLIST_PARSE_STRATEGIES``. For each playlist:

    * ``song_count`` is the number of its ``playlist_songs`` rows, falling
      back to ``collected_song_count`` and then 0;
    * ``playable_song_count`` is the number of distinct songs that have at
      least one file location with status ``'active'``;
    * ``description`` is filled from the source ``creator_name`` column.

    Args:
        source: Connection to the source database; rows must be addressable
            by column name (``sqlite3.Row``).
        target: Connection to the database holding ``catalog_playlists``.
            Nothing is committed here.
    """
    # NOTE(review): a NULL parse_strategy satisfies neither this ``not in``
    # filter nor the ``in`` filter used by export_toplists, so such rows
    # would be exported nowhere - confirm the column is never NULL.
    strategies = tuple(sorted(TOPLIST_PARSE_STRATEGIES))
    placeholders = ",".join("?" for _ in strategies)
    select_sql = """
select
p.id,
p.platform,
p.remote_playlist_id,
p.name,
p.creator_name,
p.cover_url,
p.play_count,
coalesce(ps.song_count, p.collected_song_count, 0) as song_count,
coalesce(pps.playable_song_count, 0) as playable_song_count
from playlists p
left join (
select playlist_id, count(*) as song_count
from playlist_songs
group by playlist_id
) ps on ps.playlist_id = p.id
left join (
select
playlist_song_keys.playlist_id,
count(*) as playable_song_count
from (
select distinct playlist_id, song_id
from playlist_songs
) playlist_song_keys
where exists (
select 1
from file_assets fa
join file_locations fl on fl.file_asset_id = fa.id
where fa.song_id = playlist_song_keys.song_id
and fl.status = 'active'
)
group by playlist_song_keys.playlist_id
) pps on pps.playlist_id = p.id
where p.parse_strategy not in ({})
""".format(placeholders)
    insert_sql = """
insert into catalog_playlists (
playlist_id,
platform,
remote_playlist_id,
name,
description,
cover_url,
play_count,
song_count,
playable_song_count
) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
    playlists = source.execute(select_sql, strategies).fetchall()

    payload = []
    for playlist in playlists:
        payload.append(
            (
                int(playlist["id"]),
                playlist["platform"],
                playlist["remote_playlist_id"],
                playlist["name"],
                playlist["creator_name"],  # stored in the ``description`` column
                playlist["cover_url"],
                int(playlist["play_count"] or 0),
                int(playlist["song_count"] or 0),
                int(playlist["playable_song_count"] or 0),
            )
        )
    target.executemany(insert_sql, payload)
def export_tracks(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy every row of the source ``songs`` table into ``catalog_tracks``.

    ``cover_url`` and ``duration_ms`` are derived per song via
    ``_extract_song_cover`` and ``_extract_duration_ms``; the raw
    ``metadata_json`` text is copied through unchanged.

    Args:
        source: Connection to the source database; rows must be addressable
            by column name (``sqlite3.Row``).
        target: Connection to the database holding ``catalog_tracks``.
            Nothing is committed here.
    """
    select_sql = """
select
id,
platform,
remote_song_id,
name,
singers,
album,
duration_seconds,
metadata_json
from songs
"""
    insert_sql = """
insert into catalog_tracks (
song_id,
platform,
remote_song_id,
name,
singers,
album,
cover_url,
duration_ms,
metadata_json
) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
    songs = source.execute(select_sql).fetchall()

    payload = []
    for song in songs:
        metadata = song["metadata_json"]
        payload.append(
            (
                int(song["id"]),
                song["platform"],
                song["remote_song_id"],
                song["name"],
                song["singers"],
                song["album"],
                _extract_song_cover(metadata),
                _extract_duration_ms(song["duration_seconds"], metadata),
                metadata,
            )
        )
    target.executemany(insert_sql, payload)
def export_artists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Export artists that have playable songs, plus their ordered song lists.

    Only (artist, song) pairs whose song has at least one file location with
    status ``'active'`` are considered, so artists without any such song are
    not exported. Within an artist, songs are ordered by lower-cased song
    name and then song id, and receive 1-based positions in that order.
    ``playable_song_count`` is the number of pairs exported for the artist.

    Args:
        source: Connection to the source database; rows must be addressable
            by column name (``sqlite3.Row``).
        target: Connection to the database holding ``catalog_artists`` and
            ``catalog_artist_tracks``. Nothing is committed here.
    """
    select_sql = """
select distinct
a.id as artist_id,
a.artist_key,
a.platform,
a.remote_artist_id,
a.name,
a.normalized_name,
a.metadata_json,
songs.id as song_id,
songs.name as song_name
from artists a
join artist_songs s on s.artist_id = a.id
join songs songs on songs.id = s.song_id
where exists (
select 1
from file_assets fa
join file_locations fl on fl.file_asset_id = fa.id
where fa.song_id = s.song_id
and fl.status = 'active'
)
order by a.id asc, lower(songs.name) asc, songs.id asc
"""
    artist_insert_sql = """
insert into catalog_artists (
artist_id,
artist_key,
platform,
remote_artist_id,
name,
normalized_name,
avatar_url,
description,
playable_song_count
) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
    track_insert_sql = """
insert into catalog_artist_tracks (artist_id, song_id, position)
values (?, ?, ?)
"""
    pairs = source.execute(select_sql).fetchall()

    first_row_by_artist: dict[int, sqlite3.Row] = {}
    # Running per-artist counter: it is the 1-based position of the current
    # song and, once the loop ends, the artist's playable song total.
    songs_seen: dict[int, int] = {}
    track_payload: list[tuple[int, int, int]] = []
    for pair in pairs:
        artist_id = int(pair["artist_id"])
        first_row_by_artist.setdefault(artist_id, pair)
        rank = songs_seen.get(artist_id, 0) + 1
        songs_seen[artist_id] = rank
        track_payload.append((artist_id, int(pair["song_id"]), rank))

    artist_payload = []
    for artist_id in sorted(first_row_by_artist):
        artist = first_row_by_artist[artist_id]
        metadata = artist["metadata_json"]
        artist_payload.append(
            (
                artist_id,
                artist["artist_key"],
                artist["platform"],
                artist["remote_artist_id"],
                artist["name"],
                artist["normalized_name"],
                _extract_artist_avatar(metadata),
                _extract_artist_description(metadata),
                songs_seen[artist_id],
            )
        )

    target.executemany(artist_insert_sql, artist_payload)
    target.executemany(track_insert_sql, track_payload)
def export_playlist_tracks(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy the song membership of non-toplist playlists.

    Every ``playlist_songs`` row whose playlist has a ``parse_strategy``
    outside ``TOPLIST_PARSE_STRATEGIES`` is written to
    ``catalog_playlist_tracks``; a NULL position is stored as 0.

    Args:
        source: Connection to the source database; rows must be addressable
            by column name (``sqlite3.Row``).
        target: Connection to the database holding
            ``catalog_playlist_tracks``. Nothing is committed here.
    """
    strategies = tuple(sorted(TOPLIST_PARSE_STRATEGIES))
    placeholders = ",".join("?" for _ in strategies)
    select_sql = """
select ps.playlist_id, ps.song_id, coalesce(ps.position, 0) as position
from playlist_songs ps
join playlists p on p.id = ps.playlist_id
where p.parse_strategy not in ({})
""".format(placeholders)
    insert_sql = """
insert into catalog_playlist_tracks (playlist_id, song_id, position)
values (?, ?, ?)
"""
    memberships = source.execute(select_sql, strategies).fetchall()

    payload = []
    for entry in memberships:
        payload.append(
            (int(entry["playlist_id"]), int(entry["song_id"]), int(entry["position"]))
        )
    target.executemany(insert_sql, payload)
def export_toplists(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Export chart playlists ("toplists") and their ordered tracks.

    A source playlist is a toplist when its ``parse_strategy`` is one of
    ``TOPLIST_PARSE_STRATEGIES``. Each one is written to ``catalog_toplists``
    under the id ``"<platform>_top_<remote_playlist_id>"``, with
    ``group_name`` looked up in ``TOPLIST_GROUP_NAMES`` (falling back to the
    platform string) and ``description`` filled from ``creator_name``. Its
    ``playlist_songs`` rows are then written to ``catalog_toplist_tracks``
    in ascending position order; a NULL position is stored as 0.

    Args:
        source: Connection to the source database; rows must be addressable
            by column name (``sqlite3.Row``).
        target: Connection to the database holding ``catalog_toplists`` and
            ``catalog_toplist_tracks``. Nothing is committed here.
    """
    strategies = tuple(sorted(TOPLIST_PARSE_STRATEGIES))
    placeholders = ",".join("?" for _ in strategies)
    select_sql = """
select
p.id,
p.platform,
p.remote_playlist_id,
p.name,
p.creator_name,
p.cover_url,
p.play_count,
coalesce(ps.song_count, p.collected_song_count, 0) as song_count,
coalesce(pps.playable_song_count, 0) as playable_song_count,
p.parse_strategy
from playlists p
left join (
select playlist_id, count(*) as song_count
from playlist_songs
group by playlist_id
) ps on ps.playlist_id = p.id
left join (
select
playlist_song_keys.playlist_id,
count(*) as playable_song_count
from (
select distinct playlist_id, song_id
from playlist_songs
) playlist_song_keys
where exists (
select 1
from file_assets fa
join file_locations fl on fl.file_asset_id = fa.id
where fa.song_id = playlist_song_keys.song_id
and fl.status = 'active'
)
group by playlist_song_keys.playlist_id
) pps on pps.playlist_id = p.id
where p.parse_strategy in ({})
""".format(placeholders)
    toplist_insert_sql = """
insert into catalog_toplists (
toplist_id,
platform,
name,
description,
cover_url,
play_count,
song_count,
playable_song_count,
group_name
) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
    track_select_sql = """
select song_id, position
from playlist_songs
where playlist_id = ?
order by position asc
"""
    track_insert_sql = """
insert into catalog_toplist_tracks (toplist_id, song_id, position)
values (?, ?, ?)
"""
    toplist_rows = source.execute(select_sql, strategies).fetchall()

    # Pair each row with its derived catalog id once; both inserts use it.
    keyed_rows = [
        (f"{row['platform']}_top_{row['remote_playlist_id']}", row)
        for row in toplist_rows
    ]

    toplist_payload = []
    for toplist_id, row in keyed_rows:
        platform = row["platform"]
        toplist_payload.append(
            (
                toplist_id,
                platform,
                row["name"],
                row["creator_name"],  # stored in the ``description`` column
                row["cover_url"],
                int(row["play_count"] or 0),
                int(row["song_count"] or 0),
                int(row["playable_song_count"] or 0),
                TOPLIST_GROUP_NAMES.get(platform, platform),
            )
        )
    target.executemany(toplist_insert_sql, toplist_payload)

    # One track query per toplist, keyed by the source playlist id.
    track_payload = []
    for toplist_id, row in keyed_rows:
        tracks = source.execute(track_select_sql, (row["id"],)).fetchall()
        for track in tracks:
            track_payload.append(
                (toplist_id, int(track["song_id"]), int(track["position"] or 0))
            )
    target.executemany(track_insert_sql, track_payload)
def export_track_files(source: sqlite3.Connection, target: sqlite3.Connection) -> None:
    """Copy every stored file location into ``catalog_track_files``.

    One output row is written per ``file_locations`` row that joins to a
    file asset and a storage backend, whatever its status. ``public_url``
    falls back to ``download_url`` when NULL, and a NULL ``is_primary`` is
    stored as 0.

    Args:
        source: Connection to the source database; rows must be addressable
            by column name (``sqlite3.Row``).
        target: Connection to the database holding ``catalog_track_files``.
            Nothing is committed here.
    """
    select_sql = """
select
fa.song_id,
fa.quality_label,
fa.ext,
fa.file_size_bytes,
sb.backend_type,
sb.name as backend_name,
fl.locator,
coalesce(fl.public_url, fl.download_url) as public_url,
fl.status,
fl.is_primary
from file_locations fl
join file_assets fa on fa.id = fl.file_asset_id
join storage_backends sb on sb.id = fl.backend_id
"""
    insert_sql = """
insert into catalog_track_files (
song_id,
quality_label,
ext,
file_size_bytes,
backend_type,
backend_name,
locator,
public_url,
status,
is_primary
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
    locations = source.execute(select_sql).fetchall()

    payload = []
    for location in locations:
        payload.append(
            (
                int(location["song_id"]),
                location["quality_label"],
                location["ext"],
                location["file_size_bytes"],
                location["backend_type"],
                location["backend_name"],
                location["locator"],
                location["public_url"],
                location["status"],
                int(location["is_primary"] or 0),
            )
        )
    target.executemany(insert_sql, payload)
def build_catalog_read(source_db: str, target_db: str) -> None:
    """Build the read catalog at *target_db* from the database at *source_db*.

    The catalog is assembled in a temporary file created next to the target
    and moved over the target with ``os.replace`` only after every export
    succeeded and was committed. On any failure an existing target file is
    left untouched and the temporary file is removed.

    Args:
        source_db: Path to the source database (catalogsync.db).
        target_db: Path of the catalog database to write; missing parent
            directories are created.

    Raises:
        ValueError: If both paths resolve to the same file.
        sqlite3.OperationalError: If the source database file does not
            exist, or if SQLite reports an operational error during export.
    """
    source_path = Path(source_db).resolve()
    target_path = Path(target_db).resolve()

    # Replacing the target would otherwise overwrite the source database
    # with the catalog that was derived from it.
    if source_path == target_path:
        raise ValueError(f"source and target database are the same file: {source_path}")
    # sqlite3.connect() creates a missing database file instead of failing,
    # so a mistyped source path would leave an empty stray file behind and
    # only fail later on the first query. Fail up front, using the exception
    # type SQLite itself reports for a database it cannot open.
    if not source_path.is_file():
        raise sqlite3.OperationalError(f"unable to open database file: {source_path}")

    target_path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target, so the final os.replace does not have to
    # cross a filesystem boundary.
    fd, temp_path_str = tempfile.mkstemp(
        prefix=target_path.stem + ".",
        suffix=".tmp",
        dir=str(target_path.parent),
    )
    # Only the reserved name is needed; SQLite opens the file itself.
    os.close(fd)
    temp_path = Path(temp_path_str)

    source: sqlite3.Connection | None = None
    target: sqlite3.Connection | None = None
    try:
        source = connect(str(source_path))
        target = connect(str(temp_path))
        create_schema(target)
        export_playlists(source, target)
        export_tracks(source, target)
        export_artists(source, target)
        export_playlist_tracks(source, target)
        export_toplists(source, target)
        export_track_files(source, target)
        target.commit()
        # Close both connections before the rename; resetting the names to
        # None keeps the ``finally`` block from closing them a second time.
        source.close()
        source = None
        target.close()
        target = None
        os.replace(temp_path, target_path)
    finally:
        if source is not None:
            source.close()
        if target is not None:
            target.close()
        # Still present only when the build failed before os.replace.
        if temp_path.exists():
            temp_path.unlink()
def main() -> None:
    """Command-line entry point: build the read catalog and report its path."""
    options = parse_args()
    build_catalog_read(source_db=options.source_db, target_db=options.target_db)
    print(f"catalog_read.db written to {options.target_db}")


# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()