Files

493 lines
15 KiB
Python

from __future__ import annotations
import sqlite3
from contextlib import suppress
from pathlib import Path
# Busy timeout in milliseconds. connect_database() passes it to
# sqlite3.connect() (converted to seconds) and to PRAGMA busy_timeout.
SQLITE_BUSY_TIMEOUT_MS = 30_000
# Names of the tables this module's schema defines; each one has a matching
# CREATE TABLE statement in SCHEMA_STATEMENTS.
REQUIRED_TABLES = {
    # Playlists and playlist pools
    "playlist_pools",
    "playlists",
    "playlist_download_preferences",
    "pool_playlists",
    "playlist_songs",
    # Artists and artist pools
    "artist_pools",
    "artists",
    "pool_artists",
    "artist_songs",
    # Songs, files and storage
    "songs",
    "storage_backends",
    "file_assets",
    "file_locations",
    "song_backend_presence",
    # Transfer task queues
    "download_tasks",
    "upload_tasks",
    # Job bookkeeping
    "job_runs",
    "job_stages",
    "job_items",
    "job_workers",
    "job_commands",
    "job_events",
    "job_logs",
    # Configuration history
    "config_revisions",
}
# DDL statements that initialize_database() executes one by one, in list order.
# Every statement uses IF NOT EXISTS, so objects that already exist are left
# as they are (columns added to the schema later are handled separately by the
# _ensure_*_columns helpers).
# NOTE: no FOREIGN KEY clauses are declared below, so the *_id columns are
# plain integers as far as SQLite is concerned, even though connect_database()
# turns on PRAGMA foreign_keys.
SCHEMA_STATEMENTS = [
# playlist_pools: one row per (platform, pool_kind, external_id).
"""
CREATE TABLE IF NOT EXISTS playlist_pools (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL,
pool_kind TEXT NOT NULL,
external_id TEXT NOT NULL,
name TEXT NOT NULL,
url TEXT,
metadata_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(platform, pool_kind, external_id)
)
""",
# playlists: one row per (platform, remote_playlist_id).
"""
CREATE TABLE IF NOT EXISTS playlists (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL,
remote_playlist_id TEXT NOT NULL,
name TEXT NOT NULL,
url TEXT NOT NULL,
parse_strategy TEXT NOT NULL DEFAULT 'playlist_url',
cover_url TEXT,
creator_name TEXT,
play_count INTEGER,
collected_song_count INTEGER,
metadata_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(platform, remote_playlist_id)
)
""",
# playlist_download_preferences: at most one row per playlist_id;
# is_wanted defaults to 1.
"""
CREATE TABLE IF NOT EXISTS playlist_download_preferences (
playlist_id INTEGER PRIMARY KEY,
is_wanted INTEGER NOT NULL DEFAULT 1,
marked_by TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""",
# pool_playlists: link table keyed by (pool_id, playlist_id).
"""
CREATE TABLE IF NOT EXISTS pool_playlists (
pool_id INTEGER NOT NULL,
playlist_id INTEGER NOT NULL,
discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(pool_id, playlist_id)
)
""",
# artist_pools: one row per (platform, pool_kind, external_id).
"""
CREATE TABLE IF NOT EXISTS artist_pools (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL,
pool_kind TEXT NOT NULL,
external_id TEXT NOT NULL,
name TEXT NOT NULL,
source_playlist_pool_id INTEGER,
metadata_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(platform, pool_kind, external_id)
)
""",
# artists: artist_key is unique; remote_artist_id is optional.
"""
CREATE TABLE IF NOT EXISTS artists (
id INTEGER PRIMARY KEY AUTOINCREMENT,
artist_key TEXT NOT NULL UNIQUE,
platform TEXT NOT NULL,
remote_artist_id TEXT,
name TEXT NOT NULL,
normalized_name TEXT NOT NULL,
metadata_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""",
# pool_artists: link table keyed by (pool_id, artist_id).
"""
CREATE TABLE IF NOT EXISTS pool_artists (
pool_id INTEGER NOT NULL,
artist_id INTEGER NOT NULL,
discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(pool_id, artist_id)
)
""",
# songs: one row per (platform, remote_song_id).
"""
CREATE TABLE IF NOT EXISTS songs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL,
remote_song_id TEXT NOT NULL,
name TEXT NOT NULL,
singers TEXT,
album TEXT,
duration_seconds INTEGER,
ext TEXT,
file_size_bytes INTEGER,
quality_label TEXT,
metadata_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(platform, remote_song_id)
)
""",
# playlist_songs: link table keyed by (playlist_id, song_id), with an
# optional position.
"""
CREATE TABLE IF NOT EXISTS playlist_songs (
playlist_id INTEGER NOT NULL,
song_id INTEGER NOT NULL,
position INTEGER,
discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(playlist_id, song_id)
)
""",
# artist_songs: link table keyed by (artist_id, song_id).
"""
CREATE TABLE IF NOT EXISTS artist_songs (
artist_id INTEGER NOT NULL,
song_id INTEGER NOT NULL,
discovered_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(artist_id, song_id)
)
""",
# storage_backends: name is unique (ensure_default_local_backend() upserts
# on it); is_default defaults to 0 and is_active to 1.
"""
CREATE TABLE IF NOT EXISTS storage_backends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
backend_type TEXT NOT NULL,
base_path TEXT,
container_name TEXT,
config_json TEXT,
is_default INTEGER NOT NULL DEFAULT 0,
is_active INTEGER NOT NULL DEFAULT 1,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""",
# file_assets: rows carry a song_id; no uniqueness constraint besides id.
"""
CREATE TABLE IF NOT EXISTS file_assets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
song_id INTEGER NOT NULL,
quality_label TEXT,
ext TEXT,
file_size_bytes INTEGER,
checksum_sha256 TEXT,
metadata_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""",
# file_locations: one row per (file_asset_id, backend_id, locator);
# status defaults to 'active' and is_primary to 1.
"""
CREATE TABLE IF NOT EXISTS file_locations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_asset_id INTEGER NOT NULL,
backend_id INTEGER NOT NULL,
container_name TEXT,
locator TEXT NOT NULL,
absolute_path TEXT,
remote_file_id TEXT,
public_url TEXT,
download_url TEXT,
status TEXT NOT NULL DEFAULT 'active',
is_primary INTEGER NOT NULL DEFAULT 1,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(file_asset_id, backend_id, locator)
)
""",
# download_tasks: status defaults to 'pending', attempts to 0.
"""
CREATE TABLE IF NOT EXISTS download_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
song_id INTEGER NOT NULL,
target_backend_id INTEGER,
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""",
# song_backend_presence: one row per (song_id, backend_id).
"""
CREATE TABLE IF NOT EXISTS song_backend_presence (
song_id INTEGER NOT NULL,
backend_id INTEGER NOT NULL,
has_active_file INTEGER NOT NULL DEFAULT 0,
active_file_count INTEGER NOT NULL DEFAULT 0,
primary_file_location_id INTEGER,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(song_id, backend_id)
)
""",
# upload_tasks: one row per (file_asset_id, target_backend_id,
# target_locator); status defaults to 'pending'.
"""
CREATE TABLE IF NOT EXISTS upload_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_asset_id INTEGER NOT NULL,
source_location_id INTEGER NOT NULL,
target_backend_id INTEGER NOT NULL,
target_container_name TEXT,
target_locator TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
queued_at TEXT DEFAULT CURRENT_TIMESTAMP,
started_at TEXT,
finished_at TEXT,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(file_asset_id, target_backend_id, target_locator)
)
""",
# job_runs: status defaults to 'queued', priority to 100;
# config_snapshot_json is mandatory.
"""
CREATE TABLE IF NOT EXISTS job_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
priority INTEGER NOT NULL DEFAULT 100,
requested_by TEXT,
config_snapshot_json TEXT NOT NULL,
sources TEXT,
download_sources TEXT,
playlist_scope_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
started_at TEXT,
ended_at TEXT,
last_error TEXT,
resume_token TEXT
)
""",
# job_stages: one row per (job_run_id, stage_type), with per-status item
# counters that all default to 0.
"""
CREATE TABLE IF NOT EXISTS job_stages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_run_id INTEGER NOT NULL,
stage_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
seq_no INTEGER NOT NULL DEFAULT 0,
total_items INTEGER NOT NULL DEFAULT 0,
pending_items INTEGER NOT NULL DEFAULT 0,
running_items INTEGER NOT NULL DEFAULT 0,
success_items INTEGER NOT NULL DEFAULT 0,
failed_items INTEGER NOT NULL DEFAULT 0,
skipped_items INTEGER NOT NULL DEFAULT 0,
started_at TEXT,
ended_at TEXT,
last_error TEXT,
UNIQUE(job_run_id, stage_type)
)
""",
# job_items: one row per (job_stage_id, item_key); max_attempts defaults
# to 3.
"""
CREATE TABLE IF NOT EXISTS job_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_stage_id INTEGER NOT NULL,
item_type TEXT NOT NULL,
item_key TEXT NOT NULL,
playlist_pool_id INTEGER,
playlist_id INTEGER,
song_id INTEGER,
file_location_id INTEGER,
status TEXT NOT NULL DEFAULT 'pending',
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
worker_id INTEGER,
started_at TEXT,
ended_at TEXT,
last_error TEXT,
last_error_code TEXT,
payload_json TEXT,
UNIQUE(job_stage_id, item_key)
)
""",
# job_workers: status defaults to 'idle'. The last four columns
# (downloaded_bytes .. progress_percent) are the same ones listed in
# _JOB_WORKER_THROUGHPUT_COLUMNS for tables created before they existed.
"""
CREATE TABLE IF NOT EXISTS job_workers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_run_id INTEGER,
job_stage_id INTEGER,
worker_name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'idle',
current_job_item_id INTEGER,
current_song_id INTEGER,
current_playlist_id INTEGER,
current_display_text TEXT,
heartbeat_at TEXT,
last_progress_text TEXT,
processed_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
downloaded_bytes INTEGER,
total_bytes INTEGER,
speed_bytes_per_sec INTEGER,
progress_percent REAL
)
""",
# job_commands: status defaults to 'pending'; target_item_id is optional.
"""
CREATE TABLE IF NOT EXISTS job_commands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_run_id INTEGER NOT NULL,
command_type TEXT NOT NULL,
target_item_id INTEGER,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
applied_at TEXT,
payload_json TEXT
)
""",
# job_events: job_run_id and event_type are mandatory; the stage, item and
# worker ids are optional.
"""
CREATE TABLE IF NOT EXISTS job_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_run_id INTEGER NOT NULL,
job_stage_id INTEGER,
job_item_id INTEGER,
worker_id INTEGER,
event_type TEXT NOT NULL,
message TEXT,
details_json TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""",
# job_logs: level defaults to 'info'; message is mandatory.
"""
CREATE TABLE IF NOT EXISTS job_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_run_id INTEGER NOT NULL,
job_stage_id INTEGER,
worker_id INTEGER,
level TEXT NOT NULL DEFAULT 'info',
message TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""",
# config_revisions: one row per (source_type, file_path, content_hash);
# source_type defaults to 'env_file'.
"""
CREATE TABLE IF NOT EXISTS config_revisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_type TEXT NOT NULL DEFAULT 'env_file',
file_path TEXT NOT NULL,
content_text TEXT NOT NULL,
content_hash TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
applied_at TEXT,
note TEXT,
UNIQUE(source_type, file_path, content_hash)
)
""",
# Secondary indexes.
# Index on (is_wanted, updated_at DESC).
"""
CREATE INDEX IF NOT EXISTS idx_playlist_download_preferences_is_wanted
ON playlist_download_preferences (is_wanted, updated_at DESC)
""",
# Reverse of the pool_playlists primary key: leads with playlist_id.
"""
CREATE INDEX IF NOT EXISTS idx_pool_playlists_playlist_id
ON pool_playlists (playlist_id, pool_id)
""",
# Reverse of the playlist_songs primary key: leads with song_id.
"""
CREATE INDEX IF NOT EXISTS idx_playlist_songs_song_id
ON playlist_songs (song_id, playlist_id)
""",
# Index on file_assets.song_id.
"""
CREATE INDEX IF NOT EXISTS idx_file_assets_song_id
ON file_assets (song_id)
""",
# NOTE(review): despite "running" in its name, this index has no WHERE
# clause; it covers job_items rows of every status.
"""
CREATE INDEX IF NOT EXISTS idx_job_items_running_song_id
ON job_items (song_id, status)
""",
]
def connect_database(db_path: str | Path) -> sqlite3.Connection:
    """Open the SQLite database at *db_path* with this module's settings.

    The parent directory of *db_path* is created when it is missing. The
    returned connection yields ``sqlite3.Row`` rows, has foreign-key
    enforcement switched on and uses ``SQLITE_BUSY_TIMEOUT_MS`` as its busy
    timeout. WAL journaling and ``synchronous = NORMAL`` are requested on a
    best-effort basis: an ``sqlite3.OperationalError`` raised by either
    pragma is ignored.

    Args:
        db_path: Location of the database file.

    Returns:
        An open, configured connection. The caller owns it and must close it.

    Raises:
        OSError: If the parent directory cannot be created.
        sqlite3.Error: If opening the database or applying a mandatory
            setting fails. The connection is closed before the error
            propagates, so a failed call does not leak a handle.
    """
    path = Path(db_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path, timeout=SQLITE_BUSY_TIMEOUT_MS / 1000)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}")
        # Performance tuning only: the connection is still usable when the
        # database rejects these, so OperationalError is deliberately ignored.
        with suppress(sqlite3.OperationalError):
            conn.execute("PRAGMA journal_mode = WAL")
        with suppress(sqlite3.OperationalError):
            conn.execute("PRAGMA synchronous = NORMAL")
    except Exception:
        # Anything not suppressed above (e.g. sqlite3.DatabaseError for a
        # file that is not a database) would otherwise leave the handle open.
        conn.close()
        raise
    return conn
def ensure_default_local_backend(conn: sqlite3.Connection, library_root: str | Path) -> None:
    """Upsert the ``default-local`` storage backend rooted at *library_root*.

    Inserts a ``storage_backends`` row named ``default-local`` of type
    ``local_fs`` with ``is_default = 1``. When a row with that name already
    exists, its ``backend_type``, ``base_path`` and ``is_default`` are
    overwritten and ``updated_at`` is refreshed.

    This function does not commit; that is left to the caller.

    Args:
        conn: Connection to a database that has the ``storage_backends`` table.
        library_root: Directory to record as ``base_path``; it is resolved to
            an absolute path first.
    """
    # Adjacent literals concatenate into the exact statement text.
    upsert_sql = (
        "\n"
        "INSERT INTO storage_backends (name, backend_type, base_path, is_default)\n"
        "VALUES (?, ?, ?, 1)\n"
        "ON CONFLICT(name) DO UPDATE SET\n"
        "backend_type = excluded.backend_type,\n"
        "base_path = excluded.base_path,\n"
        "is_default = excluded.is_default,\n"
        "updated_at = CURRENT_TIMESTAMP\n"
    )
    absolute_root = Path(library_root).resolve()
    parameters = ("default-local", "local_fs", str(absolute_root))
    conn.execute(upsert_sql, parameters)
# Column name -> SQL type for the job_workers columns that
# _ensure_job_worker_throughput_columns() adds when they are missing.
_JOB_WORKER_THROUGHPUT_COLUMNS: dict[str, str] = {
    "downloaded_bytes": "INTEGER",
    "total_bytes": "INTEGER",
    "speed_bytes_per_sec": "INTEGER",
    "progress_percent": "REAL",
}
# Column name -> SQL type for the playlists columns that
# _ensure_playlist_columns() adds when they are missing.
_PLAYLIST_COLUMNS: dict[str, str] = {
    "play_count": "INTEGER",
    "collected_song_count": "INTEGER",
}
def _ensure_table_columns(
    conn: sqlite3.Connection,
    *,
    table_name: str,
    required_columns: dict[str, str],
) -> None:
    """Add to *table_name* every column of *required_columns* it lacks.

    Does nothing when the table does not exist. Columns already present are
    left untouched, whatever their declared type.

    Args:
        conn: Open connection whose rows support lookup by column name
            (for example ``sqlite3.Row``, as set by ``connect_database``).
        table_name: Table to inspect. It is interpolated into the SQL text
            as-is, so it must be a trusted identifier.
        required_columns: Mapping of column name to SQL type; both are
            interpolated into the ``ALTER TABLE`` statement as-is.
    """
    found = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()
    if found is None:
        # Nothing to alter: the table has not been created.
        return

    table_info = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
    present = {str(info["name"]) for info in table_info}

    # Work out the additions first, then apply them in mapping order.
    additions = [
        (name, sql_type)
        for name, sql_type in required_columns.items()
        if name not in present
    ]
    for name, sql_type in additions:
        conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {name} {sql_type}")
def _ensure_job_worker_throughput_columns(conn: sqlite3.Connection) -> None:
    """Add the columns in ``_JOB_WORKER_THROUGHPUT_COLUMNS`` to ``job_workers`` if missing."""
    _ensure_table_columns(
        conn,
        required_columns=_JOB_WORKER_THROUGHPUT_COLUMNS,
        table_name="job_workers",
    )
def _ensure_playlist_columns(conn: sqlite3.Connection) -> None:
    """Add the columns in ``_PLAYLIST_COLUMNS`` to ``playlists`` if missing."""
    _ensure_table_columns(
        conn,
        required_columns=_PLAYLIST_COLUMNS,
        table_name="playlists",
    )
def initialize_database(
    db_path: str | Path,
    default_library_root: str | Path | None = None,
) -> sqlite3.Connection:
    """Open the database at *db_path* and bring its schema up to date.

    Runs every statement in ``SCHEMA_STATEMENTS``, adds the later-introduced
    columns to ``job_workers`` and ``playlists`` when they are missing, and
    optionally registers the default local storage backend. The work is
    committed before the connection is returned.

    Args:
        db_path: Location of the database file; passed to
            ``connect_database``.
        default_library_root: When given, the directory is created if needed
            and recorded via ``ensure_default_local_backend``. When ``None``
            no backend row is touched.

    Returns:
        The open connection. The caller owns it and must close it.

    Raises:
        sqlite3.Error: If any schema statement fails.
        OSError: If *default_library_root* cannot be created.
        In both cases the connection is closed before the error propagates,
        so a failed initialization does not leak a handle.
    """
    conn = connect_database(db_path)
    try:
        for statement in SCHEMA_STATEMENTS:
            conn.execute(statement)
        _ensure_job_worker_throughput_columns(conn)
        _ensure_playlist_columns(conn)
        if default_library_root is not None:
            Path(default_library_root).mkdir(parents=True, exist_ok=True)
            ensure_default_local_backend(conn, default_library_root)
        conn.commit()
    except Exception:
        # The caller never receives the connection on failure, so it has to
        # be released here.
        conn.close()
        raise
    return conn