# Parsing helpers and a revision manager for the catalogsync env file.
from __future__ import annotations

import hashlib
from pathlib import Path
from typing import Any

from .repository import OpsRepository


def _parse_sources(value: str | None) -> list[str]:
    """Split a comma-separated string into trimmed, non-empty entries.

    Args:
        value: Raw comma-separated text, or ``None``.

    Returns:
        The entries in their original order with surrounding whitespace
        removed; blank entries are dropped. ``None`` or an empty string
        yields an empty list.
    """
    if not value:
        return []
    trimmed = (item.strip() for item in value.split(","))
    return [item for item in trimmed if item]


def _normalize_env_value(raw_value: str) -> str:
    """Return the usable value for the right-hand side of a ``KEY=value`` line.

    Surrounding whitespace is always removed. If what remains is wrapped in
    a matching pair of single or double quotes, that one pair is removed and
    the text between the quotes is returned untouched (inner whitespace is
    preserved).

    Args:
        raw_value: Everything that followed the first ``=`` on the line.

    Returns:
        The trimmed, unquoted value.
    """
    stripped_value = raw_value.strip()
    if (
        len(stripped_value) >= 2
        and stripped_value[0] == stripped_value[-1]
        and stripped_value[0] in {"'", '"'}
    ):
        return stripped_value[1:-1]
    # Unquoted values are trimmed too, so ``KEY = value`` and ``KEY=value``
    # agree, matching how keys and quoted values are already handled.
    return stripped_value


def _parse_env(content: str) -> dict[str, str]:
    """Parse env-file text into a ``{key: value}`` mapping.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped, as are lines without ``=`` or with an empty key. A leading
    ``export `` prefix is ignored. When a key occurs more than once, the
    last occurrence wins.

    Args:
        content: Full text of the env file.

    Returns:
        Mapping of stripped keys to normalized values.
    """
    mapping: dict[str, str] = {}
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        # Split on the first "=" only, so values may contain "=" themselves.
        key, separator, value = line.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        mapping[key] = _normalize_env_value(value)
    return mapping


class CatalogsyncEnvManager:
    """Read an env file and record/restore its revisions via a repository.

    Attributes are plain instance fields:
        env_file_path: ``Path`` of the managed env file.
        repository: Object used to store and fetch config revisions.
    """

    def __init__(
        self,
        *,
        db_path: str | Path,
        env_file_path: str | Path,
        repository: OpsRepository | None = None,
    ):
        """Create a manager for ``env_file_path``.

        Args:
            db_path: Passed to ``OpsRepository`` when no repository is given.
            env_file_path: Location of the env file to manage.
            repository: Pre-built repository to use instead of creating one.
        """
        self.env_file_path = Path(env_file_path)
        # Compare against None explicitly: an injected repository must be
        # kept even if the object happens to evaluate as falsy.
        self.repository = (
            repository if repository is not None else OpsRepository(db_path)
        )

    def _read_env_text(self) -> str:
        """Return the env file's UTF-8 text, or ``""`` if the file is missing."""
        try:
            return self.env_file_path.read_text(encoding="utf-8")
        except (FileNotFoundError, NotADirectoryError):
            # Reading directly (instead of exists() + read) avoids a race
            # when the file disappears between the check and the read.
            return ""

    def load_current(self) -> dict[str, str]:
        """Parse the env file on disk.

        Returns:
            The parsed key/value mapping; empty when the file does not exist.
        """
        return _parse_env(self._read_env_text())

    def build_job_snapshot(self) -> dict[str, Any]:
        """Build a snapshot dict from the current env file.

        Returns:
            A copy of the parsed env mapping with an additional
            ``"download_sources"`` entry holding the list parsed from the
            ``DOWNLOAD_SOURCES`` value (empty list when unset).
        """
        current = self.load_current()
        snapshot: dict[str, Any] = dict(current)
        snapshot["download_sources"] = _parse_sources(current.get("DOWNLOAD_SOURCES"))
        return snapshot

    def save_revision(self, note: str | None = None, source_type: str = "env_file") -> int:
        """Store the env file's current text as a new config revision.

        A missing file is recorded as empty content.

        Args:
            note: Optional note forwarded to the repository.
            source_type: Source label forwarded to the repository.

        Returns:
            Whatever ``repository.create_config_revision`` returns.
        """
        content = self._read_env_text()
        content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
        return self.repository.create_config_revision(
            source_type=source_type,
            file_path=str(self.env_file_path.resolve()),
            content_text=content,
            content_hash=content_hash,
            note=note,
        )

    def list_revisions(self, limit: int = 50) -> list[dict[str, Any]]:
        """Return stored revisions as reported by the repository.

        Args:
            limit: Forwarded to ``repository.list_config_revisions``.
        """
        return self.repository.list_config_revisions(limit=limit)

    def apply_revision(self, revision_id: int) -> None:
        """Write a stored revision's content back to the env file.

        Parent directories are created as needed, and the revision is marked
        as applied in the repository after the file has been written.

        Args:
            revision_id: Identifier of the revision to restore.

        Raises:
            ValueError: If the repository has no revision with that id.
        """
        revision = self.repository.get_config_revision(revision_id)
        if revision is None:
            raise ValueError(f"config revision not found: {revision_id}")
        self.env_file_path.parent.mkdir(parents=True, exist_ok=True)
        self.env_file_path.write_text(revision["content_text"], encoding="utf-8")
        self.repository.mark_config_revision_applied(revision_id)