"""SQLite-backed catalog repository.

Provides :class:`CatalogRepository`, which reads and writes playlists, songs,
artists, playlist/artist pools, storage backends, file assets/locations,
per-backend presence summaries and upload tasks.
"""
from __future__ import annotations

import json
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any

from .db import connect_database
from .models import CatalogSong, PlaylistCandidate

# Whitelist for CatalogRepository.count_rows. The table name is interpolated
# directly into SQL there, so this set is the only guard against injection.
ALLOWED_COUNT_TABLES = {
    "artist_pools",
    "artist_songs",
    "artists",
    "download_tasks",
    "file_assets",
    "file_locations",
    "playlist_pools",
    "playlist_songs",
    "playlists",
    "pool_artists",
    "pool_playlists",
    "song_backend_presence",
    "songs",
    "storage_backends",
    "upload_tasks",
}
# Every status accepted by CatalogRepository.mark_upload_task_status.
UPLOAD_TASK_STATUSES = {"pending", "uploading", "succeeded", "failed", "skipped"}
# Display labels (Simplified Chinese) for the state codes computed in
# CatalogRepository._playlist_base_cte_sql.
PLAYLIST_STATE_LABELS = {
    "unsynced": "未同步",  # "not synced"
    "not_downloaded": "未下载",  # "not downloaded"
    "downloading": "下载中",  # "downloading"
    "partial": "部分已下载",  # "partially downloaded"
    "downloaded": "已下载",  # "downloaded"
}
PLAYLIST_STATE_CODES = set(PLAYLIST_STATE_LABELS.keys())
# Sort fields accepted by CatalogRepository.list_playlist_page.
PLAYLIST_SORT_FIELDS = {"id", "platform", "name", "play_count"}
# Direction used for a sort field when the caller gives no sort_dir.
PLAYLIST_SORT_DEFAULTS = {
    "id": "desc",
    "platform": "asc",
    "name": "asc",
    "play_count": "desc",
}
PLAYLIST_SORT_DIRECTIONS = {"asc", "desc"}


def json_dumps(data: dict[str, Any] | None) -> str | None:
    """Serialize *data* for a ``*_json`` column.

    Returns None for ``None`` or an empty dict, so the column is stored as
    NULL. Otherwise returns JSON text with non-ASCII characters kept
    verbatim (``ensure_ascii=False``).
    """
    if data in (None, {}):
        return None
    return json.dumps(data, ensure_ascii=False)


def _progress_percent(completed: int, total: int) -> int:
    """Return ``completed / total`` as an integer percentage in [0, 100].

    Falsy or negative inputs count as 0. A non-positive total yields 0.
    ``completed >= total`` yields 100. Otherwise the ratio is truncated
    (never rounded up to 100).
    """
    normalized_total = max(int(total or 0), 0)
    normalized_completed = max(int(completed or 0), 0)
    if normalized_total <= 0:
        return 0
    if normalized_completed >= normalized_total:
        return 100
    return int((normalized_completed * 100) / normalized_total)


class CatalogRepository:
    """Data-access layer for the catalog SQLite database at ``db_path``.

    Each public call opens its own connection via ``connect_database``.
    Except for ``claim_next_upload_task``, which manages its transaction by
    hand, calls go through ``_connection``: commit on success, always close.

    NOTE(review): rows are read as ``row["col"]``. This assumes
    ``connect_database`` installs ``sqlite3.Row`` as the row factory; confirm
    in ``.db``.
    """

    def __init__(self, db_path: str | Path):
        """Remember the database path; no connection is opened here."""
        self.db_path = Path(db_path)

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection through the project's ``connect_database``."""
        return connect_database(self.db_path)

    @contextmanager
    def _connection(self):
        """Yield a fresh connection as a single unit of work.

        The connection is committed only if the ``with`` body finishes
        without raising, and it is always closed. ``sqlite3`` discards
        uncommitted changes on ``close()``, so an exception rolls the work
        back.
        """
        conn = self._connect()
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    def _fetchall(self, query: str, params: tuple[Any, ...] = ()) -> list[sqlite3.Row]:
        """Run *query* with *params* on a fresh connection and return all rows."""
        with self._connection() as conn:
            rows = conn.execute(query, params).fetchall()
        return rows

    def _fetchone(self, query: str, params: tuple[Any, ...] = ()) -> sqlite3.Row | None:
        """Run *query* with *params* and return the first row, or None."""
        with self._connection() as conn:
            row = conn.execute(query, params).fetchone()
        return row

    def _execute(self, query: str, params: tuple[Any, ...] = ()) -> int:
        """Execute one statement in its own transaction; return ``cursor.lastrowid``."""
        with self._connection() as conn:
            cursor = conn.execute(query, params)
            return cursor.lastrowid

    def count_rows(self, table_name: str) -> int:
        """Return ``COUNT(*)`` for a whitelisted table.

        Raises:
            ValueError: if *table_name* is not in ``ALLOWED_COUNT_TABLES``.
                The name is interpolated into SQL, so this check is required.
        """
        if table_name not in ALLOWED_COUNT_TABLES:
            raise ValueError(f"Unsupported table name: {table_name}")
        row = self._fetchone(f"SELECT COUNT(*) AS count FROM {table_name}")
        return int(row["count"]) if row else 0

    @staticmethod
    def _playlist_base_cte_sql() -> str:
        """Return the ``WITH ...`` prefix that defines the ``playlist_base`` CTE.

        Callers append their own ``SELECT ... FROM playlist_base pb``. The CTEs:

        * ``local_downloaded_songs``: songs that have an active file location
          on a ``local_fs`` backend.
        * ``running_download_songs``: songs whose job item is running inside
          a running ``download`` stage of a running job run.
        * ``playlist_stats``: one row per playlist with pool names, the
          linked/downloaded/running song counts and the wanted flag. The pool
          joins multiply rows, hence ``COUNT(DISTINCT ...)``.
        * ``playlist_base``: adds ``display_song_count``, which falls back to
          ``collected_song_count`` when no songs are linked yet, plus
          ``is_song_count_estimated`` and ``state_code``. State precedence is
          unsynced > downloading > not_downloaded > partial > downloaded.
        """
        return """ WITH local_downloaded_songs AS ( SELECT DISTINCT fa.song_id FROM file_locations fl JOIN file_assets fa ON fa.id = fl.file_asset_id JOIN storage_backends sb ON sb.id = fl.backend_id WHERE fl.status = 'active' AND sb.backend_type = 'local_fs' ), running_download_songs AS ( SELECT DISTINCT ji.song_id FROM job_items ji JOIN job_stages js ON js.id = ji.job_stage_id JOIN job_runs jr ON jr.id = js.job_run_id WHERE ji.status = 'running' AND js.stage_type = 'download' AND js.status = 'running' AND jr.status = 'running' AND ji.song_id IS NOT NULL ), playlist_stats AS ( SELECT p.id, p.platform, p.remote_playlist_id, p.name, p.play_count, p.collected_song_count, p.updated_at, GROUP_CONCAT(DISTINCT pp.name) AS pool_names, COUNT(DISTINCT ps.song_id) AS song_count, COUNT(DISTINCT CASE WHEN lds.song_id IS NOT NULL THEN ps.song_id END) AS downloaded_song_count, COUNT(DISTINCT CASE WHEN rds.song_id IS NOT NULL THEN ps.song_id END) AS running_download_song_count, COALESCE(pref.is_wanted, 0) AS is_wanted, pref.marked_by FROM playlists p LEFT JOIN pool_playlists rel ON rel.playlist_id = p.id LEFT JOIN playlist_pools pp ON pp.id = rel.pool_id LEFT JOIN playlist_download_preferences pref ON pref.playlist_id = p.id LEFT JOIN playlist_songs ps ON ps.playlist_id = p.id LEFT JOIN local_downloaded_songs lds ON lds.song_id = ps.song_id LEFT JOIN running_download_songs rds ON rds.song_id = ps.song_id GROUP BY p.id ), playlist_base AS ( SELECT *, CASE WHEN song_count > 0 THEN song_count WHEN collected_song_count IS NOT NULL THEN collected_song_count ELSE 0 END AS display_song_count, CASE WHEN song_count = 0 AND collected_song_count IS NOT NULL THEN 1 ELSE 0 END AS is_song_count_estimated, CASE WHEN song_count = 0 THEN 'unsynced' WHEN running_download_song_count > 0 THEN 'downloading' WHEN downloaded_song_count = 0 THEN 'not_downloaded' WHEN downloaded_song_count < song_count THEN 'partial' ELSE 'downloaded' END AS state_code FROM playlist_stats ) """

    def upsert_playlist_pool(
        self,
        platform: str,
        pool_kind: str,
        external_id: str,
        name: str,
        url: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> int:
        """Insert or update the pool keyed by (platform, pool_kind, external_id).

        On conflict, ``name``, ``url`` and ``metadata_json`` are overwritten.
        Returns the pool id.
        """
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO playlist_pools (platform, pool_kind, external_id, name, url, metadata_json) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(platform, pool_kind, external_id) DO UPDATE SET name = excluded.name, url = excluded.url, metadata_json = excluded.metadata_json, updated_at = CURRENT_TIMESTAMP """,
                (platform, pool_kind, external_id, name, url, json_dumps(metadata)),
            )
            # Re-select the id: the UPSERT's lastrowid is not reliable on the update path.
            row = conn.execute(
                """ SELECT id FROM playlist_pools WHERE platform = ? AND pool_kind = ? AND external_id = ? """,
                (platform, pool_kind, external_id),
            ).fetchone()
        return int(row["id"])

    def get_playlist_pool(self, pool_id: int) -> sqlite3.Row | None:
        """Return the ``playlist_pools`` row with *pool_id*, or None."""
        return self._fetchone("SELECT * FROM playlist_pools WHERE id = ?", (pool_id,))

    def get_or_create_manual_file_pool(self, playlist_file: str | Path, platform: str) -> int:
        """Return the id of the ``manual_file`` pool for *playlist_file* on *platform*.

        The file path is resolved to an absolute path. That path and the
        platform together form the pool's external id, so each (file,
        platform) pair has its own pool.
        """
        resolved = Path(playlist_file).resolve()
        return self.upsert_playlist_pool(
            platform=platform,
            pool_kind="manual_file",
            external_id=f"manual_file:{resolved}:{platform}",
            name=f"Manual File Import: {resolved.name} ({platform})",
            url=str(resolved),
            metadata={"playlist_file": str(resolved)},
        )

    def upsert_playlist(self, candidate: PlaylistCandidate) -> int:
        """Insert or update the playlist keyed by (platform, remote_playlist_id).

        Every descriptive column is overwritten from *candidate* on conflict.
        Returns the playlist id.
        """
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO playlists ( platform, remote_playlist_id, name, url, parse_strategy, cover_url, creator_name, play_count, collected_song_count, metadata_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(platform, remote_playlist_id) DO UPDATE SET name = excluded.name, url = excluded.url, parse_strategy = excluded.parse_strategy, cover_url = excluded.cover_url, creator_name = excluded.creator_name, play_count = excluded.play_count, collected_song_count = excluded.collected_song_count, metadata_json = excluded.metadata_json, updated_at = CURRENT_TIMESTAMP """,
                (
                    candidate.platform,
                    candidate.remote_id,
                    candidate.name,
                    candidate.url,
                    candidate.parse_strategy,
                    candidate.cover_url,
                    candidate.creator_name,
                    candidate.play_count,
                    candidate.collected_song_count,
                    json_dumps(candidate.metadata),
                ),
            )
            row = conn.execute(
                """ SELECT id FROM playlists WHERE platform = ? AND remote_playlist_id = ? """,
                (candidate.platform, candidate.remote_id),
            ).fetchone()
        return int(row["id"])

    def link_pool_playlist(self, pool_id: int, playlist_id: int) -> None:
        """Link a playlist to a pool; ``INSERT OR IGNORE`` makes repeats a no-op.

        NOTE(review): assumes a UNIQUE/PK constraint on (pool_id, playlist_id).
        """
        self._execute(
            """ INSERT OR IGNORE INTO pool_playlists (pool_id, playlist_id) VALUES (?, ?) """,
            (pool_id, playlist_id),
        )

    def list_playlists(self, sources: list[str] | None = None, limit: int | None = None) -> list[sqlite3.Row]:
        """Return playlists ordered by id.

        The result is optionally limited to platforms in *sources* and
        capped at *limit* rows.
        """
        query = "SELECT * FROM playlists"
        params: list[Any] = []
        if sources:
            placeholders = ", ".join("?" for _ in sources)
            query += f" WHERE platform IN ({placeholders})"
            params.extend(sources)
        query += " ORDER BY id ASC"
        if limit is not None:
            query += " LIMIT ?"
            params.append(limit)
        return self._fetchall(query, tuple(params))

    def list_playlists_by_ids(self, playlist_ids: list[int]) -> list[sqlite3.Row]:
        """Return the playlists whose id is in *playlist_ids*, ordered by id.

        An empty input returns [].
        """
        if not playlist_ids:
            return []
        placeholders = ", ".join("?" for _ in playlist_ids)
        return self._fetchall(
            f"SELECT * FROM playlists WHERE id IN ({placeholders}) ORDER BY id ASC",
            tuple(playlist_ids),
        )

    def update_playlist_play_count(self, playlist_id: int, play_count: int) -> None:
        """Set ``play_count`` on one playlist and bump its ``updated_at``."""
        self._execute(
            """ UPDATE playlists SET play_count = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """,
            (int(play_count), int(playlist_id)),
        )

    def mark_playlists_wanted(self, playlist_ids: list[int], marked_by: str | None = None) -> None:
        """Upsert ``is_wanted = 1`` (and *marked_by*) for each positive playlist id.

        Ids <= 0 are dropped. Nothing is written if no ids remain.
        """
        normalized_ids = [int(playlist_id) for playlist_id in playlist_ids if int(playlist_id) > 0]
        if not normalized_ids:
            return
        with self._connection() as conn:
            conn.executemany(
                """ INSERT INTO playlist_download_preferences (playlist_id, is_wanted, marked_by) VALUES (?, 1, ?) ON CONFLICT(playlist_id) DO UPDATE SET is_wanted = 1, marked_by = excluded.marked_by, updated_at = CURRENT_TIMESTAMP """,
                [(playlist_id, marked_by) for playlist_id in normalized_ids],
            )

    def unmark_playlists_wanted(self, playlist_ids: list[int]) -> None:
        """Delete the download-preference rows for each positive playlist id."""
        normalized_ids = [int(playlist_id) for playlist_id in playlist_ids if int(playlist_id) > 0]
        if not normalized_ids:
            return
        placeholders = ", ".join("?" for _ in normalized_ids)
        with self._connection() as conn:
            conn.execute(
                f"DELETE FROM playlist_download_preferences WHERE playlist_id IN ({placeholders})",
                tuple(normalized_ids),
            )

    def list_playlist_page(
        self,
        *,
        page: int = 1,
        page_size: int = 50,
        platform: str | None = None,
        pool_kind: str | None = None,
        status: str | None = None,
        keyword: str | None = None,
        wanted_only: bool = False,
        sort_by: str | None = None,
        sort_dir: str | None = None,
    ) -> dict[str, Any]:
        """Return one page of playlists with computed download state.

        Args:
            page: 1-based page number; values < 1 are clamped to 1.
            page_size: rows per page; values < 1 are clamped to 1 (no upper bound).
            platform: exact match on ``platform``.
            pool_kind: keep playlists linked to at least one pool of this kind.
            status: one of ``PLAYLIST_STATE_CODES``.
            keyword: substring match on ``name`` or ``remote_playlist_id``.
            wanted_only: keep only playlists marked wanted.
            sort_by: one of ``PLAYLIST_SORT_FIELDS``. None means the default
                order (``updated_at`` DESC, ``id`` DESC).
            sort_dir: ``asc``/``desc`` (case-insensitive). Defaults per field
                via ``PLAYLIST_SORT_DEFAULTS``.

        Returns:
            Dict with ``items`` (``playlist_base`` rows plus ``state_label``
            and ``progress_percent``), ``page``, ``page_size``,
            ``total_count`` and ``total_pages`` (always >= 1).

        Raises:
            ValueError: for an unknown status, sort field or sort direction.
        """
        normalized_page = max(int(page), 1)
        normalized_page_size = max(int(page_size), 1)
        offset = (normalized_page - 1) * normalized_page_size
        if status is not None and status not in PLAYLIST_STATE_CODES:
            raise ValueError(f"Unsupported playlist status: {status}")
        if sort_by is not None and sort_by not in PLAYLIST_SORT_FIELDS:
            raise ValueError(f"Unsupported playlist sort field: {sort_by}")
        normalized_sort_dir = None
        if sort_by is not None:
            normalized_sort_dir = str(sort_dir or PLAYLIST_SORT_DEFAULTS[sort_by]).lower()
            # The direction is interpolated into ORDER BY, so it must be whitelisted here.
            if normalized_sort_dir not in PLAYLIST_SORT_DIRECTIONS:
                raise ValueError(f"Unsupported playlist sort direction: {sort_dir}")
        filters: list[str] = []
        params: list[Any] = []
        if platform:
            filters.append("pb.platform = ?")
            params.append(platform)
        if pool_kind:
            filters.append(
                """ EXISTS ( SELECT 1 FROM pool_playlists rel JOIN playlist_pools pp ON pp.id = rel.pool_id WHERE rel.playlist_id = pb.id AND pp.pool_kind = ? ) """.strip()
            )
            params.append(pool_kind)
        if status:
            filters.append("pb.state_code = ?")
            params.append(status)
        if keyword:
            # NOTE(review): '%' and '_' inside keyword are not escaped, so they act as
            # LIKE wildcards; consider escaping them with an ESCAPE clause.
            filters.append("(pb.name LIKE ? OR pb.remote_playlist_id LIKE ?)")
            params.extend([f"%{keyword}%", f"%{keyword}%"])
        if wanted_only:
            filters.append("pb.is_wanted = 1")
        where_sql = f"WHERE {' AND '.join(filters)}" if filters else ""
        base_sql = self._playlist_base_cte_sql()
        rows = self._fetchall(
            base_sql
            + f""" SELECT * FROM playlist_base pb {where_sql} {self._playlist_page_order_sql(sort_by=sort_by, sort_dir=normalized_sort_dir)} LIMIT ? OFFSET ? """,
            tuple(params + [normalized_page_size, offset]),
        )
        # A second query on a separate connection evaluates the same CTE for the total count.
        total_row = self._fetchone(
            base_sql
            + f""" SELECT COUNT(*) AS count_value FROM playlist_base pb {where_sql} """,
            tuple(params),
        )
        total_count = int(total_row["count_value"]) if total_row else 0
        total_pages = (total_count + normalized_page_size - 1) // normalized_page_size
        items: list[dict[str, Any]] = []
        for row in rows:
            payload = dict(row)
            # Unknown codes fall back to the raw code as their label.
            payload["state_label"] = PLAYLIST_STATE_LABELS.get(payload["state_code"], payload["state_code"])
            payload["progress_percent"] = _progress_percent(
                int(payload.get("downloaded_song_count") or 0),
                int(payload.get("song_count") or 0),
            )
            items.append(payload)
        return {
            "items": items,
            "page": normalized_page,
            "page_size": normalized_page_size,
            "total_count": total_count,
            "total_pages": max(total_pages, 1),
        }

    def list_playlist_export_state_rows(self, playlist_ids: list[int]) -> list[dict[str, Any]]:
        """Return state/progress rows for *playlist_ids*.

        Ids <= 0 and duplicates are dropped. The input order of the remaining
        ids is preserved. Ids with no matching playlist are omitted.
        """
        normalized_ids: list[int] = []
        seen: set[int] = set()
        for value in playlist_ids:
            playlist_id = int(value)
            if playlist_id <= 0 or playlist_id in seen:
                continue
            normalized_ids.append(playlist_id)
            seen.add(playlist_id)
        if not normalized_ids:
            return []
        placeholders = ", ".join("?" for _ in normalized_ids)
        rows = self._fetchall(
            self._playlist_base_cte_sql()
            + f""" SELECT pb.id, pb.platform, pb.remote_playlist_id, pb.name, pb.play_count, pb.collected_song_count, pb.song_count, pb.downloaded_song_count, pb.running_download_song_count, pb.state_code FROM playlist_base pb WHERE pb.id IN ({placeholders}) """,
            tuple(normalized_ids),
        )
        payload_by_id: dict[int, dict[str, Any]] = {}
        for row in rows:
            payload = dict(row)
            payload["state_label"] = PLAYLIST_STATE_LABELS.get(payload["state_code"], payload["state_code"])
            payload["progress_percent"] = _progress_percent(
                int(payload.get("downloaded_song_count") or 0),
                int(payload.get("song_count") or 0),
            )
            payload_by_id[int(payload["id"])] = payload
        # Re-emit in caller order; the SQL result order is not relied upon.
        return [payload_by_id[playlist_id] for playlist_id in normalized_ids if playlist_id in payload_by_id]

    @staticmethod
    def _playlist_page_order_sql(*, sort_by: str | None, sort_dir: str | None) -> str:
        """Build the ORDER BY clause for ``list_playlist_page``.

        *sort_dir* is interpolated verbatim (upper-cased), so callers must
        validate it first; ``list_playlist_page`` does. Ties are broken by
        ``pb.id DESC``. For ``play_count``, NULLs sort last. Any *sort_by*
        other than id/platform/name falls through to the play_count ordering.
        """
        if sort_by is None:
            return "ORDER BY pb.updated_at DESC, pb.id DESC"
        direction = str(sort_dir or PLAYLIST_SORT_DEFAULTS[sort_by]).upper()
        if sort_by == "id":
            return f"ORDER BY pb.id {direction}"
        if sort_by == "platform":
            return f"ORDER BY LOWER(pb.platform) {direction}, pb.id DESC"
        if sort_by == "name":
            return f"ORDER BY LOWER(pb.name) {direction}, pb.id DESC"
        return (
            "ORDER BY CASE WHEN pb.play_count IS NULL THEN 1 ELSE 0 END ASC, "
            f"pb.play_count {direction}, pb.id DESC"
        )

    def list_playlist_song_details(
        self,
        playlist_id: int,
        *,
        limit: int = 2000,
    ) -> list[dict[str, Any]]:
        """Return the songs of one playlist with file and cover details.

        Songs are ordered by ``position`` (NULL positions last), then
        ``song_id``. *limit* is clamped to [1, 5000].

        Each item has:

        * ``ext`` and ``file_size_bytes`` from the song's largest file asset,
          falling back to the ``songs`` columns.
        * ``cover_url``, and ``album`` when the column is empty, taken from
          the first non-blank candidate in ``metadata_json``
          (snapshot / raw_data / raw_data.search). ``metadata_json`` itself is
          removed from the item.
        * ``local_file_path``: the first active ``local_fs`` location with a
          path, primary first.
        * ``uploaded_locations``: every active non-local location.
        """
        normalized_playlist_id = int(playlist_id)
        normalized_limit = max(min(int(limit), 5000), 1)
        # 2147483647 (max signed 32-bit int) pushes NULL positions after all real ones.
        rows = self._fetchall(
            """ SELECT ps.song_id, ps.position, s.platform, s.remote_song_id, s.name, s.singers, s.album, s.metadata_json, COALESCE( ( SELECT fa.ext FROM file_assets fa WHERE fa.song_id = s.id ORDER BY COALESCE(fa.file_size_bytes, -1) DESC, fa.id ASC LIMIT 1 ), s.ext ) AS ext, COALESCE( ( SELECT fa.file_size_bytes FROM file_assets fa WHERE fa.song_id = s.id ORDER BY COALESCE(fa.file_size_bytes, -1) DESC, fa.id ASC LIMIT 1 ), s.file_size_bytes ) AS file_size_bytes FROM playlist_songs ps JOIN songs s ON s.id = ps.song_id WHERE ps.playlist_id = ? ORDER BY COALESCE(ps.position, 2147483647) ASC, ps.song_id ASC LIMIT ? """,
            (normalized_playlist_id, normalized_limit),
        )
        items = [dict(row) for row in rows]
        if not items:
            return []
        for item in items:
            metadata_payload: dict[str, Any] = {}
            metadata_text = item.get("metadata_json")
            if metadata_text:
                try:
                    parsed = json.loads(str(metadata_text))
                    metadata_payload = parsed if isinstance(parsed, dict) else {}
                # Malformed metadata is treated as empty (best effort).
                # NOTE(review): could be narrowed to ValueError (json.JSONDecodeError subclasses it).
                except Exception:
                    metadata_payload = {}
            snapshot_payload = metadata_payload.get("snapshot")
            snapshot = snapshot_payload if isinstance(snapshot_payload, dict) else {}
            raw_payload = metadata_payload.get("raw_data")
            raw_data = raw_payload if isinstance(raw_payload, dict) else {}
            search_payload = raw_data.get("search")
            search_data = search_payload if isinstance(search_payload, dict) else {}
            # Candidate keys in priority order; the first non-blank value wins.
            cover_candidates = [
                snapshot.get("cover_url"),
                snapshot.get("coverUrl"),
                raw_data.get("cover_url"),
                raw_data.get("coverUrl"),
                search_data.get("cover_url"),
                search_data.get("coverUrl"),
                search_data.get("picUrl"),
                search_data.get("albumpic"),
                search_data.get("hts_MVPIC"),
                search_data.get("pic"),
            ]
            cover_url = next(
                (str(value).strip() for value in cover_candidates if str(value or "").strip()),
                "",
            )
            if not item.get("album"):
                album_candidates = [
                    snapshot.get("album"),
                    raw_data.get("album"),
                    search_data.get("album"),
                    search_data.get("albumname"),
                    search_data.get("albumName"),
                ]
                album_name = next(
                    (str(value).strip() for value in album_candidates if str(value or "").strip()),
                    "",
                )
                item["album"] = album_name or None
            item["cover_url"] = cover_url or None
            item.pop("metadata_json", None)
        song_ids = [int(item["song_id"]) for item in items]
        placeholders = ", ".join("?" for _ in song_ids)
        # Within each song, local_fs locations come first, then primary, then oldest id.
        location_rows = self._fetchall(
            f""" SELECT fa.song_id, fl.absolute_path, fl.public_url, fl.download_url, fl.container_name, fl.locator, fl.is_primary, sb.name AS backend_name, sb.backend_type FROM file_locations fl JOIN file_assets fa ON fa.id = fl.file_asset_id JOIN storage_backends sb ON sb.id = fl.backend_id WHERE fl.status = 'active' AND fa.song_id IN ({placeholders}) ORDER BY fa.song_id ASC, CASE WHEN sb.backend_type = 'local_fs' THEN 0 ELSE 1 END ASC, fl.is_primary DESC, fl.id ASC """,
            tuple(song_ids),
        )
        locations_by_song: dict[int, list[dict[str, Any]]] = {}
        for row in location_rows:
            payload = dict(row)
            song_id = int(payload["song_id"])
            locations_by_song.setdefault(song_id, []).append(payload)
        for item in items:
            song_id = int(item["song_id"])
            local_file_path: str | None = None
            uploaded_locations: list[dict[str, Any]] = []
            for location in locations_by_song.get(song_id, []):
                backend_type = str(location.get("backend_type") or "")
                if backend_type == "local_fs":
                    # Keep only the first local path (rows are already ordered by preference).
                    if not local_file_path and location.get("absolute_path"):
                        local_file_path = str(location["absolute_path"])
                    continue
                uploaded_locations.append(
                    {
                        "backend_name": str(location.get("backend_name") or ""),
                        "backend_type": backend_type,
                        "container_name": str(location.get("container_name") or ""),
                        "locator": str(location.get("locator") or ""),
                        # Prefer the public URL, then the download URL.
                        "url": (
                            str(location.get("public_url") or "")
                            or str(location.get("download_url") or "")
                            or ""
                        ),
                        "public_url": str(location.get("public_url") or ""),
                        "download_url": str(location.get("download_url") or ""),
                    }
                )
            item["local_file_path"] = local_file_path
            item["uploaded_locations"] = uploaded_locations
        return items

    def list_pool_playlist_links(self, pool_id: int) -> list[sqlite3.Row]:
        """Return the ``pool_playlists`` rows of one pool, ordered by playlist id."""
        return self._fetchall(
            "SELECT * FROM pool_playlists WHERE pool_id = ? ORDER BY playlist_id ASC",
            (pool_id,),
        )

    def get_pool_ids_for_playlist(self, playlist_id: int) -> list[int]:
        """Return the ids of every pool the playlist is linked to, ascending."""
        rows = self._fetchall(
            "SELECT pool_id FROM pool_playlists WHERE playlist_id = ? ORDER BY pool_id ASC",
            (playlist_id,),
        )
        return [int(row["pool_id"]) for row in rows]

    def upsert_song(self, song: CatalogSong) -> int:
        """Insert or update the song keyed by (platform, remote_song_id).

        A falsy ``song.name`` is stored as the remote id. Returns the song id.
        """
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO songs ( platform, remote_song_id, name, singers, album, ext, file_size_bytes, quality_label, metadata_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(platform, remote_song_id) DO UPDATE SET name = excluded.name, singers = excluded.singers, album = excluded.album, ext = excluded.ext, file_size_bytes = excluded.file_size_bytes, quality_label = excluded.quality_label, metadata_json = excluded.metadata_json, updated_at = CURRENT_TIMESTAMP """,
                (
                    song.platform,
                    song.remote_song_id,
                    song.name or song.remote_song_id,
                    song.singers,
                    song.album,
                    song.ext,
                    song.file_size_bytes,
                    song.quality_label,
                    json_dumps(song.metadata),
                ),
            )
            row = conn.execute(
                """ SELECT id FROM songs WHERE platform = ? AND remote_song_id = ? """,
                (song.platform, song.remote_song_id),
            ).fetchone()
        return int(row["id"])

    def get_song(self, song_id: int) -> sqlite3.Row | None:
        """Return the ``songs`` row with *song_id*, or None."""
        return self._fetchone("SELECT * FROM songs WHERE id = ?", (song_id,))

    def link_playlist_song(self, playlist_id: int, song_id: int, position: int | None) -> None:
        """Link a song to a playlist; an existing link gets its ``position`` updated."""
        self._execute(
            """ INSERT INTO playlist_songs (playlist_id, song_id, position) VALUES (?, ?, ?) ON CONFLICT(playlist_id, song_id) DO UPDATE SET position = excluded.position """,
            (playlist_id, song_id, position),
        )

    def upsert_artist_pool(
        self,
        platform: str,
        pool_kind: str,
        external_id: str,
        name: str,
        source_playlist_pool_id: int | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> int:
        """Insert or update the artist pool keyed by (platform, pool_kind, external_id).

        Returns the artist pool id.
        """
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO artist_pools ( platform, pool_kind, external_id, name, source_playlist_pool_id, metadata_json ) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(platform, pool_kind, external_id) DO UPDATE SET name = excluded.name, source_playlist_pool_id = excluded.source_playlist_pool_id, metadata_json = excluded.metadata_json, updated_at = CURRENT_TIMESTAMP """,
                (
                    platform,
                    pool_kind,
                    external_id,
                    name,
                    source_playlist_pool_id,
                    json_dumps(metadata),
                ),
            )
            row = conn.execute(
                """ SELECT id FROM artist_pools WHERE platform = ? AND pool_kind = ? AND external_id = ? """,
                (platform, pool_kind, external_id),
            ).fetchone()
        return int(row["id"])

    def ensure_derived_artist_pool(self, platform: str, source_pool_id: int, source_pool_name: str) -> int:
        """Return the id of the ``derived_artist`` pool built from a playlist pool.

        The name suffix "派生歌手池" means "derived artist pool".
        """
        return self.upsert_artist_pool(
            platform=platform,
            pool_kind="derived_artist",
            external_id=f"derived:{source_pool_id}",
            name=f"{source_pool_name} 派生歌手池",
            source_playlist_pool_id=source_pool_id,
        )

    def upsert_artist(
        self,
        platform: str,
        name: str,
        remote_artist_id: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> int:
        """Insert or update an artist and return its id.

        The key is ``"{platform}:{remote_artist_id}"``. When there is no
        remote id, the stripped, lower-cased name is used instead. On conflict
        only ``name`` and ``metadata_json`` are updated.
        """
        normalized_name = name.strip().lower()
        artist_key = f"{platform}:{remote_artist_id or normalized_name}"
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO artists ( artist_key, platform, remote_artist_id, name, normalized_name, metadata_json ) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(artist_key) DO UPDATE SET name = excluded.name, metadata_json = excluded.metadata_json, updated_at = CURRENT_TIMESTAMP """,
                (
                    artist_key,
                    platform,
                    remote_artist_id,
                    name,
                    normalized_name,
                    json_dumps(metadata),
                ),
            )
            row = conn.execute(
                "SELECT id FROM artists WHERE artist_key = ?",
                (artist_key,),
            ).fetchone()
        return int(row["id"])

    def link_pool_artist(self, pool_id: int, artist_id: int) -> None:
        """Link an artist to an artist pool (``INSERT OR IGNORE``)."""
        self._execute(
            """ INSERT OR IGNORE INTO pool_artists (pool_id, artist_id) VALUES (?, ?) """,
            (pool_id, artist_id),
        )

    def link_artist_song(self, artist_id: int, song_id: int) -> None:
        """Link a song to an artist (``INSERT OR IGNORE``)."""
        self._execute(
            """ INSERT OR IGNORE INTO artist_songs (artist_id, song_id) VALUES (?, ?) """,
            (artist_id, song_id),
        )

    def get_default_backend_id(self) -> int:
        """Return the lowest-id storage backend flagged ``is_default = 1``.

        Raises:
            RuntimeError: if no backend is flagged as default.
        """
        row = self._fetchone(
            """ SELECT id FROM storage_backends WHERE is_default = 1 ORDER BY id ASC LIMIT 1 """
        )
        if not row:
            raise RuntimeError("No default storage backend configured")
        return int(row["id"])

    def get_default_local_library_root(self) -> Path | None:
        """Return the resolved ``base_path`` of the default backend.

        Returns None unless the default backend exists, is ``local_fs`` and
        has a non-blank ``base_path``.
        """
        try:
            backend_id = self.get_default_backend_id()
        # NOTE(review): this also swallows sqlite3 errors, not only the "no default"
        # RuntimeError; consider catching RuntimeError only.
        except Exception:
            return None
        backend = self.get_backend(backend_id)
        if not backend:
            return None
        if str(backend["backend_type"] or "") != "local_fs":
            return None
        base_path = str(backend["base_path"] or "").strip()
        if not base_path:
            return None
        return Path(base_path).resolve()

    def get_backend(self, backend_id: int) -> sqlite3.Row | None:
        """Return the ``storage_backends`` row with *backend_id*, or None."""
        return self._fetchone("SELECT * FROM storage_backends WHERE id = ?", (backend_id,))

    def get_backend_by_name(self, name: str) -> sqlite3.Row | None:
        """Return the ``storage_backends`` row with this unique *name*, or None."""
        return self._fetchone("SELECT * FROM storage_backends WHERE name = ?", (name,))

    def upsert_object_storage_backend(
        self,
        name: str,
        container_name: str,
        endpoint: str,
        region: str | None,
        base_prefix: str | None,
        credential_env_prefix: str,
        addressing_style: str | None = None,
        public_base_url: str | None = None,
    ) -> int:
        """Insert or update an ``object_storage`` backend by *name*; return its id.

        Connection settings are stored as JSON in ``config_json``. Only the
        env-var prefix for credentials is stored, never the secrets.
        ``base_path`` is not set.
        """
        config = {
            "endpoint": endpoint,
            "region": region,
            "base_prefix": base_prefix,
            "addressing_style": addressing_style,
            "public_base_url": public_base_url,
            "credential_env_prefix": credential_env_prefix,
        }
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO storage_backends (name, backend_type, container_name, config_json) VALUES (?, 'object_storage', ?, ?) ON CONFLICT(name) DO UPDATE SET backend_type = excluded.backend_type, container_name = excluded.container_name, config_json = excluded.config_json, updated_at = CURRENT_TIMESTAMP """,
                (name, container_name, json_dumps(config)),
            )
            row = conn.execute(
                "SELECT id FROM storage_backends WHERE name = ?",
                (name,),
            ).fetchone()
        return int(row["id"])

    def ensure_local_backend(self, base_path: str | Path, name: str | None = None, is_default: bool = False) -> int:
        """Insert or update a ``local_fs`` backend; return its id.

        The name defaults to ``"local:<resolved path>"``. On conflict,
        ``base_path`` is overwritten. ``is_default`` can only be raised to 1
        here, never cleared.

        NOTE(review): other backends' ``is_default`` flags are left untouched,
        so several defaults can coexist. ``get_default_backend_id`` then picks
        the lowest id.
        """
        resolved_path = str(Path(base_path).resolve())
        backend_name = name or f"local:{resolved_path}"
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO storage_backends (name, backend_type, base_path, is_default) VALUES (?, 'local_fs', ?, ?) ON CONFLICT(name) DO UPDATE SET base_path = excluded.base_path, is_default = CASE WHEN excluded.is_default = 1 THEN 1 ELSE storage_backends.is_default END, updated_at = CURRENT_TIMESTAMP """,
                (backend_name, resolved_path, 1 if is_default else 0),
            )
            row = conn.execute(
                "SELECT id FROM storage_backends WHERE name = ?",
                (backend_name,),
            ).fetchone()
        return int(row["id"])

    def record_local_file(
        self,
        song_id: int,
        backend_id: int,
        relative_path: str,
        file_size_bytes: int | None,
        ext: str | None,
        quality_label: str | None,
    ) -> int:
        """Record a file stored under a local backend; return the file asset id.

        An existing file asset for the song with the same quality/ext/size is
        reused; NULLs compare equal via COALESCE. Otherwise a new asset is
        created. The location is then upserted with ``locator`` =
        *relative_path*, ``container_name`` = the backend's ``base_path`` and
        ``status`` 'active'. ``is_primary`` is 1 on insert and unchanged on
        conflict.

        Raises:
            RuntimeError: if *backend_id* does not exist.

        NOTE(review): a backend with a NULL ``base_path`` (e.g. one created by
        ``upsert_object_storage_backend``) makes ``Path(None)`` raise
        TypeError. Unlike ``record_remote_file``, this does not refresh
        ``song_backend_presence``; confirm that is intended.
        """
        backend = self.get_backend(backend_id)
        if not backend:
            raise RuntimeError(f"Unknown backend: {backend_id}")
        absolute_path = str(Path(backend["base_path"]) / relative_path)
        with self._connection() as conn:
            asset_row = conn.execute(
                """ SELECT id FROM file_assets WHERE song_id = ? AND COALESCE(quality_label, '') = COALESCE(?, '') AND COALESCE(ext, '') = COALESCE(?, '') AND COALESCE(file_size_bytes, -1) = COALESCE(?, -1) """,
                (song_id, quality_label, ext, file_size_bytes),
            ).fetchone()
            if asset_row:
                asset_id = int(asset_row["id"])
            else:
                asset_id = conn.execute(
                    """ INSERT INTO file_assets (song_id, quality_label, ext, file_size_bytes) VALUES (?, ?, ?, ?) """,
                    (song_id, quality_label, ext, file_size_bytes),
                ).lastrowid
            conn.execute(
                """ INSERT INTO file_locations ( file_asset_id, backend_id, container_name, locator, absolute_path, status, is_primary ) VALUES (?, ?, ?, ?, ?, 'active', 1) ON CONFLICT(file_asset_id, backend_id, locator) DO UPDATE SET absolute_path = excluded.absolute_path, status = excluded.status, updated_at = CURRENT_TIMESTAMP """,
                (asset_id, backend_id, backend["base_path"], relative_path, absolute_path),
            )
        return int(asset_id)

    def _refresh_song_backend_presence_with_connection(
        self, conn: sqlite3.Connection, song_id: int, backend_id: int
    ) -> None:
        """Recompute and upsert the presence summary for (song, backend) on *conn*.

        The summary is derived from active ``file_locations``.
        ``primary_file_location_id`` is the smallest active location id, or
        NULL when none exist. Does not commit; the caller owns the transaction.
        """
        summary = conn.execute(
            """ SELECT COUNT(*) AS active_file_count, MIN(fl.id) AS primary_file_location_id FROM file_locations fl JOIN file_assets fa ON fa.id = fl.file_asset_id WHERE fa.song_id = ? AND fl.backend_id = ? AND fl.status = 'active' """,
            (song_id, backend_id),
        ).fetchone()
        active_file_count = int(summary["active_file_count"]) if summary else 0
        has_active_file = 1 if active_file_count > 0 else 0
        primary_file_location_id = summary["primary_file_location_id"] if summary else None
        conn.execute(
            """ INSERT INTO song_backend_presence ( song_id, backend_id, has_active_file, active_file_count, primary_file_location_id ) VALUES (?, ?, ?, ?, ?) ON CONFLICT(song_id, backend_id) DO UPDATE SET has_active_file = excluded.has_active_file, active_file_count = excluded.active_file_count, primary_file_location_id = excluded.primary_file_location_id, updated_at = CURRENT_TIMESTAMP """,
            (song_id, backend_id, has_active_file, active_file_count, primary_file_location_id),
        )

    def refresh_song_backend_presence(self, song_id: int, backend_id: int) -> None:
        """Recompute the presence summary for (song, backend) in its own transaction."""
        with self._connection() as conn:
            self._refresh_song_backend_presence_with_connection(conn, song_id, backend_id)

    def get_song_backend_presence(self, song_id: int, backend_id: int) -> sqlite3.Row | None:
        """Return the cached presence row for (song, backend), or None."""
        return self._fetchone(
            "SELECT * FROM song_backend_presence WHERE song_id = ? AND backend_id = ?",
            (song_id, backend_id),
        )

    def song_has_active_backend_file(self, song_id: int, backend_id: int) -> bool:
        """Return the cached ``has_active_file`` flag.

        Returns False when no presence row exists. This reads the summary
        table, not ``file_locations``, so it is only as fresh as the last
        refresh.
        """
        row = self.get_song_backend_presence(song_id=song_id, backend_id=backend_id)
        if row is None:
            return False
        return bool(int(row["has_active_file"]))

    def record_remote_file(
        self,
        file_asset_id: int,
        backend_id: int,
        container_name: str,
        locator: str,
        public_url: str | None,
        download_url: str | None,
    ) -> int:
        """Upsert an active, non-primary remote location for a file asset.

        Refreshes ``song_backend_presence`` in the same transaction and
        returns the ``file_locations`` id. ``absolute_path`` is NULL for
        remote locations.
        """
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO file_locations ( file_asset_id, backend_id, container_name, locator, absolute_path, public_url, download_url, status, is_primary ) VALUES (?, ?, ?, ?, NULL, ?, ?, 'active', 0) ON CONFLICT(file_asset_id, backend_id, locator) DO UPDATE SET container_name = excluded.container_name, public_url = excluded.public_url, download_url = excluded.download_url, status = excluded.status, is_primary = 0, updated_at = CURRENT_TIMESTAMP """,
                (file_asset_id, backend_id, container_name, locator, public_url, download_url),
            )
            location = conn.execute(
                """ SELECT id FROM file_locations WHERE file_asset_id = ? AND backend_id = ? AND locator = ? """,
                (file_asset_id, backend_id, locator),
            ).fetchone()
            asset = conn.execute(
                "SELECT song_id FROM file_assets WHERE id = ?",
                (file_asset_id,),
            ).fetchone()
            if asset:
                self._refresh_song_backend_presence_with_connection(conn, int(asset["song_id"]), backend_id)
        return int(location["id"])

    def song_has_active_local_file(self, song_id: int) -> bool:
        """Return True if the song has any active location on a ``local_fs`` backend.

        Queries ``file_locations`` directly, not the presence table.
        """
        row = self._fetchone(
            """ SELECT 1 FROM file_locations fl JOIN storage_backends sb ON sb.id = fl.backend_id JOIN file_assets fa ON fa.id = fl.file_asset_id WHERE fa.song_id = ? AND fl.status = 'active' AND sb.backend_type = 'local_fs' LIMIT 1 """,
            (song_id,),
        )
        return bool(row)

    def list_pending_download_songs(
        self,
        sources: list[str] | None = None,
        limit: int | None = None,
        playlist_ids: list[int] | None = None,
    ) -> list[sqlite3.Row]:
        """Return songs with no active ``local_fs`` location, ordered by id.

        ``playlist_ids=None`` means no playlist filter. An empty list returns
        []. ``DISTINCT`` removes duplicates from the playlist_songs join.
        """
        if playlist_ids is not None and not playlist_ids:
            return []
        query = ["SELECT DISTINCT s.*", "FROM songs s"]
        params: list[Any] = []
        where_clauses: list[str] = [
            """ NOT EXISTS ( SELECT 1 FROM file_locations fl JOIN file_assets fa ON fa.id = fl.file_asset_id JOIN storage_backends sb ON sb.id = fl.backend_id WHERE fa.song_id = s.id AND fl.status = 'active' AND sb.backend_type = 'local_fs' ) """.strip()
        ]
        if playlist_ids:
            placeholders = ", ".join("?" for _ in playlist_ids)
            query.append("JOIN playlist_songs ps ON ps.song_id = s.id")
            where_clauses.append(f"ps.playlist_id IN ({placeholders})")
            params.extend(playlist_ids)
        if sources:
            placeholders = ", ".join("?" for _ in sources)
            where_clauses.append(f"s.platform IN ({placeholders})")
            params.extend(sources)
        query.append("WHERE " + " AND ".join(where_clauses))
        query.append("ORDER BY s.id ASC")
        if limit is not None:
            query.append("LIMIT ?")
            params.append(limit)
        return self._fetchall("\n".join(query), tuple(params))

    def list_local_songs_for_lyrics(
        self,
        sources: list[str] | None = None,
        limit: int | None = None,
        playlist_ids: list[int] | None = None,
    ) -> list[sqlite3.Row]:
        """Return songs that have an active ``local_fs`` file, ordered by id.

        Each row includes ``local_file_path``: the primary location first,
        then the lowest location id. Filters behave as in
        ``list_pending_download_songs``.
        """
        if playlist_ids is not None and not playlist_ids:
            return []
        query = [
            "SELECT DISTINCT s.*,",
            "(",
            " SELECT fl.absolute_path",
            " FROM file_locations fl",
            " JOIN file_assets fa ON fa.id = fl.file_asset_id",
            " JOIN storage_backends sb ON sb.id = fl.backend_id",
            " WHERE fa.song_id = s.id",
            " AND fl.status = 'active'",
            " AND sb.backend_type = 'local_fs'",
            " ORDER BY fl.is_primary DESC, fl.id ASC",
            " LIMIT 1",
            ") AS local_file_path",
            "FROM songs s",
        ]
        params: list[Any] = []
        where_clauses: list[str] = [
            """ EXISTS ( SELECT 1 FROM file_locations fl JOIN file_assets fa ON fa.id = fl.file_asset_id JOIN storage_backends sb ON sb.id = fl.backend_id WHERE fa.song_id = s.id AND fl.status = 'active' AND sb.backend_type = 'local_fs' ) """.strip()
        ]
        if playlist_ids:
            placeholders = ", ".join("?" for _ in playlist_ids)
            query.append("JOIN playlist_songs ps ON ps.song_id = s.id")
            where_clauses.append(f"ps.playlist_id IN ({placeholders})")
            params.extend(playlist_ids)
        if sources:
            placeholders = ", ".join("?" for _ in sources)
            where_clauses.append(f"s.platform IN ({placeholders})")
            params.extend(sources)
        query.append("WHERE " + " AND ".join(where_clauses))
        query.append("ORDER BY s.id ASC")
        if limit is not None:
            query.append("LIMIT ?")
            params.append(limit)
        return self._fetchall("\n".join(query), tuple(params))

    def enqueue_upload_task(
        self,
        file_asset_id: int,
        source_location_id: int,
        target_backend_id: int,
        target_container_name: str | None,
        target_locator: str,
    ) -> int:
        """Create or re-arm the upload task keyed by (asset, target backend, locator).

        On conflict:

        * Tasks currently 'uploading' or 'succeeded' keep their status,
          ``last_error`` and ``finished_at``.
        * Any other task is reset to 'pending' with those fields cleared.
        * ``started_at`` survives only for 'uploading' tasks.

        Returns the task id.

        NOTE(review): re-enqueueing a 'succeeded' task therefore clears its
        ``started_at`` while keeping ``finished_at``; confirm that is intended.
        """
        with self._connection() as conn:
            conn.execute(
                """ INSERT INTO upload_tasks ( file_asset_id, source_location_id, target_backend_id, target_container_name, target_locator ) VALUES (?, ?, ?, ?, ?) ON CONFLICT(file_asset_id, target_backend_id, target_locator) DO UPDATE SET source_location_id = excluded.source_location_id, target_container_name = excluded.target_container_name, status = CASE WHEN upload_tasks.status IN ('uploading', 'succeeded') THEN upload_tasks.status ELSE 'pending' END, last_error = CASE WHEN upload_tasks.status IN ('uploading', 'succeeded') THEN upload_tasks.last_error ELSE NULL END, started_at = CASE WHEN upload_tasks.status = 'uploading' THEN upload_tasks.started_at ELSE NULL END, finished_at = CASE WHEN upload_tasks.status IN ('uploading', 'succeeded') THEN upload_tasks.finished_at ELSE NULL END, updated_at = CURRENT_TIMESTAMP """,
                (file_asset_id, source_location_id, target_backend_id, target_container_name, target_locator),
            )
            row = conn.execute(
                """ SELECT id FROM upload_tasks WHERE file_asset_id = ? AND target_backend_id = ? AND target_locator = ? """,
                (file_asset_id, target_backend_id, target_locator),
            ).fetchone()
        return int(row["id"])

    def list_pending_upload_tasks(
        self,
        target_backend_id: int,
        limit: int | None = None,
    ) -> list[sqlite3.Row]:
        """Return 'pending' tasks for one target backend in id order.

        Each row is joined with the source ``absolute_path`` and the song id.
        """
        query = [
            "SELECT ut.*, fl.absolute_path, fa.song_id",
            "FROM upload_tasks ut",
            "JOIN file_locations fl ON fl.id = ut.source_location_id",
            "JOIN file_assets fa ON fa.id = ut.file_asset_id",
            "WHERE ut.target_backend_id = ?",
            "AND ut.status = 'pending'",
            "ORDER BY ut.id ASC",
        ]
        params: list[Any] = [target_backend_id]
        if limit is not None:
            query.append("LIMIT ?")
            params.append(limit)
        return self._fetchall("\n".join(query), tuple(params))

    def claim_next_upload_task(self, target_backend_id: int) -> sqlite3.Row | None:
        """Atomically move the oldest 'pending' task for a backend to 'uploading'.

        ``BEGIN IMMEDIATE`` takes SQLite's write lock before the SELECT, so
        concurrent claimers serialize instead of both picking the same row.
        The ``status = 'pending'`` guard and the rowcount check are a second
        line of defence.

        Returns:
            The claimed task joined with the source ``absolute_path``,
            ``source_locator`` and ``song_id``, or None if nothing was claimed.

        On an exception the connection is closed without commit, which
        discards the transaction.
        """
        conn = self._connect()
        try:
            conn.execute("BEGIN IMMEDIATE")
            row = conn.execute(
                """ SELECT id FROM upload_tasks WHERE target_backend_id = ? AND status = 'pending' ORDER BY id ASC LIMIT 1 """,
                (target_backend_id,),
            ).fetchone()
            if row is None:
                conn.commit()
                return None
            task_id = int(row["id"])
            updated = conn.execute(
                """ UPDATE upload_tasks SET status = 'uploading', attempts = attempts + 1, last_error = NULL, started_at = CURRENT_TIMESTAMP, finished_at = NULL, updated_at = CURRENT_TIMESTAMP WHERE id = ? AND status = 'pending' """,
                (task_id,),
            ).rowcount
            if updated != 1:
                conn.rollback()
                return None
            claimed = conn.execute(
                """ SELECT ut.*, fl.absolute_path, fl.locator AS source_locator, fa.song_id FROM upload_tasks ut JOIN file_locations fl ON fl.id = ut.source_location_id JOIN file_assets fa ON fa.id = ut.file_asset_id WHERE ut.id = ? """,
                (task_id,),
            ).fetchone()
            conn.commit()
            return claimed
        finally:
            conn.close()

    def mark_upload_task_status(self, task_id: int, status: str, last_error: str | None = None) -> None:
        """Move an upload task to *status*, enforcing the allowed transitions.

        Raises:
            ValueError: if *status* is not in ``UPLOAD_TASK_STATUSES``.
            RuntimeError: if the task does not exist, or its current status
                may not move to *status*.

        The UPDATE also matches on ``status``. This presumably binds
        ``current_status`` as a compare-and-set guard; confirm against the
        parameter tuple.
        """
        if status not in UPLOAD_TASK_STATUSES:
            raise ValueError(f"Unsupported upload task status: {status}")
        # target status -> statuses it may be entered from
        allowed_previous_statuses = {
            "pending": {"failed", "skipped"},
            "uploading": {"pending", "failed"},
            "succeeded": {"uploading"},
            "failed": {"uploading"},
            "skipped": {"pending", "uploading"},
        }
        with self._connection() as conn:
            current = conn.execute(
                "SELECT status FROM upload_tasks WHERE id = ?",
                (task_id,),
            ).fetchone()
            if current is None:
                raise RuntimeError(f"Unknown upload task: {task_id}")
            current_status = str(current["status"])
            if current_status not in allowed_previous_statuses[status]:
                raise RuntimeError(
                    f"Unsupported upload task transition: {current_status} -> {status}"
                )
            conn.execute(
                """ UPDATE upload_tasks SET status = ?, last_error = ?, attempts = CASE WHEN ? = 'uploading' THEN attempts + 1 ELSE attempts END, started_at = CASE WHEN ? = 'uploading' THEN CURRENT_TIMESTAMP WHEN ? = 'pending' THEN NULL ELSE started_at END, finished_at = CASE WHEN ? IN ('succeeded', 'failed', 'skipped') THEN CURRENT_TIMESTAMP ELSE NULL END, updated_at = CURRENT_TIMESTAMP WHERE id = ? AND status = ?
""", (status, last_error, status, status, status, status, task_id, current_status), ) def list_missing_object_upload_candidates( self, target_backend_id: int, sources: list[str] | None = None, limit: int | None = None, playlist_ids: list[int] | None = None, ) -> list[sqlite3.Row]: if playlist_ids is not None and not playlist_ids: return [] backend = self.get_backend(target_backend_id) if backend is None: raise RuntimeError(f"Unknown backend: {target_backend_id}") if backend["backend_type"] != "object_storage": raise RuntimeError(f"Backend {target_backend_id} is not object storage") config = json.loads(backend["config_json"] or "{}") base_prefix = str(config.get("base_prefix") or "").strip("/") query = [ "SELECT DISTINCT", " s.id AS song_id,", " fa.id AS file_asset_id,", " fl.id AS source_location_id,", " fl.locator AS source_locator,", " ? AS target_container_name,", " CASE", " WHEN ? = '' THEN fl.locator", " ELSE (? || '/' || fl.locator)", " END AS target_locator", "FROM file_locations fl", "JOIN file_assets fa ON fa.id = fl.file_asset_id", "JOIN songs s ON s.id = fa.song_id", "JOIN storage_backends sb ON sb.id = fl.backend_id", ] params: list[Any] = [backend["container_name"], base_prefix, base_prefix] if playlist_ids: query.append("JOIN playlist_songs ps ON ps.song_id = s.id") where_clauses: list[str] = [ "fl.status = 'active'", "sb.backend_type = 'local_fs'", """ NOT EXISTS ( SELECT 1 FROM file_locations remote WHERE remote.file_asset_id = fa.id AND remote.backend_id = ? AND remote.status = 'active' AND remote.locator = CASE WHEN ? = '' THEN fl.locator ELSE (? || '/' || fl.locator) END ) """.strip(), ] params.extend([target_backend_id, base_prefix, base_prefix]) if sources: placeholders = ", ".join("?" for _ in sources) where_clauses.append(f"s.platform IN ({placeholders})") params.extend(sources) if playlist_ids: placeholders = ", ".join("?" 
for _ in playlist_ids) where_clauses.append(f"ps.playlist_id IN ({placeholders})") params.extend(playlist_ids) query.append("WHERE " + " AND ".join(where_clauses)) query.append("ORDER BY s.id ASC, fa.id ASC, fl.id ASC") if limit is not None: query.append("LIMIT ?") params.append(limit) return self._fetchall("\n".join(query), tuple(params))