From 44c98f5917d601ba6795fae10e1ff3a7b0766bbb Mon Sep 17 00:00:00 2001 From: Eddie Nielsen <“ed”@edcore.dk”> Date: Tue, 24 Mar 2026 12:54:11 +0000 Subject: [PATCH] release: prepare v0.1.0 --- dockervault/analyzer.py | 34 ++ dockervault/classifier.py | 551 +--------------------------- dockervault/cli | 0 dockervault/cli.py | 465 +++++++---------------- dockervault/discovery.py | 104 +++--- dockervault/tests/test_discovery.py | 65 ++++ 6 files changed, 311 insertions(+), 908 deletions(-) create mode 100644 dockervault/analyzer.py create mode 100644 dockervault/cli create mode 100644 dockervault/tests/test_discovery.py diff --git a/dockervault/analyzer.py b/dockervault/analyzer.py new file mode 100644 index 0000000..33b6d8d --- /dev/null +++ b/dockervault/analyzer.py @@ -0,0 +1,34 @@ +from pathlib import Path +import yaml + + +def analyse_compose_file(path: Path) -> dict: + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + services = data.get("services", {}) + + mounts = [] + + for service_name, service in services.items(): + volumes = service.get("volumes", []) + + for vol in volumes: + if isinstance(vol, str): + # bind mount format: source:target + if ":" in vol: + source, target = vol.split(":", 1) + + # kun lokale paths + if source.startswith("./") or source.startswith("/"): + mounts.append({ + "service": service_name, + "source": source, + "target": target, + "compose": path, + }) + + return { + "compose": path, + "mounts": mounts, + } diff --git a/dockervault/classifier.py b/dockervault/classifier.py index 8e3fe89..2703e1f 100644 --- a/dockervault/classifier.py +++ b/dockervault/classifier.py @@ -1,546 +1,31 @@ -from __future__ import annotations - -import json -import shutil -import subprocess from pathlib import Path -from typing import Any - -import yaml - -from .models import MountEntry -# ---------------------------- -# Image-aware rules -# ---------------------------- - -IMAGE_RULES = { - "mariadb": { - "/var/lib/mysql": "critical", - }, - "mysql": { - "/var/lib/mysql": "critical", - }, - "postgres": { - "/var/lib/postgresql/data": "critical", - }, - "redis": { - "/data": "critical", - }, - "grafana": { - "/var/lib/grafana": "critical", - }, - "prometheus": { - "/prometheus": "critical", - }, - "influxdb": { - "/var/lib/influxdb": "critical", - }, - "nginx": { - "/var/log/nginx": "optional", - }, -} - - -# ---------------------------- -# Generic rules -# ---------------------------- - -CRITICAL_TARGETS = { - "/config", - "/data", +CRITICAL_PATHS = [ "/var/lib/mysql", - "/var/lib/mariadb", - "/var/lib/postgresql/data", - "/bitnami/postgresql", - "/var/lib/redis", - "/data/db", - "/var/lib/grafana", - "/var/lib/influxdb", - "/var/lib/prometheus", - "/etc/letsencrypt", - "/acme.sh", - "/app/data", - "/srv", -} + "/data", + "/config", +] -REVIEW_TARGET_KEYWORDS = { - "backup", - "uploads", - "media", - "www", - "html", - "content", - "storage", - "files", - "database", - "db", - "config", -} -SKIP_TARGET_PREFIXES = ( - "/tmp", - "/var/tmp", - "/run", - "/var/run", - "/dev", -) - -SKIP_TARGET_EXACT = { +SKIP_PATHS = [ "/var/log", - "/var/log/nginx", - "/logs", - "/log", - "/cache", "/tmp", -} +] -CLASS_PRIORITY = { - "critical": 3, - "review": 2, - "optional": 1, - "unknown": 0, -} +def classify_mount(mount: dict) -> dict: + target = mount["target"] + # 🔥 critical + for p in CRITICAL_PATHS: + if target.startswith(p): + return {**mount, "class": "critical"} -# ---------------------------- -# Compose loader -# ---------------------------- + # 🗑 skip + for p in SKIP_PATHS: + if target.startswith(p): + return {**mount, "class": "skip"} -def load_compose(compose_path: str | Path) -> dict[str, Any]: - compose_file = Path(compose_path).expanduser().resolve() - - with compose_file.open("r", encoding="utf-8") as f: - data = yaml.safe_load(f) or {} - - if not isinstance(data, dict): - raise ValueError(f"Compose file did not parse as a mapping: {compose_file}") - - return data - - -# ---------------------------- -# Docker helpers -# ---------------------------- - -def docker_available() -> bool: - return shutil.which("docker") is not None - - -def run_docker_volume_inspect(volume_name: str) -> dict[str, Any] | None: - if not docker_available(): - return None - - try: - result = subprocess.run( - ["docker", "volume", "inspect", volume_name], - capture_output=True, - text=True, - check=False, - ) - except OSError: - return None - - if result.returncode != 0: - return None - - try: - data = json.loads(result.stdout) - except json.JSONDecodeError: - return None - - if not isinstance(data, list) or not data: - return None - - first = data[0] - if not isinstance(first, dict): - return None - - return first - - -# ---------------------------- -# Volume resolution -# ---------------------------- - -def infer_project_name(compose_path: Path, compose_data: dict[str, Any]) -> str: - top_level_name = compose_data.get("name") - if isinstance(top_level_name, str) and top_level_name.strip(): - return top_level_name.strip() - - return compose_path.parent.name - - -def normalize_top_level_volume_name( - volume_key: str, - compose_data: dict[str, Any], -) -> tuple[str | None, bool]: - volumes = compose_data.get("volumes", {}) - if not isinstance(volumes, dict): - return None, False - - cfg = volumes.get(volume_key) - if not isinstance(cfg, dict): - return None, False - - explicit_name = cfg.get("name") - if not isinstance(explicit_name, str) or not explicit_name.strip(): - explicit_name = None - - external = cfg.get("external", False) - is_external = False - - if isinstance(external, bool): - is_external = external - elif isinstance(external, dict): - is_external = True - ext_name = external.get("name") - if isinstance(ext_name, str) and ext_name.strip(): - explicit_name = ext_name.strip() - - return explicit_name, is_external - - -def build_volume_candidates( - compose_name: str, - compose_path: Path, - compose_data: dict[str, Any], -) -> list[str]: - project_name = infer_project_name(compose_path, compose_data) - explicit_name, is_external = normalize_top_level_volume_name(compose_name, compose_data) - - candidates: list[str] = [] - - if explicit_name: - candidates.append(explicit_name) - - if is_external: - candidates.append(compose_name) - - candidates.append(compose_name) - candidates.append(f"{project_name}_{compose_name}") - - unique: list[str] = [] - seen: set[str] = set() - - for candidate in candidates: - if candidate not in seen: - unique.append(candidate) - seen.add(candidate) - - return unique - - -def resolve_named_volume( - compose_name: str, - compose_path: Path, - compose_data: dict[str, Any], -) -> tuple[Path | None, str]: - if not docker_available(): - return None, "docker CLI not available" - - for candidate in build_volume_candidates(compose_name, compose_path, compose_data): - inspected = run_docker_volume_inspect(candidate) - if not inspected: - continue - - mountpoint = inspected.get("Mountpoint") - if isinstance(mountpoint, str) and mountpoint.strip(): - return Path(mountpoint), f"named volume '{compose_name}' -> docker volume '{candidate}'" - - return None, f"named volume '{compose_name}' could not be resolved" - - -# ---------------------------- -# Parsing helpers -# ---------------------------- - -def _extract_image_name(image: str | None) -> str | None: - if not image or not isinstance(image, str): - return None - - if "/" in image: - image = image.split("/")[-1] - - if ":" in image: - image = image.split(":")[0] - - return image.lower() - - -def _is_bind_source(source: str) -> bool: - return ( - source.startswith("/") - or source.startswith("./") - or source.startswith("../") - or source.startswith("~/") - ) - - -def _normalize_bind_path(source: str, compose_file: Path) -> Path: - path = Path(source).expanduser() - if path.is_absolute(): - return path.resolve() - return (compose_file.parent / path).resolve() - - -def _parse_volume_string(spec: str) -> dict[str, str | None]: - parts = spec.split(":") - - if len(parts) == 1: - return { - "source": None, - "target": parts[0], - "mode": None, - "kind": "anonymous", - } - - source = parts[0] - target = parts[1] - mode = ":".join(parts[2:]) if len(parts) > 2 else None - - kind = "bind" if _is_bind_source(source) else "named" - - return { - "source": source, - "target": target, - "mode": mode, - "kind": kind, - } - - -def _parse_volume_entry(entry: Any) -> dict[str, str | None]: - if isinstance(entry, str): - return _parse_volume_string(entry) - - if isinstance(entry, dict): - entry_type = entry.get("type") - source = entry.get("source") or entry.get("src") - target = entry.get("target") or entry.get("dst") or entry.get("destination") - - if entry_type == "bind": - kind = "bind" - elif entry_type == "volume": - kind = "named" if source else "anonymous" - else: - if isinstance(source, str) and source: - kind = "bind" if _is_bind_source(source) else "named" - else: - kind = "anonymous" - - return { - "source": source, - "target": target, - "mode": None, - "kind": kind, - } - - return { - "source": None, - "target": None, - "mode": None, - "kind": "unknown", - } - - -# ---------------------------- -# Classification logic -# ---------------------------- - -def _classify_target(target_path: str | None, image_name: str | None = None) -> tuple[str, str]: - if not target_path: - return "review", "missing container target path" - - if image_name and image_name in IMAGE_RULES: - rules = IMAGE_RULES[image_name] - if target_path in rules: - level = rules[target_path] - if level == "critical": - return "critical", f"{image_name} rule for {target_path}" - if level == "optional": - return "optional", f"{image_name} rule for {target_path}" - - if target_path in CRITICAL_TARGETS: - return "critical", f"critical target path {target_path}" - - if target_path in SKIP_TARGET_EXACT: - return "optional", f"non-essential target path {target_path}" - - if target_path.startswith(SKIP_TARGET_PREFIXES): - return "optional", f"ephemeral target path {target_path}" - - lowered = target_path.lower() - for keyword in REVIEW_TARGET_KEYWORDS: - if keyword in lowered: - return "review", f"data-like target path {target_path} requires review" - - return "review", f"unknown target path {target_path}" - - -def _merge_reason(existing: str, new: str) -> str: - if not existing: - return new - if not new or new == existing: - return existing - - parts = [p.strip() for p in existing.split(" | ") if p.strip()] - if new not in parts: - parts.append(new) - return " | ".join(parts) - - -def _prefer_entry(existing: MountEntry, new: MountEntry) -> MountEntry: - existing_priority = CLASS_PRIORITY.get(existing.classification, 0) - new_priority = CLASS_PRIORITY.get(new.classification, 0) - - if new_priority > existing_priority: - preferred = new - other = existing - else: - preferred = existing - other = new - - preferred.reason = _merge_reason(preferred.reason, other.reason) - - if other.service and other.service not in preferred.reason: - preferred.reason = _merge_reason(preferred.reason, f"also used by service={other.service} target={other.target}") - - preferred.exists = preferred.exists or other.exists - return preferred - - -def _dedupe_entries(entries: list[MountEntry]) -> list[MountEntry]: - deduped: dict[str, MountEntry] = {} - - for entry in entries: - key = str(entry.source.resolve()) if entry.source.is_absolute() else str(entry.source) - - if key not in deduped: - deduped[key] = entry - continue - - deduped[key] = _prefer_entry(deduped[key], entry) - - return list(deduped.values()) - - -def _make_entry( - source: Path, - service: str, - target: str | None, - classification: str, - reason: str, -) -> MountEntry: - return MountEntry( - source=source, - service=service, - target=target or "unknown", - classification=classification, - reason=reason, - exists=source.exists(), - ) - - -# ---------------------------- -# Main classifier -# ---------------------------- - -def classify_compose(compose_path: str | Path) -> list[MountEntry]: - compose_file = Path(compose_path).expanduser().resolve() - compose_data = load_compose(compose_file) - - services = compose_data.get("services", {}) - if not isinstance(services, dict): - return [] - - entries: list[MountEntry] = [] - - for service_name, service_cfg in services.items(): - if not isinstance(service_cfg, dict): - continue - - raw_volumes = service_cfg.get("volumes", []) - if not isinstance(raw_volumes, list): - continue - - image_name = _extract_image_name(service_cfg.get("image")) - - for raw_entry in raw_volumes: - parsed = _parse_volume_entry(raw_entry) - source = parsed.get("source") - target = parsed.get("target") - kind = parsed.get("kind") - - if kind == "anonymous": - entries.append( - MountEntry( - source=Path("/__anonymous_volume__"), - service=service_name, - target=target or "unknown", - classification="review", - reason="anonymous volume cannot be safely mapped to host path", - exists=False, - ) - ) - continue - - if kind == "bind" and isinstance(source, str): - host_path = _normalize_bind_path(source, compose_file) - classification, base_reason = _classify_target(target, image_name) - reason = f"{base_reason}; bind mount source '{source}' -> '{host_path}'" - - entries.append( - _make_entry( - source=host_path, - service=service_name, - target=target, - classification=classification, - reason=reason, - ) - ) - continue - - if kind == "named" and isinstance(source, str): - mountpoint, volume_reason = resolve_named_volume(source, compose_file, compose_data) - - if mountpoint is None: - entries.append( - MountEntry( - source=Path(f"/__named_volume_unresolved__/{source}"), - service=service_name, - target=target or "unknown", - classification="review", - reason=volume_reason, - exists=False, - ) - ) - continue - - classification, base_reason = _classify_target(target, image_name) - reason = f"{base_reason}; {volume_reason}; mountpoint '{mountpoint}'" - - entries.append( - _make_entry( - source=mountpoint, - service=service_name, - target=target, - classification=classification, - reason=reason, - ) - ) - continue - - entries.append( - MountEntry( - source=Path("/__unknown_volume__"), - service=service_name, - target=target or "unknown", - classification="review", - reason="unrecognized volume entry", - exists=False, - ) - ) - - return _dedupe_entries(entries) + # 🤔 fallback + return {**mount, "class": "review"} diff --git a/dockervault/cli b/dockervault/cli new file mode 100644 index 0000000..e69de29 diff --git a/dockervault/cli.py b/dockervault/cli.py index d5abd35..6158755 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -1,383 +1,190 @@ -from __future__ import annotations - import argparse -import json -import logging -import shlex -import shutil -import socket -import subprocess -from datetime import datetime from pathlib import Path -from typing import Any, Iterable +from typing import List +from concurrent.futures import ThreadPoolExecutor +import socket -from . import __version__ -from .classifier import classify_compose -from .discovery import find_compose_files - -LOGGER = logging.getLogger("dockervault") +from dockervault.discovery import discover_compose_files +from dockervault.analyzer import analyse_compose_file +from dockervault.classifier import classify_mount -def setup_logging(verbose: bool = False) -> None: - level = logging.DEBUG if verbose else logging.INFO - logging.basicConfig(level=level, format="%(levelname)s: %(message)s") - - -# 🔥 NEW: validation for max-depth -def non_negative_int(value: str) -> int: - ivalue = int(value) - if ivalue < 0: - raise argparse.ArgumentTypeError("must be 0 or greater") - return ivalue - - -def safe_get(obj: Any, key: str, default: Any = None) -> Any: - if obj is None: - return default - if isinstance(obj, dict): - return obj.get(key, default) - return getattr(obj, key, default) - - -def normalize_entries(entries: Any) -> list[dict[str, Any]]: - if not entries: - return [] - - if not isinstance(entries, (list, tuple)): - entries = [entries] - - normalized: list[dict[str, Any]] = [] - - for entry in entries: - if isinstance(entry, dict): - normalized.append( - { - "path": entry.get("path") or entry.get("source") or entry.get("host_path"), - "priority": entry.get("priority") or entry.get("classification"), - "service": entry.get("service"), - "target": entry.get("target") or entry.get("container_path"), - "source_type": entry.get("source_type"), - "reason": entry.get("reason"), - "exists": entry.get("exists"), - "compose_file": entry.get("compose_file"), - } - ) - else: - normalized.append( - { - "path": safe_get(entry, "path", safe_get(entry, "source")), - "priority": safe_get(entry, "priority", safe_get(entry, "classification")), - "service": safe_get(entry, "service"), - "target": safe_get(entry, "target", safe_get(entry, "container_path")), - "source_type": safe_get(entry, "source_type"), - "reason": safe_get(entry, "reason"), - "exists": safe_get(entry, "exists"), - "compose_file": safe_get(entry, "compose_file"), - } - ) - - return normalized - - -def classify_entries( - raw_entries: Any, -) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: - entries = normalize_entries(raw_entries) - - include_entries: list[dict[str, Any]] = [] - review_entries: list[dict[str, Any]] = [] - skip_entries: list[dict[str, Any]] = [] - - for entry in entries: - classification = str(entry.get("priority") or "").strip().lower() - - if classification == "critical": - include_entries.append(entry) - elif classification in {"optional", "skip", "ignored"}: - skip_entries.append(entry) - else: - review_entries.append(entry) - - return include_entries, review_entries, skip_entries - - -def dedupe_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]: - deduped: list[dict[str, Any]] = [] - seen: set[str] = set() - - for entry in entries: - path = entry.get("path") - if not path: - continue - - key = str(path) - if key in seen: - continue - - seen.add(key) - deduped.append(entry) - - return deduped - - -def extract_paths(entries: Iterable[dict[str, Any]]) -> list[str]: - return [str(entry["path"]) for entry in dedupe_entries(entries) if entry.get("path")] - - -def find_missing_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]: - return [entry for entry in entries if entry.get("exists") is False] - - -def entry_to_line(entry: dict[str, Any]) -> str: - path = entry.get("path") or "(unknown)" - priority = entry.get("priority") or "unknown" - service = entry.get("service") or "unknown" - target = entry.get("target") or "unknown" - exists = entry.get("exists") - compose_file = entry.get("compose_file") - - extra = [] - if compose_file: - extra.append(f"compose={compose_file}") - if entry.get("source_type"): - extra.append(f"type={entry['source_type']}") - if exists is not None: - extra.append(f"exists={exists}") - if entry.get("reason"): - extra.append(f"reason={entry['reason']}") - - suffix = f" ({', '.join(extra)})" if extra else "" - return f" - {path} [{priority}] service={service} target={target}{suffix}" - - -def default_archive_name() -> str: - hostname = socket.gethostname() - now = datetime.now().strftime("%Y-%m-%d_%H-%M") - return f"{hostname}-{now}" - - -def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -> list[str]: - if not repo or not include_paths: - return [] - - cmd = [ - "borg", - "create", - "--stats", - "--progress", - f"{repo}::{archive_name}", - ] - cmd.extend(include_paths) - return cmd - - -def ensure_borg_available() -> bool: - if shutil.which("borg") is None: - LOGGER.error("Borg binary not found in PATH") - return False - return True - - -def scan_projects( - scan_root: Path, - max_depth: int | None = None, - excludes: list[str] | None = None, -) -> tuple[list[Path], list[dict[str, Any]]]: - compose_files = find_compose_files( - scan_root, - excludes=excludes, - max_depth=max_depth, - ) - - all_entries: list[dict[str, Any]] = [] - - for compose_file in compose_files: - raw_entries = classify_compose(compose_file) - normalized = normalize_entries(raw_entries) - - for entry in normalized: - entry["compose_file"] = str(compose_file.resolve()) - all_entries.append(entry) - - return compose_files, all_entries - - -def print_human_plan( - raw_entries: Any, - label: str, - root_path: Path, - compose_files: list[Path] | None = None, -) -> None: - include_entries, review_entries, skip_entries = classify_entries(raw_entries) - - include_entries = dedupe_entries(include_entries) - review_entries = dedupe_entries(review_entries) - skip_entries = dedupe_entries(skip_entries) - - missing_include = find_missing_entries(include_entries) +def print_plan(scan_root: Path, classified_mounts: List[dict], quiet: bool): + if quiet: + return [m for m in classified_mounts if m["class"] == "critical"] print() print("DockerVault Backup Plan") print("=======================") - print(f"{label}: {root_path.resolve()}") - - if compose_files is not None: - print(f"Compose files found: {len(compose_files)}") - + print(f"Scan root: {scan_root}") print() + include = [] + review = [] + skip = [] + + for m in classified_mounts: + cls = m["class"] + + if cls == "critical": + include.append(m) + elif cls == "review": + review.append(m) + else: + skip.append(m) + print("INCLUDE PATHS:") - if include_entries: - for entry in include_entries: - print(entry_to_line(entry)) + if include: + for m in include: + print( + f" - {m['source']} " + f"[{m['class']}] " + f"service={m['service']} " + f"target={m['target']} " + f"(exists={m['exists']})" + ) else: - print(" - (none)") + print(" (none)") + print() - if missing_include: - print("WARNING: Missing critical paths detected") - for entry in missing_include: - print(f" - {entry.get('path')} (service={entry.get('service')})") - print() - print("REVIEW PATHS:") - if review_entries: - for entry in review_entries: - print(entry_to_line(entry)) + if review: + for m in review: + print( + f" - {m['source']} " + f"[{m['class']}] " + f"service={m['service']} " + f"target={m['target']} " + f"(exists={m['exists']})" + ) else: - print(" - (none)") + print(" (none)") + print() print("SKIP PATHS:") - if skip_entries: - for entry in skip_entries: - print(entry_to_line(entry)) + if skip: + for m in skip: + print( + f" - {m['source']} " + f"[{m['class']}] " + f"service={m['service']} " + f"target={m['target']} " + f"(exists={m['exists']})" + ) else: - print(" - (none)") - print() + print(" (none)") + + return include -def print_automation_output( - raw_entries: Any, - root_path: Path, - repo: str | None = None, - compose_files: list[Path] | None = None, -) -> None: - include_entries, review_entries, skip_entries = classify_entries(raw_entries) +def print_borg_command(include: List[dict], repo: str, quiet: bool): + if not repo: + return - include_entries = dedupe_entries(include_entries) - review_entries = dedupe_entries(review_entries) - skip_entries = dedupe_entries(skip_entries) + valid_paths = sorted({ + m["source"] for m in include if m["exists"] + }) - include_paths = extract_paths(include_entries) - missing_include = find_missing_entries(include_entries) - - payload: dict[str, Any] = { - "root": str(root_path.resolve()), - "include_paths": include_paths, - "review_paths": extract_paths(review_entries), - "skip_paths": extract_paths(skip_entries), - "missing_critical_paths": [str(entry["path"]) for entry in missing_include if entry.get("path")], - } - - if compose_files is not None: - payload["compose_files"] = [str(path.resolve()) for path in compose_files] - - if repo: - archive_name = default_archive_name() - payload["repo"] = repo - payload["archive_name"] = archive_name - payload["borg_command"] = build_borg_command(repo, archive_name, include_paths) - - print(json.dumps(payload, indent=2)) - - -def run_borg_command(cmd: list[str], dry_run: bool = False, quiet: bool = False) -> int: - if not cmd: - LOGGER.error("No Borg command to run") - return 1 - - if dry_run: + if not valid_paths: if not quiet: - print("Dry run - Borg command:") - print(" ".join(shlex.quote(part) for part in cmd)) - return 0 + print() + print("No valid paths for borg backup") + print("Reason: all critical paths are missing (exists=False)") + return - if not ensure_borg_available(): - return 1 + hostname = socket.gethostname() - if not quiet: - print("Running Borg command:") - print(" ".join(shlex.quote(part) for part in cmd)) + if quiet: + print(" ".join(valid_paths)) + return - result = subprocess.run(cmd, check=False) - return result.returncode + print() + print("Suggested borg create command") + print("=============================") + + print("borg create --stats --progress \\") + print(f" {repo}::{{hostname}}-{{now:%Y-%m-%d_%H-%M}} \\") + + for p in valid_paths: + print(f" {p} \\") -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(prog="dockervault") +def build_parser(): + parser = argparse.ArgumentParser() - parser.add_argument("--repo") - parser.add_argument("--run-borg", action="store_true") - parser.add_argument("--dry-run", action="store_true") - parser.add_argument("--automation", action="store_true") - parser.add_argument("--quiet", action="store_true") - parser.add_argument("--verbose", action="store_true") - parser.add_argument("--version", action="version", version=__version__) + subparsers = parser.add_subparsers(dest="command") - subparsers = parser.add_subparsers(dest="command", required=True) + scan = subparsers.add_parser("scan") - plan_parser = subparsers.add_parser("plan") - plan_parser.add_argument("path") + scan.add_argument("path") + scan.add_argument("--repo") + scan.add_argument("--max-depth", type=int, default=None) + scan.add_argument("--exclude", action="append", default=[]) - scan_parser = subparsers.add_parser("scan") - scan_parser.add_argument("path") - - scan_parser.add_argument( - "--max-depth", - type=non_negative_int, - default=None, - help="Maximum directory depth to scan", - ) - - scan_parser.add_argument( - "--exclude", - action="append", - default=[], - help="Additional directory name to exclude (can be used multiple times)", - ) + scan.add_argument("--quiet", action="store_true") + scan.add_argument("--automation", action="store_true") return parser -def main() -> int: +def main(): parser = build_parser() args = parser.parse_args() - setup_logging(args.verbose) - if args.command == "scan": - root = Path(args.path) + scan_root = Path(args.path).resolve() - compose_files, entries = scan_projects( - root, + if not scan_root.exists(): + if not args.quiet: + print(f"ERROR: Path does not exist: {scan_root}") + return 2 + + compose_files = discover_compose_files( + root=scan_root, max_depth=args.max_depth, excludes=args.exclude, ) - print_human_plan(entries, "Scan root", root, compose_files) + with ThreadPoolExecutor() as executor: + results = list(executor.map(analyse_compose_file, compose_files)) + classified_mounts = [] + + for r in results: + for m in r["mounts"]: + classified = classify_mount(m) + + compose_dir = r["compose"].parent + source_path = (compose_dir / classified["source"]).resolve() + + classified["source"] = str(source_path) + classified["exists"] = source_path.exists() + + classified_mounts.append(classified) + + missing_critical = [ + m for m in classified_mounts + if m["class"] == "critical" and not m["exists"] + ] + + if missing_critical and not args.quiet: + print() + print("WARNING: Missing critical paths detected") + for m in missing_critical: + print(f" - {m['source']} (service={m['service']})") + print() + + include = print_plan(scan_root, classified_mounts, args.quiet) + + print_borg_command(include, args.repo, args.quiet) + + if missing_critical: + return 1 return 0 - if args.command == "plan": - root = Path(args.path) - - entries = normalize_entries(classify_compose(root)) - print_human_plan(entries, "Compose file", root, [root]) - - return 0 - - return 0 + return 1 if __name__ == "__main__": diff --git a/dockervault/discovery.py b/dockervault/discovery.py index 2531bc6..f0c6a61 100644 --- a/dockervault/discovery.py +++ b/dockervault/discovery.py @@ -1,28 +1,8 @@ -from __future__ import annotations - -import os from pathlib import Path -from typing import Iterable - - -DEFAULT_SCAN_EXCLUDES = { - ".git", - ".hg", - ".svn", - ".venv", - "venv", - "env", - "node_modules", - "__pycache__", - ".pytest_cache", - ".mypy_cache", - ".tox", - ".cache", - ".idea", - ".vscode", -} +from typing import List, Optional +# Kendte compose filnavne COMPOSE_FILENAMES = { "docker-compose.yml", "docker-compose.yaml", @@ -31,34 +11,66 @@ COMPOSE_FILENAMES = { } -def find_compose_files( - root: Path | str, - excludes: Iterable[str] | None = None, - max_depth: int | None = None, -) -> list[Path]: - root_path = Path(root).resolve() - root_depth = len(root_path.parts) +# Default mapper vi altid ignorerer (støj + performance) +DEFAULT_EXCLUDES = { + ".git", + ".venv", + "node_modules", + "__pycache__", +} - exclude_set = set(DEFAULT_SCAN_EXCLUDES) - if excludes: - exclude_set.update(x.strip() for x in excludes if x and x.strip()) - found: set[Path] = set() +def should_exclude(path: Path, excludes: Optional[List[str]]) -> bool: + """ + Returnerer True hvis path skal ignoreres. + Matcher på path-dele (ikke substring). + """ + parts = set(path.parts) + combined = set(excludes or []) | DEFAULT_EXCLUDES + return any(ex in parts for ex in combined) - for current_root, dirnames, filenames in os.walk(root_path, topdown=True): - current_path = Path(current_root) - current_depth = len(current_path.parts) - root_depth - if max_depth is not None and current_depth >= max_depth: - dirnames[:] = [] +def discover_compose_files( + root: Path, + max_depth: Optional[int] = None, + excludes: Optional[List[str]] = None, +) -> List[Path]: + """ + Finder docker-compose filer i et directory tree. - dirnames[:] = sorted( - d for d in dirnames - if d not in exclude_set - ) + Args: + root: start directory + max_depth: max dybde (0 = kun root) + excludes: liste af directory navne der skal ignoreres - for filename in filenames: - if filename in COMPOSE_FILENAMES: - found.add((current_path / filename).resolve()) + Returns: + Liste af fundne compose filer (sorteret) + """ + root = root.resolve() + results: List[Path] = [] - return sorted(found) + def walk(current: Path, depth: int): + # Stop hvis vi er for dybt + if max_depth is not None and depth > max_depth: + return + + try: + for entry in current.iterdir(): + # 🔥 vigtig: skip før traversal (performance + korrekthed) + if should_exclude(entry, excludes): + continue + + if entry.is_dir(): + walk(entry, depth + 1) + + elif entry.is_file() and entry.name in COMPOSE_FILENAMES: + results.append(entry) + + except PermissionError: + # Ignorer mapper vi ikke har adgang til + pass + + walk(root, depth=0) + + # Stabil rækkefølge (vigtigt for tests og CLI output) + return sorted(results) diff --git a/dockervault/tests/test_discovery.py b/dockervault/tests/test_discovery.py new file mode 100644 index 0000000..ad2351c --- /dev/null +++ b/dockervault/tests/test_discovery.py @@ -0,0 +1,65 @@ +from pathlib import Path +from dockervault.discovery import discover_compose_files + + +def write_compose(base: Path, rel: str): + path = base / rel + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("services: {}\n") + return path + + +def test_basic_discovery(tmp_path: Path): + write_compose(tmp_path, "app1/docker-compose.yml") + write_compose(tmp_path, "app2/docker-compose.yml") + + results = discover_compose_files(tmp_path) + + assert len(results) == 2 + + +def test_max_depth(tmp_path: Path): + write_compose(tmp_path, "root.yml") + write_compose(tmp_path, "a/docker-compose.yml") + write_compose(tmp_path, "a/b/docker-compose.yml") + + results = discover_compose_files(tmp_path, max_depth=1) + + paths = [str(p) for p in results] + + assert any("a/docker-compose.yml" in p for p in paths) + assert not any("a/b/docker-compose.yml" in p for p in paths) + + +def test_exclude_directory(tmp_path: Path): + write_compose(tmp_path, "app1/docker-compose.yml") + write_compose(tmp_path, "app2/docker-compose.yml") + + results = discover_compose_files(tmp_path, excludes=["app2"]) + + paths = [str(p) for p in results] + + assert any("app1" in p for p in paths) + assert not any("app2" in p for p in paths) + + +def test_default_excludes(tmp_path: Path): + write_compose(tmp_path, ".git/test/docker-compose.yml") + write_compose(tmp_path, ".venv/test/docker-compose.yml") + write_compose(tmp_path, "node_modules/test/docker-compose.yml") + write_compose(tmp_path, "app/docker-compose.yml") + + results = discover_compose_files(tmp_path) + + paths = [str(p) for p in results] + + assert len(paths) == 1 + assert "app/docker-compose.yml" in paths[0] + + +def test_exclude_prevents_traversal(tmp_path: Path): + write_compose(tmp_path, "skipme/a/docker-compose.yml") + + results = discover_compose_files(tmp_path, excludes=["skipme"]) + + assert len(results) == 0