feat: add missing path validation and fail-on-missing

This commit is contained in:
Eddie Nielsen 2026-03-22 13:42:57 +00:00
parent dc9a6dc322
commit b81b29e674

View file

@ -2,106 +2,332 @@ from __future__ import annotations
import argparse
import json
import socket
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
from dockervault.scanner import scan_projects
from dockervault.classifier import classify_compose
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="dockervault",
description="DockerVault CLI"
)
subparsers = parser.add_subparsers(dest="command", required=True)
def check_path_exists(path: str) -> bool:
return Path(path).exists()
scan_parser = subparsers.add_parser(
"scan",
help="Scan a folder for Docker Compose projects"
)
scan_parser.add_argument(
"path",
nargs="?",
default=".",
help="Base path to scan (default: current directory)"
)
scan_parser.add_argument(
"--json",
action="store_true",
help="Output scan results as JSON"
def build_mkdir_suggestion(paths: list[str]) -> str:
unique_paths = sorted(set(paths))
lines = ["mkdir -p \\"]
for index, path in enumerate(unique_paths):
suffix = " \\" if index < len(unique_paths) - 1 else ""
lines.append(f" {path}{suffix}")
return "\n".join(lines)
def render_borg_archive(template: str, project: str, compose_path: Path) -> str:
now = datetime.now()
hostname = socket.gethostname()
compose_stem = compose_path.stem
return template.format(
hostname=hostname,
project=project,
compose_stem=compose_stem,
now=now,
)
return parser
def build_borg_command(
repo: str,
archive_name: str,
include_paths: list[str],
) -> str:
lines = [
"borg create --stats --progress \\",
f" {repo}::{archive_name} \\",
]
def render_text(projects: list) -> str:
if not projects:
return "No Docker Compose projects found."
lines: list[str] = []
lines.append(f"Found {len(projects)} project(s):")
for project in projects:
lines.append("")
lines.append(f"- {project.name}")
lines.append(f" Path: {project.root_path}")
lines.append(f" Compose files: {', '.join(project.compose_files) or '-'}")
lines.append(f" Services: {', '.join(project.service_names) or '-'}")
lines.append(f" Named volumes: {', '.join(project.named_volumes) or '-'}")
if project.backup_paths:
lines.append(" Backup candidates:")
for backup_path in project.backup_paths:
lines.append(f" - {backup_path}")
else:
lines.append(" Backup candidates: -")
for service in project.services:
lines.append(f" Service: {service.name}")
lines.append(f" Image: {service.image or '-'}")
lines.append(f" Restart: {service.restart or '-'}")
lines.append(f" Env files: {', '.join(service.env_files) or '-'}")
if service.mounts:
lines.append(" Mounts:")
for mount in service.mounts:
ro = " (ro)" if mount.read_only else ""
lines.append(
f" - {mount.kind}: {mount.source or '[anonymous]'} -> {mount.target}{ro}"
)
else:
lines.append(" Mounts: -")
for index, path in enumerate(include_paths):
suffix = " \\" if index < len(include_paths) - 1 else ""
lines.append(f" {path}{suffix}")
return "\n".join(lines)
def main() -> int:
parser = build_parser()
def plan_to_json_dict(
compose_file: Path,
project_root: Path,
plan: dict[str, Any],
borg_repo: str | None = None,
borg_archive: str | None = None,
borg_command: str | None = None,
missing_include: list[dict[str, Any]] | None = None,
missing_review: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
return {
"compose_file": str(compose_file.resolve()),
"project_root": str(project_root.resolve()),
"include": plan.get("include", []),
"review": plan.get("review", []),
"skip": plan.get("skip", []),
"missing": {
"include": missing_include or [],
"review": missing_review or [],
},
"borg": {
"repo": borg_repo,
"archive": borg_archive,
"command": borg_command,
}
if borg_repo or borg_archive or borg_command
else None,
}
def print_human_summary(compose_file: Path, project_root: Path, plan: dict[str, Any]) -> None:
print("DockerVault Backup Plan")
print("=======================")
print(f"Compose file: {compose_file.resolve()}")
print(f"Project root: {project_root.resolve()}")
print()
print("INCLUDE PATHS:")
include = plan.get("include", [])
if include:
for item in include:
exists = check_path_exists(item["source"])
status = "✔ exists" if exists else "❌ missing"
print(
f" - {item['source']} "
f"[{item['priority']}] {status} "
f"service={item['service']} target={item['target']}"
)
else:
print(" - (none)")
print()
print("REVIEW PATHS:")
review = plan.get("review", [])
if review:
for item in review:
exists = check_path_exists(item["source"])
status = "✔ exists" if exists else "❌ missing"
print(
f" - {item['source']} "
f"[{item['priority']}] {status} "
f"service={item['service']} target={item['target']}"
)
else:
print(" - (none)")
print()
print("SKIP PATHS:")
skip = plan.get("skip", [])
if skip:
for item in skip:
exists = check_path_exists(item["source"])
status = "✔ exists" if exists else "❌ missing"
print(
f" - {item['source']} "
f"[{item['priority']}] {status} "
f"service={item['service']} target={item['target']}"
)
else:
print(" - (none)")
def find_missing_paths(plan: dict[str, Any]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
missing_include = [
item for item in plan.get("include", [])
if not check_path_exists(item["source"])
]
missing_review = [
item for item in plan.get("review", [])
if not check_path_exists(item["source"])
]
return missing_include, missing_review
def print_missing_paths_report(
missing_include: list[dict[str, Any]],
missing_review: list[dict[str, Any]],
) -> None:
all_missing = missing_include + missing_review
if not all_missing:
return
print()
print("WARNING: Missing paths detected:")
for item in all_missing:
bucket = "include" if item in missing_include else "review"
print(f" - {item['source']} (service={item['service']}, bucket={bucket})")
mkdir_paths = [item["source"] for item in all_missing]
print()
print("Suggested fix for missing paths")
print("================================")
print(build_mkdir_suggestion(mkdir_paths))
def main() -> None:
parser = argparse.ArgumentParser(
description="DockerVault - intelligent Docker backup discovery"
)
parser.add_argument(
"compose_file",
nargs="?",
default="docker-compose.yml",
help="Path to docker-compose.yml",
)
parser.add_argument(
"--summary-only",
action="store_true",
help="Print human-readable summary only",
)
parser.add_argument(
"--json",
action="store_true",
help="Print backup plan as JSON",
)
parser.add_argument(
"--borg",
action="store_true",
help="Generate borg backup command output",
)
parser.add_argument(
"--borg-json",
action="store_true",
help="Print borg-related output as JSON",
)
parser.add_argument(
"--borg-repo",
default="/backup-repo",
help="Borg repository path or URI (default: /backup-repo)",
)
parser.add_argument(
"--borg-archive",
default="{hostname}-{now:%Y-%m-%d_%H-%M}",
help=(
"Archive naming template. Supported fields: "
"{hostname}, {project}, {compose_stem}, {now:...}"
),
)
parser.add_argument(
"--fail-on-missing",
action="store_true",
help="Exit with status 2 if include/review paths are missing",
)
args = parser.parse_args()
if args.command == "scan":
try:
projects = scan_projects(Path(args.path))
except (FileNotFoundError, NotADirectoryError) as exc:
print(f"Error: {exc}", file=sys.stderr)
return 2
except json.JSONDecodeError as exc:
print(f"Error: invalid JSON/YAML data: {exc}", file=sys.stderr)
return 2
except Exception as exc:
print(f"Error: {exc}", file=sys.stderr)
return 2
compose_file = Path(args.compose_file).resolve()
if not compose_file.exists():
raise SystemExit(f"Compose file not found: {compose_file}")
project_root = compose_file.parent
project_name = project_root.name or compose_file.stem
plan = classify_compose(compose_file)
missing_include, missing_review = find_missing_paths(plan)
all_missing = missing_include + missing_review
if args.json:
print(json.dumps([project.to_dict() for project in projects], indent=2))
else:
print(render_text(projects))
print(
json.dumps(
plan_to_json_dict(
compose_file,
project_root,
plan,
missing_include=missing_include,
missing_review=missing_review,
),
indent=2,
)
)
if args.fail_on_missing and all_missing:
sys.exit(2)
return
return 0
if args.borg or args.borg_json:
include_items = plan.get("include", [])
include_paths = [item["source"] for item in include_items]
parser.print_help()
return 1
try:
archive_name = render_borg_archive(
args.borg_archive,
project_name,
compose_file,
)
except KeyError as exc:
raise SystemExit(
f"Invalid borg archive template field: {exc}. "
"Allowed: hostname, project, compose_stem, now"
) from exc
borg_command = build_borg_command(
repo=args.borg_repo,
archive_name=archive_name,
include_paths=include_paths,
)
if args.borg_json:
print(
json.dumps(
plan_to_json_dict(
compose_file,
project_root,
plan,
borg_repo=args.borg_repo,
borg_archive=archive_name,
borg_command=borg_command,
missing_include=missing_include,
missing_review=missing_review,
),
indent=2,
)
)
if args.fail_on_missing and all_missing:
sys.exit(2)
return
print_human_summary(compose_file, project_root, plan)
print_missing_paths_report(missing_include, missing_review)
print()
print("Suggested borg create command")
print("=============================")
print(borg_command)
if args.fail_on_missing and all_missing:
print()
print("ERROR: Failing because include/review paths are missing.")
sys.exit(2)
return
print_human_summary(compose_file, project_root, plan)
print_missing_paths_report(missing_include, missing_review)
if args.fail_on_missing and all_missing:
print()
print("ERROR: Failing because include/review paths are missing.")
sys.exit(2)
if __name__ == "__main__":
raise SystemExit(main())
main()