From b81b29e674e36c4ea288a229515fbeb27df0f01a Mon Sep 17 00:00:00 2001 From: Eddie Nielsen <“ed”@edcore.dk”> Date: Sun, 22 Mar 2026 13:42:57 +0000 Subject: [PATCH] feat: add missing path validation and fail-on-missing --- dockervault/cli.py | 386 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 306 insertions(+), 80 deletions(-) diff --git a/dockervault/cli.py b/dockervault/cli.py index 2469eba..3f390ce 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -2,106 +2,332 @@ from __future__ import annotations import argparse import json +import socket import sys +from datetime import datetime from pathlib import Path +from typing import Any -from dockervault.scanner import scan_projects +from dockervault.classifier import classify_compose -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - prog="dockervault", - description="DockerVault CLI" - ) - subparsers = parser.add_subparsers(dest="command", required=True) +def check_path_exists(path: str) -> bool: + return Path(path).exists() - scan_parser = subparsers.add_parser( - "scan", - help="Scan a folder for Docker Compose projects" - ) - scan_parser.add_argument( - "path", - nargs="?", - default=".", - help="Base path to scan (default: current directory)" - ) - scan_parser.add_argument( - "--json", - action="store_true", - help="Output scan results as JSON" + +def build_mkdir_suggestion(paths: list[str]) -> str: + unique_paths = sorted(set(paths)) + lines = ["mkdir -p \\"] + for index, path in enumerate(unique_paths): + suffix = " \\" if index < len(unique_paths) - 1 else "" + lines.append(f" {path}{suffix}") + return "\n".join(lines) + + +def render_borg_archive(template: str, project: str, compose_path: Path) -> str: + now = datetime.now() + hostname = socket.gethostname() + compose_stem = compose_path.stem + + return template.format( + hostname=hostname, + project=project, + compose_stem=compose_stem, + now=now, ) - return parser +def build_borg_command( + repo: str, + archive_name: str, + include_paths: list[str], +) -> str: + lines = [ + "borg create --stats --progress \\", + f" {repo}::{archive_name} \\", + ] -def render_text(projects: list) -> str: - if not projects: - return "No Docker Compose projects found." - - lines: list[str] = [] - lines.append(f"Found {len(projects)} project(s):") - - for project in projects: - lines.append("") - lines.append(f"- {project.name}") - lines.append(f" Path: {project.root_path}") - lines.append(f" Compose files: {', '.join(project.compose_files) or '-'}") - lines.append(f" Services: {', '.join(project.service_names) or '-'}") - lines.append(f" Named volumes: {', '.join(project.named_volumes) or '-'}") - - if project.backup_paths: - lines.append(" Backup candidates:") - for backup_path in project.backup_paths: - lines.append(f" - {backup_path}") - else: - lines.append(" Backup candidates: -") - - for service in project.services: - lines.append(f" Service: {service.name}") - lines.append(f" Image: {service.image or '-'}") - lines.append(f" Restart: {service.restart or '-'}") - lines.append(f" Env files: {', '.join(service.env_files) or '-'}") - - if service.mounts: - lines.append(" Mounts:") - for mount in service.mounts: - ro = " (ro)" if mount.read_only else "" - lines.append( - f" - {mount.kind}: {mount.source or '[anonymous]'} -> {mount.target}{ro}" - ) - else: - lines.append(" Mounts: -") + for index, path in enumerate(include_paths): + suffix = " \\" if index < len(include_paths) - 1 else "" + lines.append(f" {path}{suffix}") return "\n".join(lines) -def main() -> int: - parser = build_parser() +def plan_to_json_dict( + compose_file: Path, + project_root: Path, + plan: dict[str, Any], + borg_repo: str | None = None, + borg_archive: str | None = None, + borg_command: str | None = None, + missing_include: list[dict[str, Any]] | None = None, + missing_review: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + return { + "compose_file": str(compose_file.resolve()), + "project_root": str(project_root.resolve()), + "include": plan.get("include", []), + "review": plan.get("review", []), + "skip": plan.get("skip", []), + "missing": { + "include": missing_include or [], + "review": missing_review or [], + }, + "borg": { + "repo": borg_repo, + "archive": borg_archive, + "command": borg_command, + } + if borg_repo or borg_archive or borg_command + else None, + } + + +def print_human_summary(compose_file: Path, project_root: Path, plan: dict[str, Any]) -> None: + print("DockerVault Backup Plan") + print("=======================") + print(f"Compose file: {compose_file.resolve()}") + print(f"Project root: {project_root.resolve()}") + print() + + print("INCLUDE PATHS:") + include = plan.get("include", []) + if include: + for item in include: + exists = check_path_exists(item["source"]) + status = "✔ exists" if exists else "❌ missing" + + print( + f" - {item['source']} " + f"[{item['priority']}] {status} " + f"service={item['service']} target={item['target']}" + ) + else: + print(" - (none)") + print() + + print("REVIEW PATHS:") + review = plan.get("review", []) + if review: + for item in review: + exists = check_path_exists(item["source"]) + status = "✔ exists" if exists else "❌ missing" + + print( + f" - {item['source']} " + f"[{item['priority']}] {status} " + f"service={item['service']} target={item['target']}" + ) + else: + print(" - (none)") + print() + + print("SKIP PATHS:") + skip = plan.get("skip", []) + if skip: + for item in skip: + exists = check_path_exists(item["source"]) + status = "✔ exists" if exists else "❌ missing" + + print( + f" - {item['source']} " + f"[{item['priority']}] {status} " + f"service={item['service']} target={item['target']}" + ) + else: + print(" - (none)") + + +def find_missing_paths(plan: dict[str, Any]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + missing_include = [ + item for item in plan.get("include", []) + if not check_path_exists(item["source"]) + ] + + missing_review = [ + item for item in plan.get("review", []) + if not check_path_exists(item["source"]) + ] + + return missing_include, missing_review + + +def print_missing_paths_report( + missing_include: list[dict[str, Any]], + missing_review: list[dict[str, Any]], +) -> None: + all_missing = missing_include + missing_review + + if not all_missing: + return + + print() + print("WARNING: Missing paths detected:") + for item in all_missing: + bucket = "include" if item in missing_include else "review" + print(f" - {item['source']} (service={item['service']}, bucket={bucket})") + + mkdir_paths = [item["source"] for item in all_missing] + + print() + print("Suggested fix for missing paths") + print("================================") + print(build_mkdir_suggestion(mkdir_paths)) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="DockerVault - intelligent Docker backup discovery" + ) + + parser.add_argument( + "compose_file", + nargs="?", + default="docker-compose.yml", + help="Path to docker-compose.yml", + ) + + parser.add_argument( + "--summary-only", + action="store_true", + help="Print human-readable summary only", + ) + + parser.add_argument( + "--json", + action="store_true", + help="Print backup plan as JSON", + ) + + parser.add_argument( + "--borg", + action="store_true", + help="Generate borg backup command output", + ) + + parser.add_argument( + "--borg-json", + action="store_true", + help="Print borg-related output as JSON", + ) + + parser.add_argument( + "--borg-repo", + default="/backup-repo", + help="Borg repository path or URI (default: /backup-repo)", + ) + + parser.add_argument( + "--borg-archive", + default="{hostname}-{now:%Y-%m-%d_%H-%M}", + help=( + "Archive naming template. Supported fields: " + "{hostname}, {project}, {compose_stem}, {now:...}" + ), + ) + + parser.add_argument( + "--fail-on-missing", + action="store_true", + help="Exit with status 2 if include/review paths are missing", + ) + args = parser.parse_args() - if args.command == "scan": + compose_file = Path(args.compose_file).resolve() + if not compose_file.exists(): + raise SystemExit(f"Compose file not found: {compose_file}") + + project_root = compose_file.parent + project_name = project_root.name or compose_file.stem + + plan = classify_compose(compose_file) + missing_include, missing_review = find_missing_paths(plan) + all_missing = missing_include + missing_review + + if args.json: + print( + json.dumps( + plan_to_json_dict( + compose_file, + project_root, + plan, + missing_include=missing_include, + missing_review=missing_review, + ), + indent=2, + ) + ) + if args.fail_on_missing and all_missing: + sys.exit(2) + return + + if args.borg or args.borg_json: + include_items = plan.get("include", []) + include_paths = [item["source"] for item in include_items] + try: - projects = scan_projects(Path(args.path)) - except (FileNotFoundError, NotADirectoryError) as exc: - print(f"Error: {exc}", file=sys.stderr) - return 2 - except json.JSONDecodeError as exc: - print(f"Error: invalid JSON/YAML data: {exc}", file=sys.stderr) - return 2 - except Exception as exc: - print(f"Error: {exc}", file=sys.stderr) - return 2 + archive_name = render_borg_archive( + args.borg_archive, + project_name, + compose_file, + ) + except KeyError as exc: + raise SystemExit( + f"Invalid borg archive template field: {exc}. " + "Allowed: hostname, project, compose_stem, now" + ) from exc - if args.json: - print(json.dumps([project.to_dict() for project in projects], indent=2)) - else: - print(render_text(projects)) + borg_command = build_borg_command( + repo=args.borg_repo, + archive_name=archive_name, + include_paths=include_paths, + ) - return 0 + if args.borg_json: + print( + json.dumps( + plan_to_json_dict( + compose_file, + project_root, + plan, + borg_repo=args.borg_repo, + borg_archive=archive_name, + borg_command=borg_command, + missing_include=missing_include, + missing_review=missing_review, + ), + indent=2, + ) + ) + if args.fail_on_missing and all_missing: + sys.exit(2) + return - parser.print_help() - return 1 + print_human_summary(compose_file, project_root, plan) + print_missing_paths_report(missing_include, missing_review) + + print() + print("Suggested borg create command") + print("=============================") + print(borg_command) + + if args.fail_on_missing and all_missing: + print() + print("ERROR: Failing because include/review paths are missing.") + sys.exit(2) + + return + + print_human_summary(compose_file, project_root, plan) + print_missing_paths_report(missing_include, missing_review) + + if args.fail_on_missing and all_missing: + print() + print("ERROR: Failing because include/review paths are missing.") + sys.exit(2) if __name__ == "__main__": - raise SystemExit(main()) + main()