feat(scan): add max-depth support to discovery

This commit is contained in:
Eddie Nielsen 2026-03-24 11:47:33 +00:00
parent 5af5aab8fe
commit 197474ffd9
2 changed files with 119 additions and 218 deletions

View file

@@ -170,8 +170,11 @@ def ensure_borg_available() -> bool:
return True return True
def scan_projects(scan_root: Path) -> tuple[list[Path], list[dict[str, Any]]]: def scan_projects(
compose_files = find_compose_files(scan_root) scan_root: Path,
max_depth: int | None = None,
) -> tuple[list[Path], list[dict[str, Any]]]:
compose_files = find_compose_files(scan_root, max_depth=max_depth)
all_entries: list[dict[str, Any]] = [] all_entries: list[dict[str, Any]] = []
for compose_file in compose_files: for compose_file in compose_files:
@@ -258,252 +261,142 @@ def print_automation_output(
payload: dict[str, Any] = { payload: dict[str, Any] = {
"root": str(root_path.resolve()), "root": str(root_path.resolve()),
"include_paths": include_paths, "include_paths": include_paths,
"review_paths": [str(e["path"]) for e in review_entries if e.get("path")], "review_paths": extract_paths(review_entries),
"skip_paths": [str(e["path"]) for e in skip_entries if e.get("path")], "skip_paths": extract_paths(skip_entries),
"missing_critical_paths": [str(e["path"]) for e in missing_include if e.get("path")], "missing_critical_paths": [str(entry["path"]) for entry in missing_include if entry.get("path")],
} }
if compose_files is not None: if compose_files is not None:
payload["compose_files"] = [str(p.resolve()) for p in compose_files] payload["compose_files"] = [str(path.resolve()) for path in compose_files]
if repo: if repo:
archive_name = default_archive_name() archive_name = default_archive_name()
payload["borg_repo"] = repo payload["repo"] = repo
payload["suggested_archive_name"] = archive_name payload["archive_name"] = archive_name
payload["borg_command"] = build_borg_command( payload["borg_command"] = build_borg_command(repo, archive_name, include_paths)
repo=repo,
archive_name=archive_name,
include_paths=include_paths,
)
print(json.dumps(payload, indent=2)) print(json.dumps(payload, indent=2))
def run_borg(repo: str, include_paths: list[str], dry_run: bool = False) -> int: def run_borg_command(cmd: list[str], dry_run: bool = False, quiet: bool = False) -> int:
if not include_paths: if not cmd:
LOGGER.warning("No include paths found. Nothing to back up.") LOGGER.error("No Borg command to run")
return 1
if dry_run:
if not quiet:
print("Dry run - Borg command:")
print(" ".join(shlex.quote(part) for part in cmd))
return 0 return 0
if not ensure_borg_available(): if not ensure_borg_available():
return 3 return 1
archive_name = default_archive_name() if not quiet:
cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths) print("Running Borg command:")
if dry_run:
print(" ".join(shlex.quote(part) for part in cmd)) print(" ".join(shlex.quote(part) for part in cmd))
return 0
LOGGER.info("Borg repository: %s", repo) result = subprocess.run(cmd, check=False)
LOGGER.info("Archive name: %s", archive_name) return result.returncode
LOGGER.info("Running borg backup...")
result = subprocess.run(cmd, text=True, capture_output=True)
if result.stdout:
print(result.stdout, end="")
if result.returncode != 0:
stderr = (result.stderr or "").strip()
if stderr:
LOGGER.error("Borg failed: %s", stderr)
else:
LOGGER.error("Borg exited with status %s", result.returncode)
return result.returncode
LOGGER.info("Borg backup completed successfully.")
return 0
def parse_args(argv: list[str] | None = None) -> argparse.Namespace: def build_parser() -> argparse.ArgumentParser:
common = argparse.ArgumentParser(add_help=False) parser = argparse.ArgumentParser(prog="dockervault")
common.add_argument("--repo", help="Borg repository path")
common.add_argument( parser.add_argument("--repo", help="Borg repository path")
"--run-borg", parser.add_argument("--run-borg", action="store_true", help="Run borg create after planning")
action="store_true", parser.add_argument("--dry-run", action="store_true", help="Print actions without executing borg")
help="Run borg create after building the backup plan", parser.add_argument("--automation", action="store_true", help="Output machine-readable JSON")
) parser.add_argument("--quiet", action="store_true", help="Reduce non-essential output")
common.add_argument( parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
"--dry-run", parser.add_argument("--version", action="version", version=__version__)
action="store_true",
help="Show borg command without executing it", subparsers = parser.add_subparsers(dest="command", required=True)
)
common.add_argument( plan_parser = subparsers.add_parser("plan", help="Plan backup for a single compose file")
"--automation", plan_parser.add_argument("path", help="Path to docker-compose.yml / compose.yml file")
action="store_true",
help="Output machine-readable JSON", scan_parser = subparsers.add_parser("scan", help="Scan a directory tree for compose files")
) scan_parser.add_argument("path", help="Root directory to scan")
common.add_argument( scan_parser.add_argument(
"--quiet", "--max-depth",
action="store_true", type=int,
help="Suppress normal human-readable plan output", default=None,
) help="Maximum directory depth to scan",
common.add_argument(
"--verbose",
action="store_true",
help="Enable verbose logging",
) )
parser = argparse.ArgumentParser( return parser
prog="dockervault",
description="Intelligent Docker backup discovery with Borg integration",
parents=[common],
)
subparsers = parser.add_subparsers(dest="command")
plan_parser = subparsers.add_parser(
"plan",
help="Analyze a single compose file",
parents=[common],
)
plan_parser.add_argument("compose", help="Path to docker-compose.yml or compose.yaml")
scan_parser = subparsers.add_parser(
"scan",
help="Scan a directory for compose files",
parents=[common],
)
scan_parser.add_argument("root", help="Root directory to scan")
parser.add_argument(
"--version",
action="version",
version=f"%(prog)s {__version__}",
)
parser.add_argument(
"legacy_target",
nargs="?",
help=argparse.SUPPRESS,
)
args = parser.parse_args(argv)
if args.command is None and args.legacy_target:
args.command = "plan"
args.compose = args.legacy_target
if args.command is None:
parser.error("You must provide either a compose file or use the 'scan' subcommand.")
return args
def main(argv: list[str] | None = None) -> int: def main() -> int:
args = parse_args(argv) parser = build_parser()
args = parser.parse_args()
setup_logging(args.verbose) setup_logging(args.verbose)
try: if args.command == "plan":
if args.command == "scan": compose_path = Path(args.path).resolve()
root_path = Path(args.root) raw_entries = classify_compose(compose_path)
normalized = normalize_entries(raw_entries)
compose_files, raw_entries = scan_projects(root_path) for entry in normalized:
entry["compose_file"] = str(compose_path)
if args.verbose: if args.automation:
LOGGER.debug("Compose files found: %s", len(compose_files)) print_automation_output(
normalized,
include_entries, _, _ = classify_entries(raw_entries) compose_path,
include_entries = dedupe_entries(include_entries)
include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries)
if args.automation:
print_automation_output(
raw_entries,
root_path=root_path,
repo=args.repo,
compose_files=compose_files,
)
elif not args.quiet:
print_human_plan(
raw_entries,
label="Scan root",
root_path=root_path,
compose_files=compose_files,
)
else:
compose_path = Path(args.compose)
if not compose_path.exists():
LOGGER.error("Compose file not found: %s", compose_path)
return 1
if not compose_path.is_file():
LOGGER.error("Compose path is not a file: %s", compose_path)
return 1
raw_entries = classify_compose(compose_path)
if args.verbose:
LOGGER.debug("Raw plan type: %s", type(raw_entries))
LOGGER.debug("Raw plan repr: %r", raw_entries)
include_entries, _, _ = classify_entries(raw_entries)
include_entries = dedupe_entries(include_entries)
include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries)
if args.automation:
print_automation_output(
raw_entries,
root_path=compose_path,
repo=args.repo,
)
elif not args.quiet:
print_human_plan(
raw_entries,
label="Compose file",
root_path=compose_path,
)
if not include_paths:
LOGGER.warning("No include paths found. Nothing to back up.")
return 0
if args.run_borg:
if not args.repo:
LOGGER.error("--repo is required when using --run-borg")
return 2
if missing_include:
LOGGER.error("Refusing to run borg: missing critical paths detected")
return 4
return run_borg(
repo=args.repo, repo=args.repo,
include_paths=include_paths, compose_files=[compose_path],
dry_run=args.dry_run, )
else:
print_human_plan(
normalized,
"Compose file",
compose_path,
compose_files=[compose_path],
) )
if args.dry_run: if args.run_borg:
if not args.repo: include_entries, _, _ = classify_entries(normalized)
LOGGER.error("--repo is required when using --dry-run") include_paths = extract_paths(include_entries)
return 2 cmd = build_borg_command(args.repo or "", default_archive_name(), include_paths)
return run_borg_command(cmd, dry_run=args.dry_run, quiet=args.quiet)
if not ensure_borg_available():
return 3
archive_name = default_archive_name()
cmd = build_borg_command(args.repo, archive_name, include_paths)
print(" ".join(shlex.quote(part) for part in cmd))
return 0
return 0 return 0
except FileNotFoundError as exc: if args.command == "scan":
LOGGER.error("%s", exc) scan_root = Path(args.path).resolve()
return 1 compose_files, all_entries = scan_projects(
except NotADirectoryError as exc: scan_root,
LOGGER.error("%s", exc) max_depth=args.max_depth,
return 1 )
except KeyboardInterrupt:
LOGGER.error("Interrupted by user") if args.automation:
return 130 print_automation_output(
except Exception: all_entries,
LOGGER.exception("Unexpected error") scan_root,
return 99 repo=args.repo,
compose_files=compose_files,
)
else:
print_human_plan(
all_entries,
"Scan root",
scan_root,
compose_files=compose_files,
)
if args.run_borg:
include_entries, _, _ = classify_entries(all_entries)
include_paths = extract_paths(include_entries)
cmd = build_borg_command(args.repo or "", default_archive_name(), include_paths)
return run_borg_command(cmd, dry_run=args.dry_run, quiet=args.quiet)
return 0
parser.error("No command specified")
return 2
if __name__ == "__main__": if __name__ == "__main__":

View file

@@ -4,6 +4,7 @@ import os
from pathlib import Path from pathlib import Path
from typing import Iterable from typing import Iterable
DEFAULT_SCAN_EXCLUDES = { DEFAULT_SCAN_EXCLUDES = {
".git", ".git",
".hg", ".hg",
@@ -21,6 +22,7 @@ DEFAULT_SCAN_EXCLUDES = {
".vscode", ".vscode",
} }
COMPOSE_FILENAMES = { COMPOSE_FILENAMES = {
"docker-compose.yml", "docker-compose.yml",
"docker-compose.yaml", "docker-compose.yaml",
@@ -32,8 +34,10 @@ COMPOSE_FILENAMES = {
def find_compose_files( def find_compose_files(
root: Path | str, root: Path | str,
excludes: Iterable[str] | None = None, excludes: Iterable[str] | None = None,
max_depth: int | None = None,
) -> list[Path]: ) -> list[Path]:
root_path = Path(root).resolve() root_path = Path(root).resolve()
root_depth = len(root_path.parts)
exclude_set = set(DEFAULT_SCAN_EXCLUDES) exclude_set = set(DEFAULT_SCAN_EXCLUDES)
if excludes: if excludes:
@@ -42,13 +46,17 @@ def find_compose_files(
found: set[Path] = set() found: set[Path] = set()
for current_root, dirnames, filenames in os.walk(root_path, topdown=True): for current_root, dirnames, filenames in os.walk(root_path, topdown=True):
current_path = Path(current_root)
current_depth = len(current_path.parts) - root_depth
if max_depth is not None and current_depth >= max_depth:
dirnames[:] = []
dirnames[:] = sorted( dirnames[:] = sorted(
d for d in dirnames d for d in dirnames
if d not in exclude_set if d not in exclude_set
) )
current_path = Path(current_root)
for filename in filenames: for filename in filenames:
if filename in COMPOSE_FILENAMES: if filename in COMPOSE_FILENAMES:
found.add((current_path / filename).resolve()) found.add((current_path / filename).resolve())