from __future__ import annotations import sqlite3 from contextlib import suppress from pathlib import Path SQLITE_BUSY_TIMEOUT_MS = 30000 REQUIRED_TABLES = { "playlist_pools", "playlists", "playlist_download_preferences", "pool_playlists", "artist_pools", "artists", "pool_artists", "songs", "playlist_songs", "artist_songs", "storage_backends", "file_assets", "file_locations", "download_tasks", "song_backend_presence", "upload_tasks", "job_runs", "job_stages", "job_items", "job_workers", "job_commands", "job_events", "job_logs", "config_revisions", } SCHEMA_STATEMENTS = [ """ CREATE TABLE IF NOT EXISTS playlist_pools ( id INTEGER PRIMARY KEY AUTOINCREMENT, platform TEXT NOT NULL, pool_kind TEXT NOT NULL, external_id TEXT NOT NULL, name TEXT NOT NULL, url TEXT, metadata_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(platform, pool_kind, external_id) ) """, """ CREATE TABLE IF NOT EXISTS playlists ( id INTEGER PRIMARY KEY AUTOINCREMENT, platform TEXT NOT NULL, remote_playlist_id TEXT NOT NULL, name TEXT NOT NULL, url TEXT NOT NULL, parse_strategy TEXT NOT NULL DEFAULT 'playlist_url', cover_url TEXT, creator_name TEXT, play_count INTEGER, collected_song_count INTEGER, metadata_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(platform, remote_playlist_id) ) """, """ CREATE TABLE IF NOT EXISTS playlist_download_preferences ( playlist_id INTEGER PRIMARY KEY, is_wanted INTEGER NOT NULL DEFAULT 1, marked_by TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS pool_playlists ( pool_id INTEGER NOT NULL, playlist_id INTEGER NOT NULL, discovered_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(pool_id, playlist_id) ) """, """ CREATE TABLE IF NOT EXISTS artist_pools ( id INTEGER PRIMARY KEY AUTOINCREMENT, platform TEXT NOT NULL, pool_kind TEXT NOT NULL, external_id TEXT NOT NULL, name TEXT NOT NULL, source_playlist_pool_id INTEGER, metadata_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(platform, pool_kind, external_id) ) """, """ CREATE TABLE IF NOT EXISTS artists ( id INTEGER PRIMARY KEY AUTOINCREMENT, artist_key TEXT NOT NULL UNIQUE, platform TEXT NOT NULL, remote_artist_id TEXT, name TEXT NOT NULL, normalized_name TEXT NOT NULL, metadata_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS pool_artists ( pool_id INTEGER NOT NULL, artist_id INTEGER NOT NULL, discovered_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(pool_id, artist_id) ) """, """ CREATE TABLE IF NOT EXISTS songs ( id INTEGER PRIMARY KEY AUTOINCREMENT, platform TEXT NOT NULL, remote_song_id TEXT NOT NULL, name TEXT NOT NULL, singers TEXT, album TEXT, duration_seconds INTEGER, ext TEXT, file_size_bytes INTEGER, quality_label TEXT, metadata_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(platform, remote_song_id) ) """, """ CREATE TABLE IF NOT EXISTS playlist_songs ( playlist_id INTEGER NOT NULL, song_id INTEGER NOT NULL, position INTEGER, discovered_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(playlist_id, song_id) ) """, """ CREATE TABLE IF NOT EXISTS artist_songs ( artist_id INTEGER NOT NULL, song_id INTEGER NOT NULL, discovered_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(artist_id, song_id) ) """, """ CREATE TABLE IF NOT EXISTS storage_backends ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, backend_type TEXT NOT NULL, base_path TEXT, container_name TEXT, config_json TEXT, is_default INTEGER NOT NULL DEFAULT 0, is_active INTEGER NOT NULL DEFAULT 1, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS file_assets ( id INTEGER PRIMARY KEY AUTOINCREMENT, song_id INTEGER NOT NULL, quality_label TEXT, ext TEXT, file_size_bytes INTEGER, checksum_sha256 TEXT, metadata_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS file_locations ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_asset_id INTEGER NOT NULL, backend_id INTEGER NOT NULL, container_name TEXT, locator TEXT NOT NULL, absolute_path TEXT, remote_file_id TEXT, public_url TEXT, download_url TEXT, status TEXT NOT NULL DEFAULT 'active', is_primary INTEGER NOT NULL DEFAULT 1, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_asset_id, backend_id, locator) ) """, """ CREATE TABLE IF NOT EXISTS download_tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, song_id INTEGER NOT NULL, target_backend_id INTEGER, status TEXT NOT NULL DEFAULT 'pending', attempts INTEGER NOT NULL DEFAULT 0, last_error TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS song_backend_presence ( song_id INTEGER NOT NULL, backend_id INTEGER NOT NULL, has_active_file INTEGER NOT NULL DEFAULT 0, active_file_count INTEGER NOT NULL DEFAULT 0, primary_file_location_id INTEGER, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(song_id, backend_id) ) """, """ CREATE TABLE IF NOT EXISTS upload_tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_asset_id INTEGER NOT NULL, source_location_id INTEGER NOT NULL, target_backend_id INTEGER NOT NULL, target_container_name TEXT, target_locator TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', attempts INTEGER NOT NULL DEFAULT 0, last_error TEXT, queued_at TEXT DEFAULT CURRENT_TIMESTAMP, started_at TEXT, finished_at TEXT, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(file_asset_id, target_backend_id, target_locator) ) """, """ CREATE TABLE IF NOT EXISTS job_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_type TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'queued', priority INTEGER NOT NULL DEFAULT 100, requested_by TEXT, config_snapshot_json TEXT NOT NULL, sources TEXT, download_sources TEXT, playlist_scope_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, started_at TEXT, ended_at TEXT, last_error TEXT, resume_token TEXT ) """, """ CREATE TABLE IF NOT EXISTS job_stages ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_run_id INTEGER NOT NULL, stage_type TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', seq_no INTEGER NOT NULL DEFAULT 0, total_items INTEGER NOT NULL DEFAULT 0, pending_items INTEGER NOT NULL DEFAULT 0, running_items INTEGER NOT NULL DEFAULT 0, success_items INTEGER NOT NULL DEFAULT 0, failed_items INTEGER NOT NULL DEFAULT 0, skipped_items INTEGER NOT NULL DEFAULT 0, started_at TEXT, ended_at TEXT, last_error TEXT, UNIQUE(job_run_id, stage_type) ) """, """ CREATE TABLE IF NOT EXISTS job_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_stage_id INTEGER NOT NULL, item_type TEXT NOT NULL, item_key TEXT NOT NULL, playlist_pool_id INTEGER, playlist_id INTEGER, song_id INTEGER, file_location_id INTEGER, status TEXT NOT NULL DEFAULT 'pending', attempt_count INTEGER NOT NULL DEFAULT 0, max_attempts INTEGER NOT NULL DEFAULT 3, worker_id INTEGER, started_at TEXT, ended_at TEXT, last_error TEXT, last_error_code TEXT, payload_json TEXT, UNIQUE(job_stage_id, item_key) ) """, """ CREATE TABLE IF NOT EXISTS job_workers ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_run_id INTEGER, job_stage_id INTEGER, worker_name TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'idle', current_job_item_id INTEGER, current_song_id INTEGER, current_playlist_id INTEGER, current_display_text TEXT, heartbeat_at TEXT, last_progress_text TEXT, processed_count INTEGER NOT NULL DEFAULT 0, error_count INTEGER NOT NULL DEFAULT 0, downloaded_bytes INTEGER, total_bytes INTEGER, speed_bytes_per_sec INTEGER, progress_percent REAL ) """, """ CREATE TABLE IF NOT EXISTS job_commands ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_run_id INTEGER NOT NULL, command_type TEXT NOT NULL, target_item_id INTEGER, status TEXT NOT NULL DEFAULT 'pending', created_at TEXT DEFAULT CURRENT_TIMESTAMP, applied_at TEXT, payload_json TEXT ) """, """ CREATE TABLE IF NOT EXISTS job_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_run_id INTEGER NOT NULL, job_stage_id INTEGER, job_item_id INTEGER, worker_id INTEGER, event_type TEXT NOT NULL, message TEXT, details_json TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS job_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_run_id INTEGER NOT NULL, job_stage_id INTEGER, worker_id INTEGER, level TEXT NOT NULL DEFAULT 'info', message TEXT NOT NULL, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS config_revisions ( id INTEGER PRIMARY KEY AUTOINCREMENT, source_type TEXT NOT NULL DEFAULT 'env_file', file_path TEXT NOT NULL, content_text TEXT NOT NULL, content_hash TEXT NOT NULL, created_at TEXT DEFAULT CURRENT_TIMESTAMP, applied_at TEXT, note TEXT, UNIQUE(source_type, file_path, content_hash) ) """, """ CREATE INDEX IF NOT EXISTS idx_playlist_download_preferences_is_wanted ON playlist_download_preferences (is_wanted, updated_at DESC) """, """ CREATE INDEX IF NOT EXISTS idx_pool_playlists_playlist_id ON pool_playlists (playlist_id, pool_id) """, """ CREATE INDEX IF NOT EXISTS idx_playlist_songs_song_id ON playlist_songs (song_id, playlist_id) """, """ CREATE INDEX IF NOT EXISTS idx_file_assets_song_id ON file_assets (song_id) """, """ CREATE INDEX IF NOT EXISTS idx_job_items_running_song_id ON job_items (song_id, status) """, ] def connect_database(db_path: str | Path) -> sqlite3.Connection: path = Path(db_path) path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(path, timeout=SQLITE_BUSY_TIMEOUT_MS / 1000) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}") with suppress(sqlite3.OperationalError): conn.execute("PRAGMA journal_mode = WAL") with suppress(sqlite3.OperationalError): conn.execute("PRAGMA synchronous = NORMAL") return conn def ensure_default_local_backend(conn: sqlite3.Connection, library_root: str | Path) -> None: resolved_root = str(Path(library_root).resolve()) conn.execute( """ INSERT INTO storage_backends (name, backend_type, base_path, is_default) VALUES (?, ?, ?, 1) ON CONFLICT(name) DO UPDATE SET backend_type = excluded.backend_type, base_path = excluded.base_path, is_default = excluded.is_default, updated_at = CURRENT_TIMESTAMP """, ("default-local", "local_fs", resolved_root), ) _JOB_WORKER_THROUGHPUT_COLUMNS: dict[str, str] = { "downloaded_bytes": "INTEGER", "total_bytes": "INTEGER", "speed_bytes_per_sec": "INTEGER", "progress_percent": "REAL", } _PLAYLIST_COLUMNS: dict[str, str] = { "play_count": "INTEGER", "collected_song_count": "INTEGER", } def _ensure_table_columns( conn: sqlite3.Connection, *, table_name: str, required_columns: dict[str, str], ) -> None: table_exists = conn.execute( "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table_name,), ).fetchone() if table_exists is None: return existing_columns = { str(row["name"]) for row in conn.execute(f"PRAGMA table_info({table_name})").fetchall() } for column_name, column_type in required_columns.items(): if column_name in existing_columns: continue conn.execute( f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}" ) def _ensure_job_worker_throughput_columns(conn: sqlite3.Connection) -> None: _ensure_table_columns( conn, table_name="job_workers", required_columns=_JOB_WORKER_THROUGHPUT_COLUMNS, ) def _ensure_playlist_columns(conn: sqlite3.Connection) -> None: _ensure_table_columns( conn, table_name="playlists", required_columns=_PLAYLIST_COLUMNS, ) def initialize_database( db_path: str | Path, default_library_root: str | Path | None = None, ) -> sqlite3.Connection: conn = connect_database(db_path) for statement in SCHEMA_STATEMENTS: conn.execute(statement) _ensure_job_worker_throughput_columns(conn) _ensure_playlist_columns(conn) if default_library_root is not None: Path(default_library_root).mkdir(parents=True, exist_ok=True) ensure_default_local_backend(conn, default_library_root) conn.commit() return conn