feat: add scan mode for multi-project docker backup discovery

This commit is contained in:
Eddie Nielsen 2026-03-23 15:26:24 +00:00
parent 3ab5c8d3fd
commit 168f0a38ed

View file

@ -5,14 +5,15 @@ import json
import logging import logging
import shlex import shlex
import shutil import shutil
import subprocess
import socket import socket
import subprocess
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Iterable from typing import Any, Iterable
from . import __version__ from . import __version__
from .classifier import classify_compose from .classifier import classify_compose
from .discovery import find_compose_files
LOGGER = logging.getLogger("dockervault") LOGGER = logging.getLogger("dockervault")
@ -31,14 +32,6 @@ def safe_get(obj: Any, key: str, default: Any = None) -> Any:
def normalize_entries(entries: Any) -> list[dict[str, Any]]: def normalize_entries(entries: Any) -> list[dict[str, Any]]:
"""
Normaliserer classifier-output til ensartede dict entries.
Understøtter:
- list[MountEntry]
- list[dict]
- enkeltobjekter
"""
if not entries: if not entries:
return [] return []
@ -58,6 +51,7 @@ def normalize_entries(entries: Any) -> list[dict[str, Any]]:
"source_type": entry.get("source_type"), "source_type": entry.get("source_type"),
"reason": entry.get("reason"), "reason": entry.get("reason"),
"exists": entry.get("exists"), "exists": entry.get("exists"),
"compose_file": entry.get("compose_file"),
} }
) )
else: else:
@ -70,6 +64,7 @@ def normalize_entries(entries: Any) -> list[dict[str, Any]]:
"source_type": safe_get(entry, "source_type"), "source_type": safe_get(entry, "source_type"),
"reason": safe_get(entry, "reason"), "reason": safe_get(entry, "reason"),
"exists": safe_get(entry, "exists"), "exists": safe_get(entry, "exists"),
"compose_file": safe_get(entry, "compose_file"),
} }
) )
@ -77,17 +72,9 @@ def normalize_entries(entries: Any) -> list[dict[str, Any]]:
def classify_entries( def classify_entries(
raw_plan: Any, raw_entries: Any,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
""" entries = normalize_entries(raw_entries)
classify_compose() returnerer aktuelt en liste af MountEntry.
Mapping til CLI-sektioner:
- critical -> include
- optional / skip / ignored -> skip
- alt andet -> review
"""
entries = normalize_entries(raw_plan)
include_entries: list[dict[str, Any]] = [] include_entries: list[dict[str, Any]] = []
review_entries: list[dict[str, Any]] = [] review_entries: list[dict[str, Any]] = []
@ -106,20 +93,31 @@ def classify_entries(
return include_entries, review_entries, skip_entries return include_entries, review_entries, skip_entries
def dedupe_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop entries without a path and collapse duplicate paths.

    Deduplication keys on ``str(entry["path"])``; the first occurrence of
    each path wins, and the original input order is otherwise preserved.
    Entries whose ``path`` is missing or falsy are discarded.
    """
    seen: set[str] = set()
    unique: list[dict[str, Any]] = []
    for candidate in entries:
        raw_path = candidate.get("path")
        if not raw_path:
            # No usable path -> nothing to key on; drop the entry.
            continue
        key = str(raw_path)
        if key not in seen:
            seen.add(key)
            unique.append(candidate)
    return unique
def extract_paths(entries: Iterable[dict[str, Any]]) -> list[str]:
    """Return the unique path strings from *entries*, in first-seen order."""
    paths: list[str] = []
    for entry in dedupe_entries(entries):
        value = entry.get("path")
        if value:
            paths.append(str(value))
    return paths
def find_missing_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]: def find_missing_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
@ -132,8 +130,11 @@ def entry_to_line(entry: dict[str, Any]) -> str:
service = entry.get("service") or "unknown" service = entry.get("service") or "unknown"
target = entry.get("target") or "unknown" target = entry.get("target") or "unknown"
exists = entry.get("exists") exists = entry.get("exists")
compose_file = entry.get("compose_file")
extra = [] extra = []
if compose_file:
extra.append(f"compose={compose_file}")
if entry.get("source_type"): if entry.get("source_type"):
extra.append(f"type={entry['source_type']}") extra.append(f"type={entry['source_type']}")
if exists is not None: if exists is not None:
@ -173,15 +174,43 @@ def ensure_borg_available() -> bool:
return True return True
def scan_projects(scan_root: Path) -> tuple[list[Path], list[dict[str, Any]]]:
    """Discover compose files under *scan_root* and collect their mount entries.

    Each compose file found by ``find_compose_files`` is classified and its
    entries normalized; every resulting entry is tagged with the absolute
    path of its originating compose file under the ``compose_file`` key.

    Returns a tuple of (compose files found, all tagged entries).
    """
    compose_files = find_compose_files(scan_root)
    collected: list[dict[str, Any]] = []
    for cf in compose_files:
        origin = str(cf.resolve())
        for entry in normalize_entries(classify_compose(cf)):
            entry["compose_file"] = origin
            collected.append(entry)
    return compose_files, collected
def print_human_plan(
raw_entries: Any,
label: str,
root_path: Path,
compose_files: list[Path] | None = None,
) -> None:
include_entries, review_entries, skip_entries = classify_entries(raw_entries)
include_entries = dedupe_entries(include_entries)
review_entries = dedupe_entries(review_entries)
skip_entries = dedupe_entries(skip_entries)
missing_include = find_missing_entries(include_entries) missing_include = find_missing_entries(include_entries)
print() print()
print("DockerVault Backup Plan") print("DockerVault Backup Plan")
print("=======================") print("=======================")
print(f"Compose file: {compose_path.resolve()}") print(f"{label}: {root_path.resolve()}")
print(f"Project root: {compose_path.resolve().parent}")
if compose_files is not None:
print(f"Compose files found: {len(compose_files)}")
print() print()
print("INCLUDE PATHS:") print("INCLUDE PATHS:")
@ -215,20 +244,32 @@ def print_human_plan(raw_plan: Any, compose_path: Path) -> None:
print() print()
def print_automation_output(raw_plan: Any, compose_path: Path, repo: str | None = None) -> None: def print_automation_output(
include_entries, review_entries, skip_entries = classify_entries(raw_plan) raw_entries: Any,
root_path: Path,
repo: str | None = None,
compose_files: list[Path] | None = None,
) -> None:
include_entries, review_entries, skip_entries = classify_entries(raw_entries)
include_entries = dedupe_entries(include_entries)
review_entries = dedupe_entries(review_entries)
skip_entries = dedupe_entries(skip_entries)
include_paths = extract_paths(include_entries) include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries) missing_include = find_missing_entries(include_entries)
payload: dict[str, Any] = { payload: dict[str, Any] = {
"compose_file": str(compose_path.resolve()), "root": str(root_path.resolve()),
"project_root": str(compose_path.resolve().parent),
"include_paths": include_paths, "include_paths": include_paths,
"review_paths": [str(e["path"]) for e in review_entries if e.get("path")], "review_paths": [str(e["path"]) for e in review_entries if e.get("path")],
"skip_paths": [str(e["path"]) for e in skip_entries if e.get("path")], "skip_paths": [str(e["path"]) for e in skip_entries if e.get("path")],
"missing_critical_paths": [str(e["path"]) for e in missing_include if e.get("path")], "missing_critical_paths": [str(e["path"]) for e in missing_include if e.get("path")],
} }
if compose_files is not None:
payload["compose_files"] = [str(p.resolve()) for p in compose_files]
if repo: if repo:
archive_name = default_archive_name() archive_name = default_archive_name()
payload["borg_repo"] = repo payload["borg_repo"] = repo
@ -261,10 +302,17 @@ def run_borg(repo: str, include_paths: list[str], dry_run: bool = False) -> int:
LOGGER.info("Archive name: %s", archive_name) LOGGER.info("Archive name: %s", archive_name)
LOGGER.info("Running borg backup...") LOGGER.info("Running borg backup...")
result = subprocess.run(cmd, text=True) result = subprocess.run(cmd, text=True, capture_output=True)
if result.stdout:
print(result.stdout, end="")
if result.returncode != 0: if result.returncode != 0:
LOGGER.error("Borg exited with status %s", result.returncode) stderr = (result.stderr or "").strip()
if stderr:
LOGGER.error("Borg failed: %s", stderr)
else:
LOGGER.error("Borg exited with status %s", result.returncode)
return result.returncode return result.returncode
LOGGER.info("Borg backup completed successfully.") LOGGER.info("Borg backup completed successfully.")
@ -277,71 +325,108 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
description="Intelligent Docker backup discovery with Borg integration", description="Intelligent Docker backup discovery with Borg integration",
) )
parser.add_argument("compose", help="Path to docker-compose.yml or compose.yaml") subparsers = parser.add_subparsers(dest="command")
plan_parser = subparsers.add_parser("plan", help="Analyze a single compose file")
plan_parser.add_argument("compose", help="Path to docker-compose.yml or compose.yaml")
scan_parser = subparsers.add_parser("scan", help="Scan a directory for compose files")
scan_parser.add_argument("root", help="Root directory to scan")
parser.add_argument("--repo", help="Borg repository path") parser.add_argument("--repo", help="Borg repository path")
parser.add_argument("--run-borg", action="store_true", help="Run borg create after building the backup plan")
parser.add_argument("--dry-run", action="store_true", help="Show borg command without executing it")
parser.add_argument("--automation", action="store_true", help="Output machine-readable JSON")
parser.add_argument("--quiet", action="store_true", help="Suppress normal human-readable plan output")
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
parser.add_argument( parser.add_argument(
"--run-borg", "legacy_target",
action="store_true", nargs="?",
help="Run borg create after building the backup plan", help=argparse.SUPPRESS,
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show borg command without executing it",
)
parser.add_argument(
"--automation",
action="store_true",
help="Output machine-readable JSON",
)
parser.add_argument(
"--quiet",
action="store_true",
help="Suppress normal human-readable plan output",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable verbose logging",
)
parser.add_argument(
"--version",
action="version",
version=f"%(prog)s {__version__}",
) )
return parser.parse_args(argv) args = parser.parse_args(argv)
# legacy mode: dockervault docker-compose.yml
if args.command is None and args.legacy_target:
args.command = "plan"
args.compose = args.legacy_target
if args.command is None:
parser.error("You must provide either a compose file or use the 'scan' subcommand.")
return args
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
args = parse_args(argv) args = parse_args(argv)
setup_logging(args.verbose) setup_logging(args.verbose)
compose_path = Path(args.compose)
try: try:
if not compose_path.exists(): if args.command == "scan":
LOGGER.error("Compose file not found: %s", compose_path) root_path = Path(args.root)
return 1
if not compose_path.is_file(): compose_files, raw_entries = scan_projects(root_path)
LOGGER.error("Compose path is not a file: %s", compose_path)
return 1
raw_plan = classify_compose(compose_path) if args.verbose:
LOGGER.debug("Compose files found: %s", len(compose_files))
if args.verbose: include_entries, _, _ = classify_entries(raw_entries)
LOGGER.debug("Raw plan type: %s", type(raw_plan)) include_entries = dedupe_entries(include_entries)
LOGGER.debug("Raw plan repr: %r", raw_plan) include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries)
include_entries, _, _ = classify_entries(raw_plan) if args.automation:
include_paths = extract_paths(include_entries) print_automation_output(
missing_include = find_missing_entries(include_entries) raw_entries,
root_path=root_path,
repo=args.repo,
compose_files=compose_files,
)
elif not args.quiet:
print_human_plan(
raw_entries,
label="Scan root",
root_path=root_path,
compose_files=compose_files,
)
if args.automation: else:
print_automation_output(raw_plan, compose_path, repo=args.repo) compose_path = Path(args.compose)
elif not args.quiet:
print_human_plan(raw_plan, compose_path) if not compose_path.exists():
LOGGER.error("Compose file not found: %s", compose_path)
return 1
if not compose_path.is_file():
LOGGER.error("Compose path is not a file: %s", compose_path)
return 1
raw_entries = classify_compose(compose_path)
if args.verbose:
LOGGER.debug("Raw plan type: %s", type(raw_entries))
LOGGER.debug("Raw plan repr: %r", raw_entries)
include_entries, _, _ = classify_entries(raw_entries)
include_entries = dedupe_entries(include_entries)
include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries)
if args.automation:
print_automation_output(
raw_entries,
root_path=compose_path,
repo=args.repo,
)
elif not args.quiet:
print_human_plan(
raw_entries,
label="Compose file",
root_path=compose_path,
)
if not include_paths: if not include_paths:
LOGGER.warning("No include paths found. Nothing to back up.") LOGGER.warning("No include paths found. Nothing to back up.")
@ -378,7 +463,10 @@ def main(argv: list[str] | None = None) -> int:
return 0 return 0
except FileNotFoundError as exc: except FileNotFoundError as exc:
LOGGER.error("File not found: %s", exc) LOGGER.error("%s", exc)
return 1
except NotADirectoryError as exc:
LOGGER.error("%s", exc)
return 1 return 1
except KeyboardInterrupt: except KeyboardInterrupt:
LOGGER.error("Interrupted by user") LOGGER.error("Interrupted by user")