From 80ec0a74e8f80359a0a9664cafe44a3b9725009a Mon Sep 17 00:00:00 2001 From: Eddie Nielsen <“ed”@edcore.dk”> Date: Mon, 23 Mar 2026 11:45:19 +0000 Subject: [PATCH] feat: add borg execution, mkdir apply, and missing path validation --- dockervault/cli.py | 249 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 213 insertions(+), 36 deletions(-) diff --git a/dockervault/cli.py b/dockervault/cli.py index 411dc75..3e88ab3 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -2,7 +2,9 @@ from __future__ import annotations import argparse import json +import shlex import socket +import subprocess import sys from datetime import datetime from pathlib import Path @@ -16,7 +18,7 @@ def check_path_exists(path: str) -> bool: def create_missing_paths(paths: list[str]) -> list[str]: - created = [] + created: list[str] = [] for path in sorted(set(paths)): p = Path(path) if not p.exists(): @@ -60,7 +62,20 @@ def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) - return "\n".join(lines) -def find_missing_paths(plan: dict[str, Any]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: +def build_borg_argv(repo: str, archive_name: str, include_paths: list[str]) -> list[str]: + return [ + "borg", + "create", + "--stats", + "--progress", + f"{repo}::{archive_name}", + *include_paths, + ] + + +def find_missing_paths( + plan: dict[str, Any], +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: missing_include = [ item for item in plan.get("include", []) if not check_path_exists(item["source"]) @@ -98,61 +113,223 @@ def print_human_summary(compose_file: Path, project_root: Path, plan: dict[str, print() -def main() -> None: - parser = argparse.ArgumentParser(description="DockerVault") +def print_missing_paths_report( + missing_include: list[dict[str, Any]], + missing_review: list[dict[str, Any]], +) -> None: + all_missing = missing_include + missing_review + if not all_missing: + return - parser.add_argument("compose_file", nargs="?", default="docker-compose.yml") - parser.add_argument("--borg", action="store_true") - parser.add_argument("--borg-repo", default="/backup-repo") - parser.add_argument("--borg-archive", default="{hostname}-{now:%Y-%m-%d_%H-%M}") - parser.add_argument("--fail-on-missing", action="store_true") - parser.add_argument("--apply-mkdir", action="store_true") + print("WARNING: Missing paths detected:") + for item in all_missing: + bucket = "include" if item in missing_include else "review" + print(f" - {item['source']} (service={item['service']}, bucket={bucket})") + print() + + +def print_created_paths(created_paths: list[str]) -> None: + if not created_paths: + return + + print("Created missing paths:") + for path in created_paths: + print(f" - {path}") + print() + + +def plan_to_json_dict( + compose_file: Path, + project_root: Path, + plan: dict[str, Any], + borg_repo: str | None = None, + borg_archive: str | None = None, + borg_command: str | None = None, + missing_include: list[dict[str, Any]] | None = None, + missing_review: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + return { + "compose_file": str(compose_file.resolve()), + "project_root": str(project_root.resolve()), + "include": plan.get("include", []), + "review": plan.get("review", []), + "skip": plan.get("skip", []), + "missing": { + "include": missing_include or [], + "review": missing_review or [], + }, + "borg": { + "repo": borg_repo, + "archive": borg_archive, + "command": borg_command, + } + if borg_repo or borg_archive or borg_command + else None, + } + + +def main() -> None: + parser = argparse.ArgumentParser( + description="DockerVault - intelligent Docker backup discovery" + ) + + parser.add_argument( + "compose_file", + nargs="?", + default="docker-compose.yml", + help="Path to docker-compose.yml", + ) + + parser.add_argument( + "--json", + action="store_true", + help="Print plan as JSON", + ) + + parser.add_argument( + "--borg", + action="store_true", + help="Show suggested borg create command", + ) + + parser.add_argument( + "--run-borg", + action="store_true", + help="Execute borg create", + ) + + parser.add_argument( + "--borg-repo", + default="/backup-repo", + help="Borg repository path or URI (default: /backup-repo)", + ) + + parser.add_argument( + "--borg-archive", + default="{hostname}-{now:%Y-%m-%d_%H-%M}", + help=( + "Archive naming template. Supported fields: " + "{hostname}, {project}, {compose_stem}, {now:...}" + ), + ) + + parser.add_argument( + "--fail-on-missing", + action="store_true", + help="Exit with status 2 if include/review paths are missing", + ) + + parser.add_argument( + "--apply-mkdir", + action="store_true", + help="Create missing include/review paths", + ) args = parser.parse_args() compose_file = Path(args.compose_file).resolve() + if not compose_file.exists(): + raise SystemExit(f"Compose file not found: {compose_file}") + project_root = compose_file.parent - project_name = project_root.name + project_name = project_root.name or compose_file.stem plan = classify_compose(compose_file) + missing_include, missing_review = find_missing_paths(plan) all_missing = missing_include + missing_review - print_human_summary(compose_file, project_root, plan) + created_paths: list[str] = [] + if args.apply_mkdir and all_missing: + created_paths = create_missing_paths([item["source"] for item in all_missing]) + missing_include, missing_review = find_missing_paths(plan) + all_missing = missing_include + missing_review - if all_missing: - print("WARNING: Missing paths detected:") - for item in all_missing: - print(f" - {item['source']} ({item['service']})") + borg_command: str | None = None + borg_argv: list[str] | None = None + archive_name: str | None = None - paths = [item["source"] for item in all_missing] - - if args.apply_mkdir: - created = create_missing_paths(paths) - - print() - print("Created missing paths:") - for p in created: - print(f" - {p}") - - else: - print() - print("Suggested fix:") - print(build_mkdir_suggestion(paths)) - - if args.borg: - archive = render_borg_archive(args.borg_archive, project_name, compose_file) + if args.borg or args.run_borg: include_paths = [item["source"] for item in plan.get("include", [])] + try: + archive_name = render_borg_archive( + args.borg_archive, + project_name, + compose_file, + ) + except KeyError as exc: + raise SystemExit( + f"Invalid borg archive template field: {exc}. " + "Allowed: hostname, project, compose_stem, now" + ) from exc + + borg_command = build_borg_command( + repo=args.borg_repo, + archive_name=archive_name, + include_paths=include_paths, + ) + + borg_argv = build_borg_argv( + repo=args.borg_repo, + archive_name=archive_name, + include_paths=include_paths, + ) + + if args.json: + print( + json.dumps( + plan_to_json_dict( + compose_file=compose_file, + project_root=project_root, + plan=plan, + borg_repo=args.borg_repo if (args.borg or args.run_borg) else None, + borg_archive=archive_name, + borg_command=borg_command, + missing_include=missing_include, + missing_review=missing_review, + ), + indent=2, + ) + ) + if args.fail_on_missing and all_missing: + sys.exit(2) + return + + print_human_summary(compose_file, project_root, plan) + print_missing_paths_report(missing_include, missing_review) + print_created_paths(created_paths) + + if all_missing and not args.apply_mkdir: + print("Suggested fix:") + print(build_mkdir_suggestion([item["source"] for item in all_missing])) print() + + if borg_command: print("Suggested borg command:") - print(build_borg_command(args.borg_repo, archive, include_paths)) + print(borg_command) + print() if args.fail_on_missing and all_missing: - print() - print("ERROR: Missing required paths") + print("ERROR: Failing because include/review paths are missing.") sys.exit(2) + if args.run_borg: + if borg_argv is None: + raise SystemExit("Internal error: borg command was not prepared") + + print("Running borg create...") + print(" ".join(shlex.quote(part) for part in borg_argv)) + print() + + try: + completed = subprocess.run(borg_argv, check=False) + except FileNotFoundError as exc: + raise SystemExit("borg executable not found in PATH") from exc + + if completed.returncode != 0: + raise SystemExit(completed.returncode) + if __name__ == "__main__": main()