# dockervault/dockervault/cli.py
from __future__ import annotations
import argparse
import json
import logging
import shlex
import shutil
import socket
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable
from . import __version__
from .classifier import classify_compose
from .discovery import find_compose_files
# Shared module-level logger; level/format are configured by setup_logging().
LOGGER = logging.getLogger("dockervault")
def setup_logging(verbose: bool = False) -> None:
    """Configure root logging: DEBUG when *verbose* is True, INFO otherwise."""
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s: %(message)s",
    )
def non_negative_int(value: str) -> int:
    """argparse ``type=`` validator: parse *value* as an integer >= 0.

    Raises:
        argparse.ArgumentTypeError: for non-integer input (previously a raw
            ValueError bubbled up from ``int()``) or for negative values, so
            argparse reports one consistent, readable error message.
    """
    try:
        ivalue = int(value)
    except ValueError as err:
        raise argparse.ArgumentTypeError(f"invalid integer value: {value!r}") from err
    if ivalue < 0:
        raise argparse.ArgumentTypeError("must be 0 or greater")
    return ivalue
def safe_get(obj: Any, key: str, default: Any = None) -> Any:
    """Uniform lookup helper: mapping item for dicts, attribute otherwise.

    Returns *default* when *obj* is None or the key/attribute is missing.
    """
    if isinstance(obj, dict):
        try:
            return obj[key]
        except KeyError:
            return default
    return default if obj is None else getattr(obj, key, default)
def normalize_entries(entries: Any) -> list[dict[str, Any]]:
    """Coerce raw classifier output into a list of uniform entry dicts.

    Accepts None, a single entry, or a list/tuple of entries; each entry may
    be a dict or an arbitrary object (read via :func:`safe_get`). Key aliases
    (``source``/``host_path`` for ``path``, ``classification`` for
    ``priority``, ``container_path`` for ``target``) are now honoured
    identically for dicts and objects — previously ``host_path`` was only
    consulted for dict entries, and object entries used "first present"
    instead of "first truthy" semantics.
    """
    if not entries:
        return []
    if not isinstance(entries, (list, tuple)):
        entries = [entries]

    def first(entry: Any, *keys: str) -> Any:
        # First truthy value among the aliased keys, else None.
        for key in keys:
            value = safe_get(entry, key)
            if value:
                return value
        return None

    normalized: list[dict[str, Any]] = []
    for entry in entries:
        normalized.append(
            {
                "path": first(entry, "path", "source", "host_path"),
                "priority": first(entry, "priority", "classification"),
                "service": safe_get(entry, "service"),
                "target": first(entry, "target", "container_path"),
                "source_type": safe_get(entry, "source_type"),
                "reason": safe_get(entry, "reason"),
                # "exists" is read directly (no alias) so an explicit False
                # is preserved rather than coalesced away.
                "exists": safe_get(entry, "exists"),
                "compose_file": safe_get(entry, "compose_file"),
            }
        )
    return normalized
def classify_entries(
    raw_entries: Any,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Split normalized entries into (include, review, skip) buckets.

    A 'critical' priority goes to include; 'optional', 'skip', or 'ignored'
    goes to skip; anything else (including a missing priority) lands in
    review for a human to decide.
    """
    buckets: dict[str, list[dict[str, Any]]] = {
        "include": [],
        "review": [],
        "skip": [],
    }
    for entry in normalize_entries(raw_entries):
        priority = str(entry.get("priority") or "").strip().lower()
        if priority == "critical":
            bucket = "include"
        elif priority in ("optional", "skip", "ignored"):
            bucket = "skip"
        else:
            bucket = "review"
        buckets[bucket].append(entry)
    return buckets["include"], buckets["review"], buckets["skip"]
def dedupe_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop path-less entries and keep only the first entry for each path.

    Order is preserved; paths are compared by their string form.
    """
    seen_paths: set[str] = set()
    unique: list[dict[str, Any]] = []
    for entry in entries:
        raw_path = entry.get("path")
        if not raw_path:
            continue  # entries without a usable path are dropped entirely
        key = str(raw_path)
        if key not in seen_paths:
            seen_paths.add(key)
            unique.append(entry)
    return unique
def extract_paths(entries: Iterable[dict[str, Any]]) -> list[str]:
    """Unique, order-preserving list of entry paths as strings."""
    paths: list[str] = []
    for entry in dedupe_entries(entries):
        value = entry.get("path")
        if value:
            paths.append(str(value))
    return paths
def find_missing_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Entries explicitly flagged as absent on disk (``exists`` is False).

    Entries with ``exists`` unset (None) are NOT considered missing.
    """
    missing: list[dict[str, Any]] = []
    for entry in entries:
        if entry.get("exists") is False:
            missing.append(entry)
    return missing
def entry_to_line(entry: dict[str, Any]) -> str:
    """Render one entry as a human-readable bullet line for plan output.

    Falsy path/priority/service/target fields fall back to placeholder
    text; optional details are appended in parentheses when present.
    """
    details: list[str] = []
    if entry.get("compose_file"):
        details.append(f"compose={entry.get('compose_file')}")
    if entry.get("source_type"):
        details.append(f"type={entry['source_type']}")
    if entry.get("exists") is not None:
        details.append(f"exists={entry.get('exists')}")
    if entry.get("reason"):
        details.append(f"reason={entry['reason']}")
    line = " - {path} [{priority}] service={service} target={target}".format(
        path=entry.get("path") or "(unknown)",
        priority=entry.get("priority") or "unknown",
        service=entry.get("service") or "unknown",
        target=entry.get("target") or "unknown",
    )
    if details:
        line += f" ({', '.join(details)})"
    return line
def default_archive_name() -> str:
    """Default Borg archive name: '<hostname>-YYYY-MM-DD_HH-MM' (local time)."""
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    return "-".join([socket.gethostname(), timestamp])
def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -> list[str]:
    """Assemble the `borg create` argv for the given repo/archive/paths.

    Returns an empty list when either the repo or the include paths are
    missing, signalling "nothing to run" to the caller.
    """
    if not (repo and include_paths):
        return []
    return [
        "borg",
        "create",
        "--stats",
        "--progress",
        f"{repo}::{archive_name}",
        *include_paths,
    ]
def ensure_borg_available() -> bool:
    """Return True when a `borg` executable is on PATH; log an error otherwise."""
    if shutil.which("borg") is not None:
        return True
    LOGGER.error("Borg binary not found in PATH")
    return False
def scan_projects(
    scan_root: Path,
    max_depth: int | None = None,
    excludes: list[str] | None = None,
) -> tuple[list[Path], list[dict[str, Any]]]:
    """Discover compose files under *scan_root* and classify their volumes.

    Returns the compose files found and a flat list of normalized entries,
    each tagged with the absolute path of the compose file it came from.
    """
    compose_files = find_compose_files(
        scan_root,
        excludes=excludes,
        max_depth=max_depth,
    )
    all_entries: list[dict[str, Any]] = []
    for compose_file in compose_files:
        entries = normalize_entries(classify_compose(compose_file))
        resolved = str(compose_file.resolve())
        for entry in entries:
            entry["compose_file"] = resolved
        all_entries.extend(entries)
    return compose_files, all_entries
def print_human_plan(
    raw_entries: Any,
    label: str,
    root_path: Path,
    compose_files: list[Path] | None = None,
) -> None:
    """Print the human-readable backup plan: include / review / skip sections.

    A warning block is inserted after INCLUDE PATHS when any critical entry
    is flagged as missing on disk.
    """
    include_entries, review_entries, skip_entries = (
        dedupe_entries(group) for group in classify_entries(raw_entries)
    )
    missing_include = find_missing_entries(include_entries)

    def section(title: str, entries: list[dict[str, Any]]) -> None:
        # Each section lists its entries, or an explicit "(none)" placeholder.
        print(title)
        if entries:
            for entry in entries:
                print(entry_to_line(entry))
        else:
            print(" - (none)")
        print()

    print()
    print("DockerVault Backup Plan")
    print("=======================")
    print(f"{label}: {root_path.resolve()}")
    if compose_files is not None:
        print(f"Compose files found: {len(compose_files)}")
    print()
    section("INCLUDE PATHS:", include_entries)
    if missing_include:
        print("WARNING: Missing critical paths detected")
        for entry in missing_include:
            print(f" - {entry.get('path')} (service={entry.get('service')})")
        print()
    section("REVIEW PATHS:", review_entries)
    section("SKIP PATHS:", skip_entries)
def print_automation_output(
    raw_entries: Any,
    root_path: Path,
    repo: str | None = None,
    compose_files: list[Path] | None = None,
) -> None:
    """Emit the machine-readable JSON plan on stdout.

    When *repo* is given, the payload also carries the repo, a default
    archive name, and the full Borg command for automation to execute.
    """
    include_entries, review_entries, skip_entries = (
        dedupe_entries(group) for group in classify_entries(raw_entries)
    )
    include_paths = extract_paths(include_entries)
    payload: dict[str, Any] = {
        "root": str(root_path.resolve()),
        "include_paths": include_paths,
        "review_paths": extract_paths(review_entries),
        "skip_paths": extract_paths(skip_entries),
        "missing_critical_paths": [
            str(entry["path"])
            for entry in find_missing_entries(include_entries)
            if entry.get("path")
        ],
    }
    if compose_files is not None:
        payload["compose_files"] = [str(path.resolve()) for path in compose_files]
    if repo:
        archive_name = default_archive_name()
        payload.update(
            repo=repo,
            archive_name=archive_name,
            borg_command=build_borg_command(repo, archive_name, include_paths),
        )
    print(json.dumps(payload, indent=2))
def run_borg_command(cmd: list[str], dry_run: bool = False, quiet: bool = False) -> int:
    """Run (or, with *dry_run*, merely display) a Borg command.

    Returns a process-style exit code: 0 on success/dry-run, 1 when there is
    no command or the `borg` binary is unavailable, otherwise Borg's own
    return code.
    """
    if not cmd:
        LOGGER.error("No Borg command to run")
        return 1
    rendered = " ".join(shlex.quote(part) for part in cmd)
    if dry_run:
        if not quiet:
            print("Dry run - Borg command:")
            print(rendered)
        return 0
    if not ensure_borg_available():
        return 1
    if not quiet:
        print("Running Borg command:")
        print(rendered)
    return subprocess.run(cmd, check=False).returncode
def build_parser() -> argparse.ArgumentParser:
    """Construct the dockervault argument parser (plan / scan subcommands).

    Global flags apply to both subcommands; `scan` additionally takes
    --max-depth and repeatable --exclude options.
    """
    parser = argparse.ArgumentParser(prog="dockervault")
    parser.add_argument("--repo")
    # Boolean switches share the same shape; keep declaration order for --help.
    for flag in ("--run-borg", "--dry-run", "--automation", "--quiet", "--verbose"):
        parser.add_argument(flag, action="store_true")
    parser.add_argument("--version", action="version", version=__version__)

    subparsers = parser.add_subparsers(dest="command", required=True)
    subparsers.add_parser("plan").add_argument("path")

    scan_parser = subparsers.add_parser("scan")
    scan_parser.add_argument("path")
    scan_parser.add_argument(
        "--max-depth",
        type=non_negative_int,
        default=None,
        help="Maximum directory depth to scan",
    )
    scan_parser.add_argument(
        "--exclude",
        action="append",
        default=[],
        help="Additional directory name to exclude (can be used multiple times)",
    )
    return parser
def main() -> int:
    """CLI entry point: parse args, print the plan, optionally run Borg.

    The parser defines --automation, --repo, --run-borg, --dry-run and
    --quiet, but the original implementation silently ignored all of them
    (and never called print_automation_output / run_borg_command). They are
    now wired up; with no flags the behavior is unchanged.

    Returns a process exit code (0 on success).
    """
    parser = build_parser()
    args = parser.parse_args()
    setup_logging(args.verbose)

    if args.command == "scan":
        root = Path(args.path)
        compose_files, entries = scan_projects(
            root,
            max_depth=args.max_depth,
            excludes=args.exclude,
        )
        label = "Scan root"
    elif args.command == "plan":
        root = Path(args.path)
        compose_files = [root]
        entries = normalize_entries(classify_compose(root))
        label = "Compose file"
    else:  # unreachable: subparsers are required=True
        return 0

    if args.automation:
        print_automation_output(entries, root, repo=args.repo, compose_files=compose_files)
    else:
        print_human_plan(entries, label, root, compose_files)

    if args.run_borg:
        if not args.repo:
            LOGGER.error("--run-borg requires --repo")
            return 1
        include_entries, _, _ = classify_entries(entries)
        cmd = build_borg_command(
            args.repo,
            default_archive_name(),
            extract_paths(include_entries),
        )
        return run_borg_command(cmd, dry_run=args.dry_run, quiet=args.quiet)
    return 0
# Script entry point: exit with main()'s return code when run directly.
if __name__ == "__main__":
    raise SystemExit(main())