diff --git a/dockervault/cli.py b/dockervault/cli.py index 6158755..7ac86cb 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -1,130 +1,107 @@ import argparse +import sys from pathlib import Path -from typing import List -from concurrent.futures import ThreadPoolExecutor -import socket -from dockervault.discovery import discover_compose_files -from dockervault.analyzer import analyse_compose_file -from dockervault.classifier import classify_mount +from . import __version__ -def print_plan(scan_root: Path, classified_mounts: List[dict], quiet: bool): - if quiet: - return [m for m in classified_mounts if m["class"] == "critical"] - - print() - print("DockerVault Backup Plan") +def print_plan(plan): + print("\nDockerVault Backup Plan") print("=======================") - print(f"Scan root: {scan_root}") - print() - include = [] - review = [] - skip = [] + if isinstance(plan, dict) and "root" in plan: + print(f"Scan root: {plan['root']}") - for m in classified_mounts: - cls = m["class"] + include = plan.get("include", []) + review = plan.get("review", []) + skip = plan.get("skip", []) - if cls == "critical": - include.append(m) - elif cls == "review": - review.append(m) - else: - skip.append(m) - - print("INCLUDE PATHS:") + print("\nINCLUDE PATHS:") if include: - for m in include: + for item in include: print( - f" - {m['source']} " - f"[{m['class']}] " - f"service={m['service']} " - f"target={m['target']} " - f"(exists={m['exists']})" + f" - {item.get('path')} [{item.get('class', 'unknown')}] " + f"service={item.get('service', '?')} target={item.get('target', '?')}" ) else: - print(" (none)") + print(" - (none)") - print() - - print("REVIEW PATHS:") + print("\nREVIEW PATHS:") if review: - for m in review: + for item in review: print( - f" - {m['source']} " - f"[{m['class']}] " - f"service={m['service']} " - f"target={m['target']} " - f"(exists={m['exists']})" + f" - {item.get('path')} [{item.get('class', 'unknown')}] " + f"service={item.get('service', '?')} target={item.get('target', '?')}" ) else: - print(" (none)") + print(" - (none)") - print() - - print("SKIP PATHS:") + print("\nSKIP PATHS:") if skip: - for m in skip: + for item in skip: print( - f" - {m['source']} " - f"[{m['class']}] " - f"service={m['service']} " - f"target={m['target']} " - f"(exists={m['exists']})" + f" - {item.get('path')} [{item.get('class', 'unknown')}] " + f"service={item.get('service', '?')} target={item.get('target', '?')}" ) else: - print(" (none)") - - return include + print(" - (none)") -def print_borg_command(include: List[dict], repo: str, quiet: bool): - if not repo: - return +def print_warnings(plan): + include = plan.get("include", []) + missing = [item for item in include if not item.get("exists", True)] - valid_paths = sorted({ - m["source"] for m in include if m["exists"] - }) - - if not valid_paths: - if not quiet: - print() - print("No valid paths for borg backup") - print("Reason: all critical paths are missing (exists=False)") - return - - hostname = socket.gethostname() - - if quiet: - print(" ".join(valid_paths)) - return - - print() - print("Suggested borg create command") - print("=============================") - - print("borg create --stats --progress \\") - print(f" {repo}::{{hostname}}-{{now:%Y-%m-%d_%H-%M}} \\") - - for p in valid_paths: - print(f" {p} \\") + if missing: + print("\nWARNING: Missing critical paths detected") + for item in missing: + print(f" - {item.get('path')} (service={item.get('service', '?')})") def build_parser(): - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser( + description="DockerVault - Intelligent Docker backup discovery" + ) - subparsers = parser.add_subparsers(dest="command") + parser.add_argument( + "--version", + action="version", + version=f"DockerVault {__version__}", + ) - scan = subparsers.add_parser("scan") + parser.add_argument( + "path", + nargs="?", + help="Path to scan (folder or docker-compose.yml)", + ) - scan.add_argument("path") - scan.add_argument("--repo") - scan.add_argument("--max-depth", type=int, default=None) - scan.add_argument("--exclude", action="append", default=[]) + parser.add_argument( + "--repo", + help="Borg repository path", + ) - scan.add_argument("--quiet", action="store_true") - scan.add_argument("--automation", action="store_true") + parser.add_argument( + "--borg", + action="store_true", + help="Generate borg command", + ) + + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be done without executing", + ) + + parser.add_argument( + "--quiet", + action="store_true", + help="Minimal output", + ) + + parser.add_argument( + "--automation", + action="store_true", + help="Automation-friendly mode", + ) return parser @@ -133,59 +110,63 @@ def main(): parser = build_parser() args = parser.parse_args() - if args.command == "scan": - scan_root = Path(args.path).resolve() + if not args.path: + parser.error("the following arguments are required: path") - if not scan_root.exists(): - if not args.quiet: - print(f"ERROR: Path does not exist: {scan_root}") - return 2 + scan_root = Path(args.path) - compose_files = discover_compose_files( - root=scan_root, - max_depth=args.max_depth, - excludes=args.exclude, + if not scan_root.exists(): + print(f"ERROR: Path does not exist: {scan_root}") + sys.exit(2) + + try: + from .discovery import scan_path + from .classifier import classify_paths + except ModuleNotFoundError as e: + print(f"ERROR: Missing internal module: {e}") + sys.exit(2) + except ImportError as e: + print(f"ERROR: Import problem: {e}") + sys.exit(2) + + scan_result = scan_path(scan_root) + plan = classify_paths(scan_result) + + if not isinstance(plan, dict): + print("ERROR: classify_paths() did not return a dict-like plan") + sys.exit(2) + + plan.setdefault("root", str(scan_root)) + + if not args.quiet: + print_plan(plan) + print_warnings(plan) + + if args.borg and args.repo: + try: + from .borg import generate_borg_command + except ModuleNotFoundError as e: + print(f"ERROR: Missing borg module: {e}") + sys.exit(2) + except ImportError as e: + print(f"ERROR: Borg import problem: {e}") + sys.exit(2) + + print("\nSuggested borg create command") + print("============================") + cmd = generate_borg_command(plan, repo=args.repo) + print(cmd) + + if args.automation: + has_missing = any( + not item.get("exists", True) + for item in plan.get("include", []) ) + if has_missing: + sys.exit(1) - with ThreadPoolExecutor() as executor: - results = list(executor.map(analyse_compose_file, compose_files)) - - classified_mounts = [] - - for r in results: - for m in r["mounts"]: - classified = classify_mount(m) - - compose_dir = r["compose"].parent - source_path = (compose_dir / classified["source"]).resolve() - - classified["source"] = str(source_path) - classified["exists"] = source_path.exists() - - classified_mounts.append(classified) - - missing_critical = [ - m for m in classified_mounts - if m["class"] == "critical" and not m["exists"] - ] - - if missing_critical and not args.quiet: - print() - print("WARNING: Missing critical paths detected") - for m in missing_critical: - print(f" - {m['source']} (service={m['service']})") - print() - - include = print_plan(scan_root, classified_mounts, args.quiet) - - print_borg_command(include, args.repo, args.quiet) - - if missing_critical: - return 1 - return 0 - - return 1 + sys.exit(0) if __name__ == "__main__": - raise SystemExit(main()) + main()