diff --git a/CHANGELOG.md b/CHANGELOG.md index 86b38c4..3744aab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,23 +1,5 @@ -# Changelog +M# Changelog All notable changes to this project will be documented in this file. ---- - -## [0.1.0] - 2026-03-24 - -### Added -- Initial DockerVault CLI -- Recursive Docker Compose scanning -- Classification engine (critical / review / skip) -- Named volume detection and resolution -- Missing path detection -- Borg backup command generation -- Automation mode (--automation, --quiet) -- Exit codes for scripting -- Initial pytest test suite -- Project README and documentation - -### Notes -- First public foundation release of DockerVault -- Focused on backup discovery for real Docker environments +## [0.1.0 diff --git a/dockervault/cli.py b/dockervault/cli.py index 7ac86cb..ef02476 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -5,56 +5,140 @@ from pathlib import Path from . import __version__ +def entry_to_dict(entry): + if isinstance(entry, dict): + data = dict(entry) + else: + data = {} + for attr in ( + "path", + "source", + "source_path", + "host_path", + "mount_source", + "classification", + "service", + "target", + "exists", + "reason", + "compose_file", + ): + if hasattr(entry, attr): + data[attr] = getattr(entry, attr) + + real_path = ( + data.get("path") + or data.get("source") + or data.get("source_path") + or data.get("host_path") + or data.get("mount_source") + ) + + if real_path is not None: + real_path = str(real_path) + + compose_file = data.get("compose_file") + if compose_file is not None: + compose_file = str(compose_file) + + return { + "path": real_path, + "classification": data.get("classification"), + "service": data.get("service"), + "target": data.get("target"), + "exists": data.get("exists"), + "reason": data.get("reason"), + "compose_file": compose_file, + "source": data.get("source"), + } + + +def normalize_entries(entries): + normalized = [] + for entry in entries: + item = entry_to_dict(entry) + + classification = item.get("classification", "review") + if classification == "critical": + bucket = "include" + elif classification == "skip": + bucket = "skip" + else: + bucket = "review" + + normalized.append( + { + "path": item.get("path") or "?", + "class": classification, + "bucket": bucket, + "service": item.get("service", "?"), + "target": item.get("target", "?"), + "exists": item.get("exists", True), + "reason": item.get("reason"), + "compose_file": item.get("compose_file"), + "source": item.get("source"), + } + ) + return normalized + + +def build_plan(scan_root, entries, compose_count): + normalized = normalize_entries(entries) + return { + "root": str(scan_root), + "compose_files_found": compose_count, + "include": [e for e in normalized if e["bucket"] == "include"], + "review": [e for e in normalized if e["bucket"] == "review"], + "skip": [e for e in normalized if e["bucket"] == "skip"], + } + + def print_plan(plan): print("\nDockerVault Backup Plan") print("=======================") - - if isinstance(plan, dict) and "root" in plan: - print(f"Scan root: {plan['root']}") - - include = plan.get("include", []) - review = plan.get("review", []) - skip = plan.get("skip", []) + print(f"Scan root: {plan['root']}") + print(f"Compose files found: {plan.get('compose_files_found', 0)}") print("\nINCLUDE PATHS:") - if include: - for item in include: + if plan["include"]: + for item in plan["include"]: + extra = f" reason={item['reason']}" if item.get("reason") else "" print( - f" - {item.get('path')} [{item.get('class', 'unknown')}] " - f"service={item.get('service', '?')} target={item.get('target', '?')}" + f" - {item['path']} [{item['class']}] " + f"service={item['service']} target={item['target']}{extra}" ) else: print(" - (none)") print("\nREVIEW PATHS:") - if review: - for item in review: + if plan["review"]: + for item in plan["review"]: + extra = f" reason={item['reason']}" if item.get("reason") else "" print( - f" - {item.get('path')} [{item.get('class', 'unknown')}] " - f"service={item.get('service', '?')} target={item.get('target', '?')}" + f" - {item['path']} [{item['class']}] " + f"service={item['service']} target={item['target']}{extra}" ) else: print(" - (none)") print("\nSKIP PATHS:") - if skip: - for item in skip: + if plan["skip"]: + for item in plan["skip"]: + extra = f" reason={item['reason']}" if item.get("reason") else "" print( - f" - {item.get('path')} [{item.get('class', 'unknown')}] " - f"service={item.get('service', '?')} target={item.get('target', '?')}" + f" - {item['path']} [{item['class']}] " + f"service={item['service']} target={item['target']}{extra}" ) else: print(" - (none)") def print_warnings(plan): - include = plan.get("include", []) - missing = [item for item in include if not item.get("exists", True)] - + missing = [item for item in plan["include"] if not item.get("exists", True)] if missing: print("\nWARNING: Missing critical paths detected") for item in missing: - print(f" - {item.get('path')} (service={item.get('service', '?')})") + print(f" - {item['path']} (service={item['service']})") def build_parser(): @@ -103,6 +187,13 @@ def build_parser(): help="Automation-friendly mode", ) + parser.add_argument( + "--exclude", + action="append", + default=[], + help="Exclude directory name or path fragment during discovery (repeatable)", + ) + return parser @@ -120,48 +211,56 @@ def main(): sys.exit(2) try: - from .discovery import scan_path - from .classifier import classify_paths - except ModuleNotFoundError as e: - print(f"ERROR: Missing internal module: {e}") - sys.exit(2) + from .discovery import discover_compose_files + from .classifier import classify_compose except ImportError as e: - print(f"ERROR: Import problem: {e}") + print(f"ERROR: Discovery/classifier import problem: {e}") sys.exit(2) - scan_result = scan_path(scan_root) - plan = classify_paths(scan_result) + entries = [] + compose_files = [] - if not isinstance(plan, dict): - print("ERROR: classify_paths() did not return a dict-like plan") + try: + if scan_root.is_file(): + compose_files = [scan_root] + else: + compose_files = discover_compose_files( + scan_root, + excludes=args.exclude, + ) + + for compose_file in compose_files: + entries.extend(classify_compose(compose_file)) + except Exception as e: + print(f"ERROR: Failed during discovery/classification: {e}") sys.exit(2) - plan.setdefault("root", str(scan_root)) + plan = build_plan(scan_root, entries, len(compose_files)) if not args.quiet: print_plan(plan) print_warnings(plan) - if args.borg and args.repo: + if args.repo: try: - from .borg import generate_borg_command - except ModuleNotFoundError as e: - print(f"ERROR: Missing borg module: {e}") - sys.exit(2) + from .borg import build_borg_create_command, command_to_shell except ImportError as e: print(f"ERROR: Borg import problem: {e}") sys.exit(2) - print("\nSuggested borg create command") - print("============================") - cmd = generate_borg_command(plan, repo=args.repo) - print(cmd) + try: + include_paths = [item["path"] for item in plan["include"] if item["path"] != "?"] + borg_command = build_borg_create_command(args.repo, include_paths) + + print("\nSuggested borg create command") + print("============================") + print(command_to_shell(borg_command)) + except Exception as e: + print(f"ERROR: Failed to build borg command: {e}") + sys.exit(2) if args.automation: - has_missing = any( - not item.get("exists", True) - for item in plan.get("include", []) - ) + has_missing = any(not item.get("exists", True) for item in plan["include"]) if has_missing: sys.exit(1)