# dockervault/dockervault/cli.py
#
# 370 lines
# 11 KiB
# Python

from __future__ import annotations
import argparse
import json
import logging
import shlex
import socket
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable
from . import __version__
from .classifier import classify_compose
# Module-level logger named "dockervault"; functions in this file report status and errors through it.
LOGGER = logging.getLogger("dockervault")
def setup_logging(verbose: bool = False) -> None:
    """Configure logging via ``logging.basicConfig``.

    Args:
        verbose: When true the threshold is DEBUG, otherwise INFO.
    """
    if verbose:
        threshold = logging.DEBUG
    else:
        threshold = logging.INFO
    logging.basicConfig(format="%(levelname)s: %(message)s", level=threshold)
def safe_get(obj: Any, key: str, default: Any = None) -> Any:
    """Fetch ``key`` from a dict (item lookup) or any other object (attribute lookup).

    Args:
        obj: A dict, an arbitrary object, or ``None``.
        key: Dictionary key or attribute name to read.
        default: Value returned when ``obj`` is ``None`` or the key/attribute
            is missing.

    Returns:
        The stored value, or ``default`` when it cannot be found.
    """
    if obj is None:
        return default
    return obj.get(key, default) if isinstance(obj, dict) else getattr(obj, key, default)
def _first_value(entry: Any, *keys: str) -> Any:
    """Return the first truthy value stored under ``keys`` on a dict or object.

    Mirrors ``a or b or c``: when no key yields a truthy value, the value
    found for the last key is returned (``None`` if it is missing).
    """
    value = None
    for key in keys:
        value = entry.get(key) if isinstance(entry, dict) else getattr(entry, key, None)
        if value:
            break
    return value


def normalize_entries(entries: Any) -> list[dict[str, Any]]:
    """Normalize classifier output into uniform dict entries.

    Supported inputs:
    - a list/tuple of objects exposing the fields as attributes
    - a list/tuple of dicts
    - a single object or dict (wrapped into a one-element list)

    Dicts and objects are resolved through the same lookup, so both honour the
    same alias chain (``path``/``source``/``host_path``,
    ``priority``/``classification``, ``target``/``container_path``) and both
    fall through to the next alias when an earlier one is present but empty.

    Args:
        entries: Raw classifier output; any falsy value yields ``[]``.

    Returns:
        One dict per entry with the keys ``path``, ``priority``, ``service``,
        ``target``, ``source_type``, ``reason`` and ``exists``. Fields that
        cannot be found are ``None``.
    """
    if not entries:
        return []
    if not isinstance(entries, (list, tuple)):
        entries = [entries]
    return [
        {
            "path": _first_value(entry, "path", "source", "host_path"),
            "priority": _first_value(entry, "priority", "classification"),
            "service": _first_value(entry, "service"),
            "target": _first_value(entry, "target", "container_path"),
            "source_type": _first_value(entry, "source_type"),
            "reason": _first_value(entry, "reason"),
            # Single-key lookup returns the stored value untouched, so an
            # explicit ``False`` is preserved.
            "exists": _first_value(entry, "exists"),
        }
        for entry in entries
    ]
def classify_entries(
    raw_plan: Any,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Split the normalized plan into include, review and skip sections.

    The ``priority`` of each normalized entry decides the section
    (compared case-insensitively, surrounding whitespace ignored):
    - ``critical`` -> include
    - ``optional``, ``skip``, ``ignored`` -> skip
    - anything else (including a missing priority) -> review

    Args:
        raw_plan: Classifier output, passed through ``normalize_entries``.

    Returns:
        A ``(include, review, skip)`` tuple of entry lists, each in input order.
    """
    skip_labels = {"optional", "skip", "ignored"}
    included: list[dict[str, Any]] = []
    needs_review: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for item in normalize_entries(raw_plan):
        label = (item.get("priority") or "").strip().lower()
        if label == "critical":
            bucket = included
        elif label in skip_labels:
            bucket = skipped
        else:
            bucket = needs_review
        bucket.append(item)
    return included, needs_review, skipped
def extract_paths(entries: Iterable[dict[str, Any]]) -> list[str]:
    """Collect the unique, non-empty ``path`` values of ``entries`` as strings.

    Args:
        entries: Normalized entry dicts.

    Returns:
        Stringified paths in first-seen order, without duplicates; entries
        with a missing or falsy ``path`` are ignored.
    """
    ordered_unique: dict[str, None] = {}
    for item in entries:
        raw = item.get("path")
        if raw:
            # dict keys keep insertion order, giving ordered de-duplication.
            ordered_unique.setdefault(str(raw), None)
    return list(ordered_unique)
def entry_to_line(entry: dict[str, Any]) -> str:
    """Render one normalized entry as a single bullet line for the plan.

    Missing path is shown as ``(unknown)``; missing priority, service and
    target are shown as ``unknown``. ``source_type``, ``exists`` (when not
    ``None``) and ``reason`` are appended in parentheses when available.

    Args:
        entry: Normalized entry dict.

    Returns:
        The formatted line.
    """
    details: list[str] = []
    source_type = entry.get("source_type")
    if source_type:
        details.append(f"type={source_type}")
    exists = entry.get("exists")
    if exists is not None:
        details.append(f"exists={exists}")
    reason = entry.get("reason")
    if reason:
        details.append(f"reason={reason}")

    path = entry.get("path") or "(unknown)"
    priority = entry.get("priority") or "unknown"
    service = entry.get("service") or "unknown"
    target = entry.get("target") or "unknown"

    if details:
        suffix = f" ({', '.join(details)})"
    else:
        suffix = ""
    return f" - {path} [{priority}] service={service} target={target}{suffix}"
def default_archive_name() -> str:
    """Build the default archive name ``<hostname>-<YYYY-MM-DD_HH-MM>``.

    Uses the local hostname and the current local time at minute resolution.
    """
    timestamp = f"{datetime.now():%Y-%m-%d_%H-%M}"
    return "-".join((socket.gethostname(), timestamp))
def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -> list[str]:
    """Assemble the ``borg create`` argument vector.

    Args:
        repo: Borg repository location.
        archive_name: Name of the archive to create inside ``repo``.
        include_paths: Paths to back up.

    Returns:
        The command as a list of arguments, or ``[]`` when ``repo`` or
        ``include_paths`` is empty.
    """
    if not (repo and include_paths):
        return []
    return [
        "borg",
        "create",
        "--stats",
        "--progress",
        f"{repo}::{archive_name}",
        *include_paths,
    ]
def print_human_plan(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None:
    """Print the backup plan in human-readable form to stdout.

    Prints a header with the resolved compose file and its parent directory,
    then the INCLUDE / REVIEW / SKIP sections. When ``repo`` is given, a
    suggested ``borg create`` command for the include paths follows.

    Args:
        raw_plan: Classifier output, split via ``classify_entries``.
        compose_path: Path to the compose file.
        repo: Optional Borg repository used for the suggested command.
    """
    included, needs_review, skipped = classify_entries(raw_plan)
    resolved = compose_path.resolve()

    print()
    print("DockerVault Backup Plan")
    print("=======================")
    print(f"Compose file: {resolved}")
    print(f"Project root: {resolved.parent}")
    print()

    sections = (
        ("INCLUDE PATHS:", included),
        ("REVIEW PATHS:", needs_review),
        ("SKIP PATHS:", skipped),
    )
    for heading, section_entries in sections:
        print(heading)
        # An empty section is rendered as a single placeholder bullet.
        for line in [entry_to_line(item) for item in section_entries] or [" - (none)"]:
            print(line)
        print()

    if not repo:
        return

    cmd = build_borg_command(
        repo=repo,
        archive_name=default_archive_name(),
        include_paths=extract_paths(included),
    )
    print("Suggested borg create command")
    print("============================")
    if cmd:
        # One shell-quoted argument per line, joined with line continuations.
        print(" \\\n ".join(shlex.quote(part) for part in cmd))
    else:
        print("(no command generated)")
    print()
def print_automation_output(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None:
    """Print the backup plan as indented JSON on stdout.

    The payload always holds ``compose_file``, ``project_root``,
    ``include_paths`` (de-duplicated), ``review_paths`` and ``skip_paths``.
    When ``repo`` is given it also holds ``borg_repo``,
    ``suggested_archive_name`` and ``borg_command``.

    Args:
        raw_plan: Classifier output, split via ``classify_entries``.
        compose_path: Path to the compose file.
        repo: Optional Borg repository.
    """
    included, needs_review, skipped = classify_entries(raw_plan)
    resolved = compose_path.resolve()
    include_paths = extract_paths(included)

    def _paths_of(section: list[dict[str, Any]]) -> list[str]:
        # Review/skip paths keep duplicates; only falsy paths are dropped.
        return [str(item["path"]) for item in section if item.get("path")]

    payload: dict[str, Any] = {
        "compose_file": str(resolved),
        "project_root": str(resolved.parent),
        "include_paths": include_paths,
        "review_paths": _paths_of(needs_review),
        "skip_paths": _paths_of(skipped),
    }
    if repo:
        archive_name = default_archive_name()
        payload.update(
            borg_repo=repo,
            suggested_archive_name=archive_name,
            borg_command=build_borg_command(
                repo=repo,
                archive_name=archive_name,
                include_paths=include_paths,
            ),
        )
    print(json.dumps(payload, indent=2))
def run_borg(repo: str, include_paths: list[str], dry_run: bool = False) -> int:
    """Run (or, in dry-run mode, only print) ``borg create`` for ``include_paths``.

    Args:
        repo: Borg repository location.
        include_paths: Paths to back up.
        dry_run: When true, print the shell-quoted command to stdout and do
            not execute it.

    Returns:
        ``0`` when there is nothing to back up, after a dry run, or when borg
        succeeds; ``2`` when no command can be built because ``repo`` is
        empty; otherwise borg's own exit status.
    """
    if not include_paths:
        LOGGER.warning("No include paths found. Nothing to back up.")
        return 0

    archive_name = default_archive_name()
    cmd = build_borg_command(repo=repo, archive_name=archive_name, include_paths=include_paths)
    if not cmd:
        # build_borg_command yields [] for an empty repo; passing that to
        # subprocess.run would crash instead of reporting the real problem.
        LOGGER.error("No borg repository given. Cannot build borg command.")
        return 2

    LOGGER.info("Borg repository: %s", repo)
    LOGGER.info("Archive name: %s", archive_name)

    if dry_run:
        LOGGER.info("Dry-run enabled. Borg command will not be executed.")
        print(" ".join(shlex.quote(part) for part in cmd))
        return 0

    LOGGER.info("Running borg backup...")
    result = subprocess.run(cmd, text=True)
    status = result.returncode
    if status == 0:
        LOGGER.info("Borg backup completed successfully.")
        return 0
    if status == 1:
        # Borg documents exit status 1 as "finished, but with warnings",
        # so it is not reported as a hard failure. The status is still
        # returned so callers can decide how to treat it.
        LOGGER.warning("Borg finished with warnings (status %s)", status)
    else:
        LOGGER.error("Borg exited with status %s", status)
    return status
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the ``dockervault`` command line.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Namespace with ``compose``, ``repo`` and the boolean switches
        ``run_borg``, ``dry_run``, ``automation``, ``quiet`` and ``verbose``.
    """
    parser = argparse.ArgumentParser(
        description="Intelligent Docker backup discovery with Borg integration",
        prog="dockervault",
    )
    parser.add_argument("compose", help="Path to docker-compose.yml or compose.yaml")
    parser.add_argument("--repo", help="Borg repository path")

    # All on/off switches share the same shape; registration order is kept
    # because it determines the order shown in --help.
    switches = (
        ("--run-borg", "Run borg create after building the backup plan"),
        ("--dry-run", "Show borg command without executing it"),
        ("--automation", "Output machine-readable JSON"),
        ("--quiet", "Suppress normal human-readable plan output"),
        ("--verbose", "Enable verbose logging"),
    )
    for flag, description in switches:
        parser.add_argument(flag, action="store_true", help=description)

    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: classify a compose file, print the plan, optionally run borg.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit status:
        ``0`` on success, ``1`` when the compose path is missing, is not a
        file, or a ``FileNotFoundError`` is raised, ``2`` when ``--run-borg``
        or ``--dry-run`` is used without ``--repo``, ``130`` on Ctrl-C,
        ``99`` on any other exception, or whatever ``run_borg`` returns.
    """
    args = parse_args(argv)
    setup_logging(args.verbose)
    compose_path = Path(args.compose)
    try:
        if not compose_path.exists():
            LOGGER.error("Compose file not found: %s", compose_path)
            return 1
        if not compose_path.is_file():
            LOGGER.error("Compose path is not a file: %s", compose_path)
            return 1

        raw_plan = classify_compose(compose_path)
        if args.verbose:
            LOGGER.debug("Raw plan type: %s", type(raw_plan))
            LOGGER.debug("Raw plan repr: %r", raw_plan)

        include_entries, _, _ = classify_entries(raw_plan)
        include_paths = extract_paths(include_entries)

        if args.automation:
            print_automation_output(raw_plan, compose_path, repo=args.repo)
        elif not args.quiet:
            print_human_plan(raw_plan, compose_path, repo=args.repo)

        if args.run_borg or args.dry_run:
            if not args.repo:
                flag = "--run-borg" if args.run_borg else "--dry-run"
                LOGGER.error("--repo is required when using %s", flag)
                return 2
            # A bare --dry-run goes through run_borg too, so both modes share
            # one code path, including the "nothing to back up" handling
            # (previously a bare --dry-run printed an empty line in that case).
            return run_borg(
                repo=args.repo,
                include_paths=include_paths,
                dry_run=args.dry_run,
            )
        return 0
    except FileNotFoundError as exc:
        LOGGER.error("File not found: %s", exc)
        return 1
    except KeyboardInterrupt:
        LOGGER.error("Interrupted by user")
        return 130
    except Exception:
        # Top-level boundary: log the traceback and map to a distinct status.
        LOGGER.exception("Unexpected error")
        return 99
if __name__ == "__main__":
    # Script entry: hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    raise SystemExit(exit_status)