from __future__ import annotations

import argparse
import csv
import json
import re
import sqlite3
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, TextIO, Tuple

# Substrings that flag a song when found in its lowercased name.
LIVE_NAME_KEYWORDS = ("live", "现场", "演唱会")

# Substrings that flag a song when found in its lowercased album title.
LIVE_ALBUM_KEYWORDS = (
    "演唱会",
    "我是歌手",
    "我们的歌",
    "声生不息",
    "时光音乐会",
    "天赐的声音",
    "披荆斩棘",
    "乘风",
)

# Column order shared by the CSV and tab-separated report writers.
_REPORT_FIELDS = (
    "song_id",
    "platform",
    "remote_song_id",
    "name",
    "singers",
    "album",
    "reason_codes",
)

# Runs of whitespace, non-word characters and underscores; removed when
# building the compact form used to compare a song name with its album.
_COMPACT_STRIP_RE = re.compile(r"[\s\W_]+", flags=re.UNICODE)


@dataclass(frozen=True)
class SuspectedLiveSong:
    """One row of the ``songs`` table that matched a live-version heuristic."""

    song_id: int
    platform: str
    remote_song_id: str
    name: str
    singers: str
    album: str
    # Heuristics that fired, in detection order
    # ("name_keyword" and/or "album_show_keyword").
    reason_codes: Tuple[str, ...]


def _normalize_text(value: Any) -> str:
    """Return *value* as a stripped string; any falsy value becomes ``""``."""
    return str(value or "").strip()


def _normalize_compact_text(value: Any) -> str:
    """Return *value* lowercased with whitespace, punctuation and ``_`` removed."""
    return _COMPACT_STRIP_RE.sub("", _normalize_text(value).lower())


def detect_suspected_live_reason_codes(name: Any, album: Any) -> List[str]:
    """Return the heuristic codes that mark a song as a suspected live version.

    Args:
        name: Song name; ``None`` and other falsy values count as empty.
        album: Album title; empty values and the literal text ``NULL`` (any
            case) are treated as "no album".

    Returns:
        A list containing ``"name_keyword"`` and/or ``"album_show_keyword"``,
        in that order, or an empty list when nothing matched.
    """
    name_text = _normalize_text(name)
    album_text = _normalize_text(album)
    normalized_name = name_text.lower()
    normalized_album = album_text.lower()

    reason_codes: List[str] = []
    # NOTE: plain substring matching, so "live" also matches inside longer
    # words such as "alive".
    if any(keyword in normalized_name for keyword in LIVE_NAME_KEYWORDS):
        reason_codes.append("name_keyword")

    # Without a usable album title only the name heuristic applies.
    if not album_text or album_text.upper() == "NULL":
        return reason_codes

    # Skip the album heuristic when the album title equals or starts with the
    # song name (presumably the album is named after the song, so an album
    # keyword hit would only echo the title -- confirm with catalog owners).
    compact_name = _normalize_compact_text(name_text)
    compact_album = _normalize_compact_text(album_text)
    if compact_name and compact_album and (
        compact_album == compact_name or compact_album.startswith(compact_name)
    ):
        return reason_codes

    if any(keyword in normalized_album for keyword in LIVE_ALBUM_KEYWORDS):
        reason_codes.append("album_show_keyword")
    return reason_codes


def _connect_readonly_database(db_path: str | Path) -> sqlite3.Connection:
    """Open the SQLite database at *db_path* read-only with ``sqlite3.Row`` rows.

    Raises:
        FileNotFoundError: If the resolved path does not exist.
    """
    path = Path(db_path).resolve()
    if not path.exists():
        raise FileNotFoundError(f"Database not found: {path}")
    # A file: URI with mode=ro makes SQLite refuse writes on this connection.
    conn = sqlite3.connect(f"{path.as_uri()}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    return conn


def _song_scan_query(downloaded_only: bool) -> str:
    """Build the SQL that lists songs, newest id first.

    When *downloaded_only* is true, only songs that have at least one
    ``active`` file location on a ``local_fs`` backend are returned.
    """
    # Only fixed SQL fragments are interpolated; no user input reaches the
    # statement text.
    where_clause = "WHERE d.song_id IS NOT NULL" if downloaded_only else ""
    return f"""
        WITH downloaded_song_ids AS (
            SELECT DISTINCT fa.song_id
            FROM file_locations AS fl
            JOIN file_assets AS fa ON fa.id = fl.file_asset_id
            JOIN storage_backends AS sb ON sb.id = fl.backend_id
            WHERE fl.status = 'active'
              AND sb.backend_type = 'local_fs'
        )
        SELECT s.id, s.platform, s.remote_song_id, s.name, s.singers, s.album
        FROM songs AS s
        LEFT JOIN downloaded_song_ids AS d ON d.song_id = s.id
        {where_clause}
        ORDER BY s.id DESC
    """


def _row_to_suspected_song(row: sqlite3.Row) -> Optional[SuspectedLiveSong]:
    """Convert a scanned row to a match, or ``None`` when no heuristic fires."""
    reason_codes = detect_suspected_live_reason_codes(
        name=row["name"],
        album=row["album"],
    )
    if not reason_codes:
        return None
    return SuspectedLiveSong(
        song_id=int(row["id"]),
        platform=_normalize_text(row["platform"]),
        remote_song_id=_normalize_text(row["remote_song_id"]),
        name=_normalize_text(row["name"]),
        singers=_normalize_text(row["singers"]),
        album=_normalize_text(row["album"]),
        reason_codes=tuple(reason_codes),
    )


def scan_suspected_live_songs(
    db_path: str | Path,
    *,
    downloaded_only: bool = True,
    limit: Optional[int] = None,
) -> List[SuspectedLiveSong]:
    """Scan the catalog database and return songs that look like live versions.

    Args:
        db_path: Path to the SQLite database; opened read-only.
        downloaded_only: Restrict the scan to songs with an active local file.
        limit: Maximum number of matches to return. ``None`` means no limit;
            zero or a negative value yields an empty list without opening
            the database.

    Returns:
        Matches ordered by descending song id.

    Raises:
        FileNotFoundError: If the database file does not exist.
    """
    normalized_limit = None if limit is None else max(int(limit), 0)
    if normalized_limit == 0:
        return []

    matches: List[SuspectedLiveSong] = []
    conn = _connect_readonly_database(db_path)
    try:
        # Iterate the cursor instead of fetchall() so a limit stops the scan
        # early rather than loading every song row first.
        for row in conn.execute(_song_scan_query(downloaded_only)):
            song = _row_to_suspected_song(row)
            if song is None:
                continue
            matches.append(song)
            if normalized_limit is not None and len(matches) >= normalized_limit:
                break
    finally:
        conn.close()
    return matches


def _song_to_row(song: SuspectedLiveSong) -> Dict[str, Any]:
    """Flatten *song* into a report row; reason codes are comma-joined."""
    return {
        "song_id": song.song_id,
        "platform": song.platform,
        "remote_song_id": song.remote_song_id,
        "name": song.name,
        "singers": song.singers,
        "album": song.album,
        "reason_codes": ",".join(song.reason_codes),
    }


def _write_csv(rows: Iterable[Dict[str, Any]], stream: TextIO) -> None:
    """Write *rows* to *stream* as CSV with a header line."""
    writer = csv.DictWriter(stream, fieldnames=list(_REPORT_FIELDS))
    writer.writeheader()
    for row in rows:
        writer.writerow(row)


def _write_jsonl(rows: Iterable[Dict[str, Any]], stream: TextIO) -> None:
    """Write *rows* to *stream* as one JSON object per line (non-ASCII kept)."""
    for row in rows:
        stream.write(json.dumps(row, ensure_ascii=False) + "\n")


def _write_table(rows: Iterable[Dict[str, Any]], stream: TextIO) -> None:
    """Write *rows* to *stream* as tab-separated lines preceded by a header."""
    stream.write("\t".join(_REPORT_FIELDS) + "\n")
    for row in rows:
        stream.write("\t".join(str(row[header]) for header in _REPORT_FIELDS) + "\n")


def _write_report(
    songs: List[SuspectedLiveSong],
    *,
    output_format: str,
    stream: TextIO,
) -> None:
    """Render *songs* to *stream* as ``csv``, ``jsonl`` or (fallback) a table."""
    rows = [_song_to_row(song) for song in songs]
    if output_format == "csv":
        _write_csv(rows, stream)
        return
    if output_format == "jsonl":
        _write_jsonl(rows, stream)
        return
    # Any other value, including "table", produces the tab-separated table.
    _write_table(rows, stream)


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command-line arguments (``sys.argv`` when *argv* is ``None``)."""
    parser = argparse.ArgumentParser(
        description="List suspected live/stage versions without modifying catalog-sync data.",
    )
    parser.add_argument("--db", required=True, help="Path to catalogsync.db")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Maximum number of matched songs to return.",
    )
    parser.add_argument(
        "--include-undownloaded",
        action="store_true",
        help="Scan all songs instead of only songs with active local files.",
    )
    parser.add_argument(
        "--format",
        choices=("table", "csv", "jsonl"),
        default="table",
        help="Output format for stdout and optional file output.",
    )
    parser.add_argument(
        "--output",
        help="Optional path to write the report file.",
    )
    return parser.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> int:
    """Run the scan, print the report to stdout and optionally to a file.

    Status lines (match count, written path) go to stderr so stdout carries
    only the report. Returns the process exit code ``0``.
    """
    args = parse_args(argv)
    songs = scan_suspected_live_songs(
        args.db,
        downloaded_only=not args.include_undownloaded,
        limit=args.limit,
    )
    print(f"matched_song_count={len(songs)}", file=sys.stderr)
    _write_report(songs, output_format=args.format, stream=sys.stdout)

    if args.output:
        output_path = Path(args.output).resolve()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # newline="" lets the csv module control line endings itself.
        with output_path.open("w", encoding="utf-8", newline="") as handle:
            _write_report(songs, output_format=args.format, stream=handle)
        print(f"wrote_report={output_path}", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())