diff --git a/dockervault/cli.py b/dockervault/cli.py index 7ff0bd1..e1864a0 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -170,8 +170,11 @@ def ensure_borg_available() -> bool: return True -def scan_projects(scan_root: Path) -> tuple[list[Path], list[dict[str, Any]]]: - compose_files = find_compose_files(scan_root) +def scan_projects( + scan_root: Path, + max_depth: int | None = None, +) -> tuple[list[Path], list[dict[str, Any]]]: + compose_files = find_compose_files(scan_root, max_depth=max_depth) all_entries: list[dict[str, Any]] = [] for compose_file in compose_files: @@ -258,252 +261,142 @@ def print_automation_output( payload: dict[str, Any] = { "root": str(root_path.resolve()), "include_paths": include_paths, - "review_paths": [str(e["path"]) for e in review_entries if e.get("path")], - "skip_paths": [str(e["path"]) for e in skip_entries if e.get("path")], - "missing_critical_paths": [str(e["path"]) for e in missing_include if e.get("path")], + "review_paths": extract_paths(review_entries), + "skip_paths": extract_paths(skip_entries), + "missing_critical_paths": [str(entry["path"]) for entry in missing_include if entry.get("path")], } if compose_files is not None: - payload["compose_files"] = [str(p.resolve()) for p in compose_files] + payload["compose_files"] = [str(path.resolve()) for path in compose_files] if repo: archive_name = default_archive_name() - payload["borg_repo"] = repo - payload["suggested_archive_name"] = archive_name - payload["borg_command"] = build_borg_command( - repo=repo, - archive_name=archive_name, - include_paths=include_paths, - ) + payload["repo"] = repo + payload["archive_name"] = archive_name + payload["borg_command"] = build_borg_command(repo, archive_name, include_paths) print(json.dumps(payload, indent=2)) -def run_borg(repo: str, include_paths: list[str], dry_run: bool = False) -> int: - if not include_paths: - LOGGER.warning("No include paths found. Nothing to back up.") +def run_borg_command(cmd: list[str], dry_run: bool = False, quiet: bool = False) -> int: + if not cmd: + LOGGER.error("No Borg command to run") + return 1 + + if dry_run: + if not quiet: + print("Dry run - Borg command:") + print(" ".join(shlex.quote(part) for part in cmd)) return 0 if not ensure_borg_available(): - return 3 + return 1 - archive_name = default_archive_name() - cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths) - - if dry_run: + if not quiet: + print("Running Borg command:") print(" ".join(shlex.quote(part) for part in cmd)) - return 0 - LOGGER.info("Borg repository: %s", repo) - LOGGER.info("Archive name: %s", archive_name) - LOGGER.info("Running borg backup...") - - result = subprocess.run(cmd, text=True, capture_output=True) - - if result.stdout: - print(result.stdout, end="") - - if result.returncode != 0: - stderr = (result.stderr or "").strip() - if stderr: - LOGGER.error("Borg failed: %s", stderr) - else: - LOGGER.error("Borg exited with status %s", result.returncode) - return result.returncode - - LOGGER.info("Borg backup completed successfully.") - return 0 + result = subprocess.run(cmd, check=False) + return result.returncode -def parse_args(argv: list[str] | None = None) -> argparse.Namespace: - common = argparse.ArgumentParser(add_help=False) - common.add_argument("--repo", help="Borg repository path") - common.add_argument( - "--run-borg", - action="store_true", - help="Run borg create after building the backup plan", - ) - common.add_argument( - "--dry-run", - action="store_true", - help="Show borg command without executing it", - ) - common.add_argument( - "--automation", - action="store_true", - help="Output machine-readable JSON", - ) - common.add_argument( - "--quiet", - action="store_true", - help="Suppress normal human-readable plan output", - ) - common.add_argument( - "--verbose", - action="store_true", - help="Enable verbose logging", +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="dockervault") + + parser.add_argument("--repo", help="Borg repository path") + parser.add_argument("--run-borg", action="store_true", help="Run borg create after planning") + parser.add_argument("--dry-run", action="store_true", help="Print actions without executing borg") + parser.add_argument("--automation", action="store_true", help="Output machine-readable JSON") + parser.add_argument("--quiet", action="store_true", help="Reduce non-essential output") + parser.add_argument("--verbose", action="store_true", help="Enable debug logging") + parser.add_argument("--version", action="version", version=__version__) + + subparsers = parser.add_subparsers(dest="command", required=True) + + plan_parser = subparsers.add_parser("plan", help="Plan backup for a single compose file") + plan_parser.add_argument("path", help="Path to docker-compose.yml / compose.yml file") + + scan_parser = subparsers.add_parser("scan", help="Scan a directory tree for compose files") + scan_parser.add_argument("path", help="Root directory to scan") + scan_parser.add_argument( + "--max-depth", + type=int, + default=None, + help="Maximum directory depth to scan", ) - parser = argparse.ArgumentParser( - prog="dockervault", - description="Intelligent Docker backup discovery with Borg integration", - parents=[common], - ) - - subparsers = parser.add_subparsers(dest="command") - - plan_parser = subparsers.add_parser( - "plan", - help="Analyze a single compose file", - parents=[common], - ) - plan_parser.add_argument("compose", help="Path to docker-compose.yml or compose.yaml") - - scan_parser = subparsers.add_parser( - "scan", - help="Scan a directory for compose files", - parents=[common], - ) - scan_parser.add_argument("root", help="Root directory to scan") - - parser.add_argument( - "--version", - action="version", - version=f"%(prog)s {__version__}", - ) - - parser.add_argument( - "legacy_target", - nargs="?", - help=argparse.SUPPRESS, - ) - - args = parser.parse_args(argv) - - if args.command is None and args.legacy_target: - args.command = "plan" - args.compose = args.legacy_target - - if args.command is None: - parser.error("You must provide either a compose file or use the 'scan' subcommand.") - - return args + return parser -def main(argv: list[str] | None = None) -> int: - args = parse_args(argv) +def main() -> int: + parser = build_parser() + args = parser.parse_args() + setup_logging(args.verbose) - try: - if args.command == "scan": - root_path = Path(args.root) + if args.command == "plan": + compose_path = Path(args.path).resolve() + raw_entries = classify_compose(compose_path) + normalized = normalize_entries(raw_entries) - compose_files, raw_entries = scan_projects(root_path) + for entry in normalized: + entry["compose_file"] = str(compose_path) - if args.verbose: - LOGGER.debug("Compose files found: %s", len(compose_files)) - - include_entries, _, _ = classify_entries(raw_entries) - include_entries = dedupe_entries(include_entries) - include_paths = extract_paths(include_entries) - missing_include = find_missing_entries(include_entries) - - if args.automation: - print_automation_output( - raw_entries, - root_path=root_path, - repo=args.repo, - compose_files=compose_files, - ) - elif not args.quiet: - print_human_plan( - raw_entries, - label="Scan root", - root_path=root_path, - compose_files=compose_files, - ) - - else: - compose_path = Path(args.compose) - - if not compose_path.exists(): - LOGGER.error("Compose file not found: %s", compose_path) - return 1 - - if not compose_path.is_file(): - LOGGER.error("Compose path is not a file: %s", compose_path) - return 1 - - raw_entries = classify_compose(compose_path) - - if args.verbose: - LOGGER.debug("Raw plan type: %s", type(raw_entries)) - LOGGER.debug("Raw plan repr: %r", raw_entries) - - include_entries, _, _ = classify_entries(raw_entries) - include_entries = dedupe_entries(include_entries) - include_paths = extract_paths(include_entries) - missing_include = find_missing_entries(include_entries) - - if args.automation: - print_automation_output( - raw_entries, - root_path=compose_path, - repo=args.repo, - ) - elif not args.quiet: - print_human_plan( - raw_entries, - label="Compose file", - root_path=compose_path, - ) - - if not include_paths: - LOGGER.warning("No include paths found. Nothing to back up.") - return 0 - - if args.run_borg: - if not args.repo: - LOGGER.error("--repo is required when using --run-borg") - return 2 - - if missing_include: - LOGGER.error("Refusing to run borg: missing critical paths detected") - return 4 - - return run_borg( + if args.automation: + print_automation_output( + normalized, + compose_path, repo=args.repo, - include_paths=include_paths, - dry_run=args.dry_run, + compose_files=[compose_path], + ) + else: + print_human_plan( + normalized, + "Compose file", + compose_path, + compose_files=[compose_path], ) - if args.dry_run: - if not args.repo: - LOGGER.error("--repo is required when using --dry-run") - return 2 - - if not ensure_borg_available(): - return 3 - - archive_name = default_archive_name() - cmd = build_borg_command(args.repo, archive_name, include_paths) - print(" ".join(shlex.quote(part) for part in cmd)) - return 0 + if args.run_borg: + include_entries, _, _ = classify_entries(normalized) + include_paths = extract_paths(include_entries) + cmd = build_borg_command(args.repo or "", default_archive_name(), include_paths) + return run_borg_command(cmd, dry_run=args.dry_run, quiet=args.quiet) return 0 - except FileNotFoundError as exc: - LOGGER.error("%s", exc) - return 1 - except NotADirectoryError as exc: - LOGGER.error("%s", exc) - return 1 - except KeyboardInterrupt: - LOGGER.error("Interrupted by user") - return 130 - except Exception: - LOGGER.exception("Unexpected error") - return 99 + if args.command == "scan": + scan_root = Path(args.path).resolve() + compose_files, all_entries = scan_projects( + scan_root, + max_depth=args.max_depth, + ) + + if args.automation: + print_automation_output( + all_entries, + scan_root, + repo=args.repo, + compose_files=compose_files, + ) + else: + print_human_plan( + all_entries, + "Scan root", + scan_root, + compose_files=compose_files, + ) + + if args.run_borg: + include_entries, _, _ = classify_entries(all_entries) + include_paths = extract_paths(include_entries) + cmd = build_borg_command(args.repo or "", default_archive_name(), include_paths) + return run_borg_command(cmd, dry_run=args.dry_run, quiet=args.quiet) + + return 0 + + parser.error("No command specified") + return 2 if __name__ == "__main__": diff --git a/dockervault/discovery.py b/dockervault/discovery.py index 78f9db8..2531bc6 100644 --- a/dockervault/discovery.py +++ b/dockervault/discovery.py @@ -4,6 +4,7 @@ import os from pathlib import Path from typing import Iterable + DEFAULT_SCAN_EXCLUDES = { ".git", ".hg", @@ -21,6 +22,7 @@ DEFAULT_SCAN_EXCLUDES = { ".vscode", } + COMPOSE_FILENAMES = { "docker-compose.yml", "docker-compose.yaml", @@ -32,8 +34,10 @@ COMPOSE_FILENAMES = { def find_compose_files( root: Path | str, excludes: Iterable[str] | None = None, + max_depth: int | None = None, ) -> list[Path]: root_path = Path(root).resolve() + root_depth = len(root_path.parts) exclude_set = set(DEFAULT_SCAN_EXCLUDES) if excludes: @@ -42,13 +46,17 @@ def find_compose_files( found: set[Path] = set() for current_root, dirnames, filenames in os.walk(root_path, topdown=True): + current_path = Path(current_root) + current_depth = len(current_path.parts) - root_depth + + if max_depth is not None and current_depth >= max_depth: + dirnames[:] = [] + dirnames[:] = sorted( d for d in dirnames if d not in exclude_set ) - current_path = Path(current_root) - for filename in filenames: if filename in COMPOSE_FILENAMES: found.add((current_path / filename).resolve())