From e6f6d18c8a843602d8ba1c25e9fa15a4cc60b65c Mon Sep 17 00:00:00 2001 From: Eddie Nielsen <“ed”@edcore.dk”> Date: Mon, 23 Mar 2026 15:03:01 +0000 Subject: [PATCH] fix: align CLI with classifier list output and mount classifications --- dockervault/cli.py | 483 ++++++++++++++++++++++++++------------------- 1 file changed, 280 insertions(+), 203 deletions(-) diff --git a/dockervault/cli.py b/dockervault/cli.py index 75100b0..54246fd 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -1,293 +1,370 @@ from __future__ import annotations import argparse -import sys +import json +import logging +import shlex +import socket +import subprocess +from datetime import datetime from pathlib import Path from typing import Any, Iterable -from .borg import ( - build_borg_create_command, - command_to_shell, - run_borg_create, -) +from . import __version__ from .classifier import classify_compose - -def _get_value(obj: Any, *names: str, default: Any = None) -> Any: - for name in names: - if isinstance(obj, dict) and name in obj: - return obj[name] - if hasattr(obj, name): - return getattr(obj, name) - return default +LOGGER = logging.getLogger("dockervault") -def _normalize_entries(entries: Any) -> list[dict[str, Any]]: +def setup_logging(verbose: bool = False) -> None: + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig(level=level, format="%(levelname)s: %(message)s") + + +def safe_get(obj: Any, key: str, default: Any = None) -> Any: + if obj is None: + return default + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + + +def normalize_entries(entries: Any) -> list[dict[str, Any]]: + """ + Normaliserer classifier-output til ensartede dict entries. + + Understøtter: + - list[MountEntry] + - list[dict] + - enkeltobjekter + """ if not entries: return [] + if not isinstance(entries, (list, tuple)): + entries = [entries] + normalized: list[dict[str, Any]] = [] for entry in entries: if isinstance(entry, dict): normalized.append( { - "source": ( - entry.get("source") - or entry.get("path") - or entry.get("host_path") - or entry.get("src") - ), + "path": entry.get("path") or entry.get("source") or entry.get("host_path"), + "priority": entry.get("priority") or entry.get("classification"), "service": entry.get("service"), - "target": ( - entry.get("target") - or entry.get("mount_target") - or entry.get("container_path") - or entry.get("destination") - ), - "classification": ( - entry.get("classification") - or entry.get("priority") - or entry.get("category") - or entry.get("kind") - ), + "target": entry.get("target") or entry.get("container_path"), + "source_type": entry.get("source_type"), "reason": entry.get("reason"), + "exists": entry.get("exists"), + } + ) + else: + normalized.append( + { + "path": safe_get(entry, "path", safe_get(entry, "source")), + "priority": safe_get(entry, "priority", safe_get(entry, "classification")), + "service": safe_get(entry, "service"), + "target": safe_get(entry, "target", safe_get(entry, "container_path")), + "source_type": safe_get(entry, "source_type"), + "reason": safe_get(entry, "reason"), + "exists": safe_get(entry, "exists"), } ) - continue - - normalized.append( - { - "source": _get_value(entry, "source", "path", "host_path", "src"), - "service": _get_value(entry, "service"), - "target": _get_value( - entry, "target", "mount_target", "container_path", "destination" - ), - "classification": _get_value( - entry, "classification", "priority", "category", "kind" - ), - "reason": _get_value(entry, "reason"), - } - ) return normalized -def _extract_plan_sections( - plan: Any, +def classify_entries( + raw_plan: Any, ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: - include_entries = _normalize_entries( - _get_value(plan, "include", "include_paths", "includes", default=[]) - ) - review_entries = _normalize_entries( - _get_value(plan, "review", "review_paths", "reviews", default=[]) - ) - skip_entries = _normalize_entries( - _get_value(plan, "skip", "skip_paths", "skips", default=[]) - ) + """ + classify_compose() returnerer aktuelt en liste af MountEntry. + Vi mapper dem til CLI-sektioner sådan her: + + - critical -> include + - optional -> skip + - alt andet -> review + """ + entries = normalize_entries(raw_plan) + + include_entries: list[dict[str, Any]] = [] + review_entries: list[dict[str, Any]] = [] + skip_entries: list[dict[str, Any]] = [] + + for entry in entries: + classification = (entry.get("priority") or "").strip().lower() + + if classification == "critical": + include_entries.append(entry) + elif classification in {"optional", "skip", "ignored"}: + skip_entries.append(entry) + else: + review_entries.append(entry) return include_entries, review_entries, skip_entries -def _entry_path(entry: dict[str, Any]) -> str: - return str(entry.get("source") or "(unknown)") - - -def _entry_label(entry: dict[str, Any]) -> str: - classification = entry.get("classification") or "unknown" - service = entry.get("service") or "unknown" - target = entry.get("target") or "unknown" - reason = entry.get("reason") - - label = f"[{classification}] service={service} target={target}" - if reason: - label += f" reason={reason}" - return label - - -def _print_section(title: str, entries: Iterable[dict[str, Any]]) -> None: - entries = list(entries) - print(f"{title}:") - if not entries: - print(" - (none)") - return - - for entry in entries: - print(f" - {_entry_path(entry):<40} {_entry_label(entry)}") - - -def _collect_include_paths(include_entries: Iterable[dict[str, Any]]) -> list[str]: +def extract_paths(entries: Iterable[dict[str, Any]]) -> list[str]: paths: list[str] = [] seen: set[str] = set() - for entry in include_entries: - path = _entry_path(entry) - if path == "(unknown)" or path in seen: + for entry in entries: + path = entry.get("path") + if not path: continue - seen.add(path) - paths.append(path) + path_str = str(path) + if path_str not in seen: + seen.add(path_str) + paths.append(path_str) return paths -def _print_borg_plan( - compose_path: Path, - project_root: Path, - include_entries: list[dict[str, Any]], - review_entries: list[dict[str, Any]], - skip_entries: list[dict[str, Any]], - repo: str | None, -) -> None: +def entry_to_line(entry: dict[str, Any]) -> str: + path = entry.get("path") or "(unknown)" + priority = entry.get("priority") or "unknown" + service = entry.get("service") or "unknown" + target = entry.get("target") or "unknown" + exists = entry.get("exists") + + extra = [] + if entry.get("source_type"): + extra.append(f"type={entry['source_type']}") + if exists is not None: + extra.append(f"exists={exists}") + if entry.get("reason"): + extra.append(f"reason={entry['reason']}") + + suffix = f" ({', '.join(extra)})" if extra else "" + return f" - {path} [{priority}] service={service} target={target}{suffix}" + + +def default_archive_name() -> str: + hostname = socket.gethostname() + now = datetime.now().strftime("%Y-%m-%d_%H-%M") + return f"{hostname}-{now}" + + +def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -> list[str]: + if not repo or not include_paths: + return [] + + cmd = [ + "borg", + "create", + "--stats", + "--progress", + f"{repo}::{archive_name}", + ] + cmd.extend(include_paths) + return cmd + + +def print_human_plan(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None: + include_entries, review_entries, skip_entries = classify_entries(raw_plan) + print() - print("Borg Backup Plan") - print("================") - print(f"Compose file: {compose_path}") - print(f"Project root: {project_root}") + print("DockerVault Backup Plan") + print("=======================") + print(f"Compose file: {compose_path.resolve()}") + print(f"Project root: {compose_path.resolve().parent}") print() - _print_section("INCLUDE PATHS", include_entries) + print("INCLUDE PATHS:") + if include_entries: + for entry in include_entries: + print(entry_to_line(entry)) + else: + print(" - (none)") print() - _print_section("REVIEW PATHS", review_entries) + + print("REVIEW PATHS:") + if review_entries: + for entry in review_entries: + print(entry_to_line(entry)) + else: + print(" - (none)") print() - _print_section("SKIP PATHS", skip_entries) - include_paths = _collect_include_paths(include_entries) + print("SKIP PATHS:") + if skip_entries: + for entry in skip_entries: + print(entry_to_line(entry)) + else: + print(" - (none)") + print() - if repo and include_paths: - command = build_borg_create_command( + if repo: + include_paths = extract_paths(include_entries) + archive_name = default_archive_name() + cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths) + + print("Suggested borg create command") + print("============================") + if cmd: + rendered = " \\\n ".join(shlex.quote(part) for part in cmd) + print(rendered) + else: + print("(no command generated)") + print() + + +def print_automation_output(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None: + include_entries, review_entries, skip_entries = classify_entries(raw_plan) + include_paths = extract_paths(include_entries) + + payload: dict[str, Any] = { + "compose_file": str(compose_path.resolve()), + "project_root": str(compose_path.resolve().parent), + "include_paths": include_paths, + "review_paths": [str(e["path"]) for e in review_entries if e.get("path")], + "skip_paths": [str(e["path"]) for e in skip_entries if e.get("path")], + } + + if repo: + payload["borg_repo"] = repo + payload["suggested_archive_name"] = default_archive_name() + payload["borg_command"] = build_borg_command( repo=repo, + archive_name=payload["suggested_archive_name"], include_paths=include_paths, ) - print() - print("Suggested borg create command") - print("=============================") - print(command_to_shell(command)) + + print(json.dumps(payload, indent=2)) -def build_parser() -> argparse.ArgumentParser: +def run_borg(repo: str, include_paths: list[str], dry_run: bool = False) -> int: + if not include_paths: + LOGGER.warning("No include paths found. Nothing to back up.") + return 0 + + archive_name = default_archive_name() + cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths) + + LOGGER.info("Borg repository: %s", repo) + LOGGER.info("Archive name: %s", archive_name) + + if dry_run: + LOGGER.info("Dry-run enabled. Borg command will not be executed.") + print(" ".join(shlex.quote(part) for part in cmd)) + return 0 + + LOGGER.info("Running borg backup...") + result = subprocess.run(cmd, text=True) + + if result.returncode != 0: + LOGGER.error("Borg exited with status %s", result.returncode) + return result.returncode + + LOGGER.info("Borg backup completed successfully.") + return 0 + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser( prog="dockervault", - description="DockerVault - intelligent Docker backup discovery", - ) - - parser.add_argument("compose", help="Path to docker-compose.yml") - - parser.add_argument( - "--borg", - action="store_true", - help="Show borg backup plan and suggested command", + description="Intelligent Docker backup discovery with Borg integration", ) + parser.add_argument("compose", help="Path to docker-compose.yml or compose.yaml") + parser.add_argument("--repo", help="Borg repository path") parser.add_argument( "--run-borg", action="store_true", - help="Run borg create using discovered include paths", + help="Run borg create after building the backup plan", ) - parser.add_argument( - "--repo", - help="Borg repository path, e.g. /mnt/backups/borg/dockervault", - ) - - parser.add_argument( - "--passphrase", - help="Optional borg passphrase", - ) - - parser.add_argument( - "--quiet", + "--dry-run", action="store_true", - help="Suppress borg stdout/stderr output during execution", + help="Show borg command without executing it", ) - parser.add_argument( "--automation", action="store_true", - help="Automation mode: minimal output, non-interactive behavior", + help="Output machine-readable JSON", ) - parser.add_argument( - "--fail-on-review", + "--quiet", action="store_true", - help="Exit with code 4 if review paths are present", + help="Suppress normal human-readable plan output", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable verbose logging", + ) + parser.add_argument( + "--version", + action="version", + version=f"%(prog)s {__version__}", ) - return parser + return parser.parse_args(argv) -def main() -> None: - parser = build_parser() - args = parser.parse_args() +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + setup_logging(args.verbose) - compose_path = Path(args.compose).expanduser().resolve() - - if not compose_path.exists(): - print(f"Error: compose file not found: {compose_path}", file=sys.stderr) - sys.exit(1) + compose_path = Path(args.compose) try: - plan = classify_compose(compose_path) - except Exception as exc: - print(f"Error: failed to classify compose file: {exc}", file=sys.stderr) - sys.exit(1) + if not compose_path.exists(): + LOGGER.error("Compose file not found: %s", compose_path) + return 1 - include_entries, review_entries, skip_entries = _extract_plan_sections(plan) - include_paths = _collect_include_paths(include_entries) - project_root = compose_path.parent + if not compose_path.is_file(): + LOGGER.error("Compose path is not a file: %s", compose_path) + return 1 - should_show_plan = args.borg or (not args.automation and not args.quiet) + raw_plan = classify_compose(compose_path) - if should_show_plan: - _print_borg_plan( - compose_path=compose_path, - project_root=project_root, - include_entries=include_entries, - review_entries=review_entries, - skip_entries=skip_entries, - repo=args.repo, - ) + if args.verbose: + LOGGER.debug("Raw plan type: %s", type(raw_plan)) + LOGGER.debug("Raw plan repr: %r", raw_plan) - if args.fail_on_review and review_entries: - if args.automation or args.quiet: - print("REVIEW required", file=sys.stderr) - else: - print() - print("Review required before automated backup can proceed.", file=sys.stderr) - sys.exit(4) + include_entries, _, _ = classify_entries(raw_plan) + include_paths = extract_paths(include_entries) - if args.run_borg: - if not args.repo: - print("Error: --run-borg requires --repo", file=sys.stderr) - sys.exit(2) + if args.automation: + print_automation_output(raw_plan, compose_path, repo=args.repo) + elif not args.quiet: + print_human_plan(raw_plan, compose_path, repo=args.repo) - if not include_paths: - print("Error: no include paths found for borg backup", file=sys.stderr) - sys.exit(3) + if args.run_borg: + if not args.repo: + LOGGER.error("--repo is required when using --run-borg") + return 2 - if not args.quiet: - print() - print("Running borg backup...") - print("======================") + return run_borg( + repo=args.repo, + include_paths=include_paths, + dry_run=args.dry_run, + ) - exit_code = run_borg_create( - repo=args.repo, - include_paths=include_paths, - passphrase=args.passphrase, - quiet=args.quiet, - stats=not args.quiet, - progress=not args.quiet, - ) + if args.dry_run: + if not args.repo: + LOGGER.error("--repo is required when using --dry-run") + return 2 - if exit_code != 0: - print(f"Error: borg exited with status {exit_code}", file=sys.stderr) - sys.exit(exit_code) + archive_name = default_archive_name() + cmd = build_borg_command(args.repo, archive_name, include_paths) + print(" ".join(shlex.quote(part) for part in cmd)) + return 0 - if not args.quiet: - print() - print("Borg backup completed successfully.") + return 0 - sys.exit(0) + except FileNotFoundError as exc: + LOGGER.error("File not found: %s", exc) + return 1 + except KeyboardInterrupt: + LOGGER.error("Interrupted by user") + return 130 + except Exception: + LOGGER.exception("Unexpected error") + return 99 if __name__ == "__main__": - main() + raise SystemExit(main())