refactor: harden CLI for production borg execution and missing mount validation

This commit is contained in:
Eddie Nielsen 2026-03-23 15:15:02 +00:00
parent e6f6d18c8a
commit cd80dadf32

View file

@ -4,8 +4,9 @@ import argparse
import json import json
import logging import logging
import shlex import shlex
import socket import shutil
import subprocess import subprocess
import socket
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Iterable from typing import Any, Iterable
@ -80,10 +81,10 @@ def classify_entries(
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
""" """
classify_compose() returnerer aktuelt en liste af MountEntry. classify_compose() returnerer aktuelt en liste af MountEntry.
Vi mapper dem til CLI-sektioner sådan her: Mapping til CLI-sektioner:
- critical -> include - critical -> include
- optional -> skip - optional / skip / ignored -> skip
- alt andet -> review - alt andet -> review
""" """
entries = normalize_entries(raw_plan) entries = normalize_entries(raw_plan)
@ -93,7 +94,7 @@ def classify_entries(
skip_entries: list[dict[str, Any]] = [] skip_entries: list[dict[str, Any]] = []
for entry in entries: for entry in entries:
classification = (entry.get("priority") or "").strip().lower() classification = str(entry.get("priority") or "").strip().lower()
if classification == "critical": if classification == "critical":
include_entries.append(entry) include_entries.append(entry)
@ -121,6 +122,10 @@ def extract_paths(entries: Iterable[dict[str, Any]]) -> list[str]:
return paths return paths
def find_missing_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
return [entry for entry in entries if entry.get("exists") is False]
def entry_to_line(entry: dict[str, Any]) -> str: def entry_to_line(entry: dict[str, Any]) -> str:
path = entry.get("path") or "(unknown)" path = entry.get("path") or "(unknown)"
priority = entry.get("priority") or "unknown" priority = entry.get("priority") or "unknown"
@ -161,8 +166,16 @@ def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -
return cmd return cmd
def print_human_plan(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None: def ensure_borg_available() -> bool:
if shutil.which("borg") is None:
LOGGER.error("Borg binary not found in PATH")
return False
return True
def print_human_plan(raw_plan: Any, compose_path: Path) -> None:
include_entries, review_entries, skip_entries = classify_entries(raw_plan) include_entries, review_entries, skip_entries = classify_entries(raw_plan)
missing_include = find_missing_entries(include_entries)
print() print()
print("DockerVault Backup Plan") print("DockerVault Backup Plan")
@ -179,6 +192,12 @@ def print_human_plan(raw_plan: Any, compose_path: Path, repo: str | None = None)
print(" - (none)") print(" - (none)")
print() print()
if missing_include:
print("WARNING: Missing critical paths detected")
for entry in missing_include:
print(f" - {entry.get('path')} (service={entry.get('service')})")
print()
print("REVIEW PATHS:") print("REVIEW PATHS:")
if review_entries: if review_entries:
for entry in review_entries: for entry in review_entries:
@ -195,24 +214,11 @@ def print_human_plan(raw_plan: Any, compose_path: Path, repo: str | None = None)
print(" - (none)") print(" - (none)")
print() print()
if repo:
include_paths = extract_paths(include_entries)
archive_name = default_archive_name()
cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths)
print("Suggested borg create command")
print("============================")
if cmd:
rendered = " \\\n ".join(shlex.quote(part) for part in cmd)
print(rendered)
else:
print("(no command generated)")
print()
def print_automation_output(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None: def print_automation_output(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None:
include_entries, review_entries, skip_entries = classify_entries(raw_plan) include_entries, review_entries, skip_entries = classify_entries(raw_plan)
include_paths = extract_paths(include_entries) include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries)
payload: dict[str, Any] = { payload: dict[str, Any] = {
"compose_file": str(compose_path.resolve()), "compose_file": str(compose_path.resolve()),
@ -220,14 +226,16 @@ def print_automation_output(raw_plan: Any, compose_path: Path, repo: str | None
"include_paths": include_paths, "include_paths": include_paths,
"review_paths": [str(e["path"]) for e in review_entries if e.get("path")], "review_paths": [str(e["path"]) for e in review_entries if e.get("path")],
"skip_paths": [str(e["path"]) for e in skip_entries if e.get("path")], "skip_paths": [str(e["path"]) for e in skip_entries if e.get("path")],
"missing_critical_paths": [str(e["path"]) for e in missing_include if e.get("path")],
} }
if repo: if repo:
archive_name = default_archive_name()
payload["borg_repo"] = repo payload["borg_repo"] = repo
payload["suggested_archive_name"] = default_archive_name() payload["suggested_archive_name"] = archive_name
payload["borg_command"] = build_borg_command( payload["borg_command"] = build_borg_command(
repo=repo, repo=repo,
archive_name=payload["suggested_archive_name"], archive_name=archive_name,
include_paths=include_paths, include_paths=include_paths,
) )
@ -239,18 +247,20 @@ def run_borg(repo: str, include_paths: list[str], dry_run: bool = False) -> int:
LOGGER.warning("No include paths found. Nothing to back up.") LOGGER.warning("No include paths found. Nothing to back up.")
return 0 return 0
if not ensure_borg_available():
return 3
archive_name = default_archive_name() archive_name = default_archive_name()
cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths) cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths)
LOGGER.info("Borg repository: %s", repo)
LOGGER.info("Archive name: %s", archive_name)
if dry_run: if dry_run:
LOGGER.info("Dry-run enabled. Borg command will not be executed.")
print(" ".join(shlex.quote(part) for part in cmd)) print(" ".join(shlex.quote(part) for part in cmd))
return 0 return 0
LOGGER.info("Borg repository: %s", repo)
LOGGER.info("Archive name: %s", archive_name)
LOGGER.info("Running borg backup...") LOGGER.info("Running borg backup...")
result = subprocess.run(cmd, text=True) result = subprocess.run(cmd, text=True)
if result.returncode != 0: if result.returncode != 0:
@ -326,17 +336,26 @@ def main(argv: list[str] | None = None) -> int:
include_entries, _, _ = classify_entries(raw_plan) include_entries, _, _ = classify_entries(raw_plan)
include_paths = extract_paths(include_entries) include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries)
if args.automation: if args.automation:
print_automation_output(raw_plan, compose_path, repo=args.repo) print_automation_output(raw_plan, compose_path, repo=args.repo)
elif not args.quiet: elif not args.quiet:
print_human_plan(raw_plan, compose_path, repo=args.repo) print_human_plan(raw_plan, compose_path)
if not include_paths:
LOGGER.warning("No include paths found. Nothing to back up.")
return 0
if args.run_borg: if args.run_borg:
if not args.repo: if not args.repo:
LOGGER.error("--repo is required when using --run-borg") LOGGER.error("--repo is required when using --run-borg")
return 2 return 2
if missing_include:
LOGGER.error("Refusing to run borg: missing critical paths detected")
return 4
return run_borg( return run_borg(
repo=args.repo, repo=args.repo,
include_paths=include_paths, include_paths=include_paths,
@ -348,6 +367,9 @@ def main(argv: list[str] | None = None) -> int:
LOGGER.error("--repo is required when using --dry-run") LOGGER.error("--repo is required when using --dry-run")
return 2 return 2
if not ensure_borg_available():
return 3
archive_name = default_archive_name() archive_name = default_archive_name()
cmd = build_borg_command(args.repo, archive_name, include_paths) cmd = build_borg_command(args.repo, archive_name, include_paths)
print(" ".join(shlex.quote(part) for part in cmd)) print(" ".join(shlex.quote(part) for part in cmd))