263 lines
7.2 KiB
Python
263 lines
7.2 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List, Optional, TextIO, Tuple
|
|
|
|
|
|
# Markers looked for in a song title (the title is lower-cased before the
# comparison, so Latin entries must be lower-case).
LIVE_NAME_KEYWORDS = ("live", "现场", "演唱会")

# Markers looked for in an album title; a hit yields the
# "album_show_keyword" reason code.
LIVE_ALBUM_KEYWORDS = (
    "演唱会", "我是歌手", "我们的歌", "声生不息",
    "时光音乐会", "天赐的声音", "披荆斩棘", "乘风",
)
|
|
|
|
|
|
@dataclass(frozen=True)
class SuspectedLiveSong:
    """Immutable record of one song flagged as a suspected live version."""

    # Value of the ``id`` column of the scanned ``songs`` row.
    song_id: int
    # Text columns of the same row, stored stripped.
    platform: str
    remote_song_id: str
    name: str
    singers: str
    album: str
    # Every heuristic that fired for this song, in detection order.
    reason_codes: Tuple[str, ...]
|
|
|
|
|
|
def _normalize_text(value: Any) -> str:
    """Return ``value`` as a whitespace-stripped string.

    Every falsy value (``None``, ``""``, ``0`` ...) is mapped to ``""``.
    """
    if not value:
        return ""
    return str(value).strip()
|
|
|
|
|
|
def _normalize_compact_text(value: Any) -> str:
    """Return a lower-cased form of ``value`` keeping only word characters.

    Runs of whitespace, non-word characters and underscores are deleted, so
    titles that differ only in spacing or punctuation compare equal.
    """
    lowered = _normalize_text(value).lower()
    separators = re.compile(r"[\s\W_]+", flags=re.UNICODE)
    return separators.sub("", lowered)
|
|
|
|
|
|
def _name_has_live_keyword(lowered_name: str) -> bool:
    """Tell whether a lower-cased title contains a ``LIVE_NAME_KEYWORDS`` entry."""
    for keyword in LIVE_NAME_KEYWORDS:
        if keyword.isascii() and keyword.isalpha():
            # A Latin keyword must not be part of a longer Latin word, otherwise
            # "alive" or "deliver" would count as "live". Only a-z neighbours
            # are rejected, so "live版", "(live)" and "live2019" still match.
            pattern = rf"(?<![a-z]){re.escape(keyword)}(?![a-z])"
            if re.search(pattern, lowered_name):
                return True
        elif keyword in lowered_name:
            return True
    return False


def detect_suspected_live_reason_codes(name: Any, album: Any) -> List[str]:
    """Return the reason codes that mark a song as a suspected live version.

    Args:
        name: Song title; any value is accepted, falsy ones count as empty.
        album: Album title; empty or the text ``NULL`` (any case) means the
            song has no usable album and the album check is skipped.

    Returns:
        A list containing ``"name_keyword"`` when the title carries one of
        ``LIVE_NAME_KEYWORDS`` and ``"album_show_keyword"`` when the album
        title carries one of ``LIVE_ALBUM_KEYWORDS``, in that order. The
        list is empty when nothing matched.
    """
    name_text = _normalize_text(name)
    album_text = _normalize_text(album)
    reason_codes: List[str] = []

    if _name_has_live_keyword(name_text.lower()):
        reason_codes.append("name_keyword")

    if not album_text or album_text.upper() == "NULL":
        return reason_codes

    compact_name = _normalize_compact_text(name_text)
    compact_album = _normalize_compact_text(album_text)
    # When the album title is, or begins with, the song title, a keyword in
    # the album only repeats the song title and is not counted as an album
    # hit. ``startswith`` also covers the two being equal.
    if compact_name and compact_album.startswith(compact_name):
        return reason_codes

    lowered_album = album_text.lower()
    if any(keyword in lowered_album for keyword in LIVE_ALBUM_KEYWORDS):
        reason_codes.append("album_show_keyword")

    return reason_codes
|
|
|
|
|
|
def _connect_readonly_database(db_path: str | Path) -> sqlite3.Connection:
    """Open the SQLite database at ``db_path`` in read-only mode.

    Args:
        db_path: Location of the database file.

    Returns:
        A connection whose ``row_factory`` is ``sqlite3.Row``.

    Raises:
        FileNotFoundError: If nothing exists at the resolved path.
    """
    resolved = Path(db_path).resolve()
    if resolved.exists():
        # The file URI with ``mode=ro`` makes SQLite open the file read-only.
        readonly_uri = f"{resolved.as_uri()}?mode=ro"
        connection = sqlite3.connect(readonly_uri, uri=True)
        connection.row_factory = sqlite3.Row
        return connection
    raise FileNotFoundError(f"Database not found: {resolved}")
|
|
|
|
|
|
def _song_scan_query(downloaded_only: bool) -> str:
    """Build the SQL statement that lists candidate songs, highest id first.

    Args:
        downloaded_only: When true, keep only songs that have an active file
            location on a ``local_fs`` storage backend.

    Returns:
        SQL selecting ``id``, ``platform``, ``remote_song_id``, ``name``,
        ``singers`` and ``album`` from ``songs``.
    """
    if downloaded_only:
        download_filter = "WHERE d.song_id IS NOT NULL"
    else:
        download_filter = ""
    return f"""
        WITH downloaded_song_ids AS (
            SELECT DISTINCT fa.song_id
            FROM file_locations AS fl
            JOIN file_assets AS fa ON fa.id = fl.file_asset_id
            JOIN storage_backends AS sb ON sb.id = fl.backend_id
            WHERE fl.status = 'active'
              AND sb.backend_type = 'local_fs'
        )
        SELECT
            s.id,
            s.platform,
            s.remote_song_id,
            s.name,
            s.singers,
            s.album
        FROM songs AS s
        LEFT JOIN downloaded_song_ids AS d ON d.song_id = s.id
        {download_filter}
        ORDER BY s.id DESC
    """
|
|
|
|
|
|
def scan_suspected_live_songs(
    db_path: str | Path,
    *,
    downloaded_only: bool = True,
    limit: Optional[int] = None,
) -> List[SuspectedLiveSong]:
    """Scan the catalog database for songs that look like live versions.

    Args:
        db_path: Location of the SQLite database; it is opened read-only.
        downloaded_only: When true, only songs with an active local file are
            examined.
        limit: Maximum number of matches to return. ``None`` means no cap;
            zero or a negative number returns an empty list without opening
            the database.

    Returns:
        The matching songs in descending ``id`` order.

    Raises:
        FileNotFoundError: If the database file does not exist.
    """
    normalized_limit = None if limit is None else max(int(limit), 0)
    if normalized_limit == 0:
        return []

    matches: List[SuspectedLiveSong] = []
    conn = _connect_readonly_database(db_path)
    try:
        # Iterate the cursor instead of calling fetchall(): rows are pulled on
        # demand, so a small limit no longer loads the whole table.
        for row in conn.execute(_song_scan_query(downloaded_only)):
            reason_codes = detect_suspected_live_reason_codes(
                name=row["name"],
                album=row["album"],
            )
            if not reason_codes:
                continue
            matches.append(
                SuspectedLiveSong(
                    song_id=int(row["id"]),
                    platform=_normalize_text(row["platform"]),
                    remote_song_id=_normalize_text(row["remote_song_id"]),
                    name=_normalize_text(row["name"]),
                    singers=_normalize_text(row["singers"]),
                    album=_normalize_text(row["album"]),
                    reason_codes=tuple(reason_codes),
                )
            )
            if normalized_limit is not None and len(matches) >= normalized_limit:
                break
    finally:
        conn.close()
    return matches
|
|
|
|
|
|
def _song_to_row(song: SuspectedLiveSong) -> Dict[str, Any]:
    """Convert ``song`` into a flat dict for the report writers.

    The keys follow the report column order; ``reason_codes`` is collapsed
    into one comma-separated string.
    """
    row: Dict[str, Any] = {
        column: getattr(song, column)
        for column in (
            "song_id",
            "platform",
            "remote_song_id",
            "name",
            "singers",
            "album",
        )
    }
    row["reason_codes"] = ",".join(song.reason_codes)
    return row
|
|
|
|
|
|
def _write_csv(rows: Iterable[Dict[str, Any]], stream: TextIO) -> None:
    """Write ``rows`` to ``stream`` as CSV, preceded by a header line."""
    columns = [
        "song_id",
        "platform",
        "remote_song_id",
        "name",
        "singers",
        "album",
        "reason_codes",
    ]
    csv_writer = csv.DictWriter(stream, fieldnames=columns)
    csv_writer.writeheader()
    csv_writer.writerows(rows)
|
|
|
|
|
|
def _write_jsonl(rows: Iterable[Dict[str, Any]], stream: TextIO) -> None:
    """Write each row to ``stream`` as one JSON object per line.

    Non-ASCII text is emitted as-is rather than as ``\\u`` escapes.
    """
    stream.writelines(
        f"{json.dumps(record, ensure_ascii=False)}\n" for record in rows
    )
|
|
|
|
|
|
def _write_table(rows: Iterable[Dict[str, Any]], stream: TextIO) -> None:
    """Write ``rows`` to ``stream`` as tab-separated lines under a header line."""
    columns = [
        "song_id",
        "platform",
        "remote_song_id",
        "name",
        "singers",
        "album",
        "reason_codes",
    ]

    def emit(cells: Iterable[str]) -> None:
        # One write per line, so output stays incremental.
        stream.write("\t".join(cells) + "\n")

    emit(columns)
    for record in rows:
        emit(str(record[column]) for column in columns)
|
|
|
|
|
|
def _write_report(
    songs: List[SuspectedLiveSong],
    *,
    output_format: str,
    stream: TextIO,
) -> None:
    """Render ``songs`` to ``stream`` in the requested format.

    ``"csv"`` and ``"jsonl"`` select their writers; every other value falls
    back to the tab-separated table.
    """
    records = [_song_to_row(song) for song in songs]
    if output_format == "csv":
        render = _write_csv
    elif output_format == "jsonl":
        render = _write_jsonl
    else:
        render = _write_table
    render(records, stream)
|
|
|
|
|
|
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the scanner.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Namespace with ``db``, ``limit``, ``include_undownloaded``,
        ``format`` and ``output``.
    """
    cli = argparse.ArgumentParser(
        description="List suspected live/stage versions without modifying catalog-sync data.",
    )
    option_table = (
        ("--db", {"required": True, "help": "Path to catalogsync.db"}),
        (
            "--limit",
            {
                "type": int,
                "default": None,
                "help": "Maximum number of matched songs to return.",
            },
        ),
        (
            "--include-undownloaded",
            {
                "action": "store_true",
                "help": "Scan all songs instead of only songs with active local files.",
            },
        ),
        (
            "--format",
            {
                "choices": ("table", "csv", "jsonl"),
                "default": "table",
                "help": "Output format for stdout and optional file output.",
            },
        ),
        ("--output", {"help": "Optional path to write the report file."}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args(argv)
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Run the scan and print the report; return the process exit code.

    The report always goes to stdout. With ``--output`` the same report is
    also written to that file, creating parent directories as needed. The
    match count and the written path are reported on stderr.
    """
    options = parse_args(argv)
    matched = scan_suspected_live_songs(
        options.db,
        downloaded_only=not options.include_undownloaded,
        limit=options.limit,
    )
    print(f"matched_song_count={len(matched)}", file=sys.stderr)

    _write_report(matched, output_format=options.format, stream=sys.stdout)

    if not options.output:
        return 0

    report_path = Path(options.output).resolve()
    report_path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" leaves line endings to the writers (csv emits its own).
    with report_path.open("w", encoding="utf-8", newline="") as report_file:
        _write_report(matched, output_format=options.format, stream=report_file)
    print(f"wrote_report={report_path}", file=sys.stderr)
    return 0
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|