release: prepare v0.1.0

This commit is contained in:
Eddie Nielsen 2026-03-24 12:54:11 +00:00
parent 51f2063389
commit 44c98f5917
6 changed files with 311 additions and 908 deletions

34
dockervault/analyzer.py Normal file
View file

@ -0,0 +1,34 @@
from pathlib import Path
import yaml
def analyse_compose_file(path: Path) -> dict:
    """Parse a docker-compose file and collect its local bind mounts.

    Args:
        path: Path to the compose file.

    Returns:
        Dict with the compose ``path`` and a list of ``mounts``; each mount
        records the service name, host ``source``, container ``target`` and
        the originating compose file.
    """
    with path.open("r", encoding="utf-8") as f:
        # An empty YAML document parses to None; fall back to an empty mapping.
        data = yaml.safe_load(f) or {}
    # `services:` may be present but null in hand-written files.
    services = data.get("services") or {}
    mounts = []
    for service_name, service in services.items():
        # Guard against malformed service entries (e.g. `web:` with no body).
        if not isinstance(service, dict):
            continue
        volumes = service.get("volumes") or []
        for vol in volumes:
            if isinstance(vol, str):
                # short bind-mount syntax: source:target[:mode]
                if ":" in vol:
                    source, target = vol.split(":", 1)
                    # only local host paths (named volumes are ignored here)
                    if source.startswith("./") or source.startswith("/"):
                        mounts.append({
                            "service": service_name,
                            "source": source,
                            "target": target,
                            "compose": path,
                        })
    return {
        "compose": path,
        "mounts": mounts,
    }

View file

@ -1,546 +1,31 @@
from __future__ import annotations
import json
import shutil
import subprocess
from pathlib import Path
from typing import Any
import yaml
from .models import MountEntry
# ----------------------------
# Image-aware rules
# ----------------------------
IMAGE_RULES = {
"mariadb": {
"/var/lib/mysql": "critical",
},
"mysql": {
"/var/lib/mysql": "critical",
},
"postgres": {
"/var/lib/postgresql/data": "critical",
},
"redis": {
"/data": "critical",
},
"grafana": {
"/var/lib/grafana": "critical",
},
"prometheus": {
"/prometheus": "critical",
},
"influxdb": {
"/var/lib/influxdb": "critical",
},
"nginx": {
"/var/log/nginx": "optional",
},
}
# ----------------------------
# Generic rules
# ----------------------------
CRITICAL_TARGETS = {
"/config",
"/data",
CRITICAL_PATHS = [
"/var/lib/mysql",
"/var/lib/mariadb",
"/var/lib/postgresql/data",
"/bitnami/postgresql",
"/var/lib/redis",
"/data/db",
"/var/lib/grafana",
"/var/lib/influxdb",
"/var/lib/prometheus",
"/etc/letsencrypt",
"/acme.sh",
"/app/data",
"/srv",
}
"/data",
"/config",
]
REVIEW_TARGET_KEYWORDS = {
"backup",
"uploads",
"media",
"www",
"html",
"content",
"storage",
"files",
"database",
"db",
"config",
}
SKIP_TARGET_PREFIXES = (
"/tmp",
"/var/tmp",
"/run",
"/var/run",
"/dev",
)
SKIP_TARGET_EXACT = {
SKIP_PATHS = [
"/var/log",
"/var/log/nginx",
"/logs",
"/log",
"/cache",
"/tmp",
}
]
CLASS_PRIORITY = {
"critical": 3,
"review": 2,
"optional": 1,
"unknown": 0,
}
def classify_mount(mount: dict) -> dict:
target = mount["target"]
# 🔥 critical
for p in CRITICAL_PATHS:
if target.startswith(p):
return {**mount, "class": "critical"}
# ----------------------------
# Compose loader
# ----------------------------
# 🗑 skip
for p in SKIP_PATHS:
if target.startswith(p):
return {**mount, "class": "skip"}
def load_compose(compose_path: str | Path) -> dict[str, Any]:
    """Read a compose file and return its top-level YAML mapping.

    Raises ValueError when the document is not a mapping.
    """
    resolved = Path(compose_path).expanduser().resolve()
    with resolved.open("r", encoding="utf-8") as handle:
        # an empty document parses to None -> treat as empty mapping
        parsed = yaml.safe_load(handle) or {}
    if not isinstance(parsed, dict):
        raise ValueError(f"Compose file did not parse as a mapping: {resolved}")
    return parsed
# ----------------------------
# Docker helpers
# ----------------------------
def docker_available() -> bool:
    """Return True when the docker CLI can be found on PATH."""
    return bool(shutil.which("docker"))
def run_docker_volume_inspect(volume_name: str) -> dict[str, Any] | None:
    """Run `docker volume inspect` and return the first result mapping.

    Returns None when docker is unavailable, the command fails, or the
    output cannot be interpreted as a non-empty JSON list of objects.
    """
    if not docker_available():
        return None
    try:
        proc = subprocess.run(
            ["docker", "volume", "inspect", volume_name],
            capture_output=True,
            text=True,
            check=False,
        )
    except OSError:
        return None
    if proc.returncode != 0:
        return None
    try:
        payload = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return None
    # inspect emits a JSON array; only a leading object is useful
    if isinstance(payload, list) and payload and isinstance(payload[0], dict):
        return payload[0]
    return None
# ----------------------------
# Volume resolution
# ----------------------------
def infer_project_name(compose_path: Path, compose_data: dict[str, Any]) -> str:
    """Derive the compose project name.

    Prefers an explicit top-level `name:` key; falls back to the name of
    the directory containing the compose file.
    """
    declared = compose_data.get("name")
    if isinstance(declared, str):
        trimmed = declared.strip()
        if trimmed:
            return trimmed
    return compose_path.parent.name
def normalize_top_level_volume_name(
volume_key: str,
compose_data: dict[str, Any],
) -> tuple[str | None, bool]:
volumes = compose_data.get("volumes", {})
if not isinstance(volumes, dict):
return None, False
cfg = volumes.get(volume_key)
if not isinstance(cfg, dict):
return None, False
explicit_name = cfg.get("name")
if not isinstance(explicit_name, str) or not explicit_name.strip():
explicit_name = None
external = cfg.get("external", False)
is_external = False
if isinstance(external, bool):
is_external = external
elif isinstance(external, dict):
is_external = True
ext_name = external.get("name")
if isinstance(ext_name, str) and ext_name.strip():
explicit_name = ext_name.strip()
return explicit_name, is_external
def build_volume_candidates(
    compose_name: str,
    compose_path: Path,
    compose_data: dict[str, Any],
) -> list[str]:
    """List docker volume names to try for a compose volume, most specific first."""
    project = infer_project_name(compose_path, compose_data)
    explicit, is_external = normalize_top_level_volume_name(compose_name, compose_data)

    ordered: list[str] = []
    if explicit:
        ordered.append(explicit)
    if is_external:
        # external volumes are commonly created under their bare name
        ordered.append(compose_name)
    ordered.append(compose_name)
    # default compose naming scheme: <project>_<volume>
    ordered.append(f"{project}_{compose_name}")

    # de-duplicate while keeping first-seen order
    return list(dict.fromkeys(ordered))
def resolve_named_volume(
    compose_name: str,
    compose_path: Path,
    compose_data: dict[str, Any],
) -> tuple[Path | None, str]:
    """Map a compose named volume to its docker host mountpoint.

    Returns (mountpoint, reason); mountpoint is None when docker is missing
    or no candidate volume name could be inspected successfully.
    """
    if not docker_available():
        return None, "docker CLI not available"
    # Try each plausible docker volume name until one inspects cleanly.
    for candidate in build_volume_candidates(compose_name, compose_path, compose_data):
        inspected = run_docker_volume_inspect(candidate)
        if not inspected:
            continue
        mountpoint = inspected.get("Mountpoint")
        if isinstance(mountpoint, str) and mountpoint.strip():
            return Path(mountpoint), f"named volume '{compose_name}' -> docker volume '{candidate}'"
    return None, f"named volume '{compose_name}' could not be resolved"
# ----------------------------
# Parsing helpers
# ----------------------------
def _extract_image_name(image: str | None) -> str | None:
if not image or not isinstance(image, str):
return None
if "/" in image:
image = image.split("/")[-1]
if ":" in image:
image = image.split(":")[0]
return image.lower()
def _is_bind_source(source: str) -> bool:
return (
source.startswith("/")
or source.startswith("./")
or source.startswith("../")
or source.startswith("~/")
)
def _normalize_bind_path(source: str, compose_file: Path) -> Path:
path = Path(source).expanduser()
if path.is_absolute():
return path.resolve()
return (compose_file.parent / path).resolve()
def _parse_volume_string(spec: str) -> dict[str, str | None]:
parts = spec.split(":")
if len(parts) == 1:
return {
"source": None,
"target": parts[0],
"mode": None,
"kind": "anonymous",
}
source = parts[0]
target = parts[1]
mode = ":".join(parts[2:]) if len(parts) > 2 else None
kind = "bind" if _is_bind_source(source) else "named"
return {
"source": source,
"target": target,
"mode": mode,
"kind": kind,
}
def _parse_volume_entry(entry: Any) -> dict[str, str | None]:
if isinstance(entry, str):
return _parse_volume_string(entry)
if isinstance(entry, dict):
entry_type = entry.get("type")
source = entry.get("source") or entry.get("src")
target = entry.get("target") or entry.get("dst") or entry.get("destination")
if entry_type == "bind":
kind = "bind"
elif entry_type == "volume":
kind = "named" if source else "anonymous"
else:
if isinstance(source, str) and source:
kind = "bind" if _is_bind_source(source) else "named"
else:
kind = "anonymous"
return {
"source": source,
"target": target,
"mode": None,
"kind": kind,
}
return {
"source": None,
"target": None,
"mode": None,
"kind": "unknown",
}
# ----------------------------
# Classification logic
# ----------------------------
def _classify_target(target_path: str | None, image_name: str | None = None) -> tuple[str, str]:
    """Classify a container target path for backup purposes.

    Returns (classification, human-readable reason), where classification is
    one of "critical", "optional" or "review". Image-specific rules take
    precedence over the generic target-path rules.
    """
    if not target_path:
        return "review", "missing container target path"
    # 1) image-aware rules (e.g. a database's data dir) win outright
    if image_name and image_name in IMAGE_RULES:
        rules = IMAGE_RULES[image_name]
        if target_path in rules:
            level = rules[target_path]
            if level == "critical":
                return "critical", f"{image_name} rule for {target_path}"
            if level == "optional":
                return "optional", f"{image_name} rule for {target_path}"
    # 2) exact generic matches
    if target_path in CRITICAL_TARGETS:
        return "critical", f"critical target path {target_path}"
    if target_path in SKIP_TARGET_EXACT:
        return "optional", f"non-essential target path {target_path}"
    # 3) ephemeral prefixes (/tmp, /run, ...) are never worth backing up
    if target_path.startswith(SKIP_TARGET_PREFIXES):
        return "optional", f"ephemeral target path {target_path}"
    # 4) keyword heuristics: data-like names need a human decision
    lowered = target_path.lower()
    for keyword in REVIEW_TARGET_KEYWORDS:
        if keyword in lowered:
            return "review", f"data-like target path {target_path} requires review"
    return "review", f"unknown target path {target_path}"
def _merge_reason(existing: str, new: str) -> str:
if not existing:
return new
if not new or new == existing:
return existing
parts = [p.strip() for p in existing.split(" | ") if p.strip()]
if new not in parts:
parts.append(new)
return " | ".join(parts)
def _prefer_entry(existing: MountEntry, new: MountEntry) -> MountEntry:
    """Merge two entries for the same host path, keeping the higher-priority one.

    NOTE: mutates the surviving entry in place (reason and exists flags are
    merged into it) and returns it.
    """
    existing_priority = CLASS_PRIORITY.get(existing.classification, 0)
    new_priority = CLASS_PRIORITY.get(new.classification, 0)
    # Ties keep the already-seen entry.
    if new_priority > existing_priority:
        preferred = new
        other = existing
    else:
        preferred = existing
        other = new
    preferred.reason = _merge_reason(preferred.reason, other.reason)
    # Record the losing entry's service/target so no usage info is lost.
    if other.service and other.service not in preferred.reason:
        preferred.reason = _merge_reason(preferred.reason, f"also used by service={other.service} target={other.target}")
    preferred.exists = preferred.exists or other.exists
    return preferred
def _dedupe_entries(entries: list[MountEntry]) -> list[MountEntry]:
    """Collapse entries sharing the same host path, merging via _prefer_entry."""
    by_path: dict[str, MountEntry] = {}
    for candidate in entries:
        # absolute paths are resolved so symlinked duplicates collapse too
        if candidate.source.is_absolute():
            path_key = str(candidate.source.resolve())
        else:
            path_key = str(candidate.source)
        previous = by_path.get(path_key)
        by_path[path_key] = candidate if previous is None else _prefer_entry(previous, candidate)
    return list(by_path.values())
def _make_entry(
    source: Path,
    service: str,
    target: str | None,
    classification: str,
    reason: str,
) -> MountEntry:
    """Build a MountEntry, defaulting the target and probing host-path existence."""
    resolved_target = target if target else "unknown"
    return MountEntry(
        source=source,
        service=service,
        target=resolved_target,
        classification=classification,
        reason=reason,
        exists=source.exists(),
    )
# ----------------------------
# Main classifier
# ----------------------------
def classify_compose(compose_path: str | Path) -> list[MountEntry]:
    """Parse a compose file and classify every service volume as a MountEntry.

    Bind mounts are resolved to host paths; named volumes are resolved via
    the docker CLI; anonymous or unresolvable volumes become 'review'
    entries with a placeholder source path so they stay visible in output.
    Results are deduplicated by host path.
    """
    compose_file = Path(compose_path).expanduser().resolve()
    compose_data = load_compose(compose_file)
    services = compose_data.get("services", {})
    if not isinstance(services, dict):
        return []
    entries: list[MountEntry] = []
    for service_name, service_cfg in services.items():
        if not isinstance(service_cfg, dict):
            continue
        raw_volumes = service_cfg.get("volumes", [])
        if not isinstance(raw_volumes, list):
            continue
        # the image name drives image-aware classification rules
        image_name = _extract_image_name(service_cfg.get("image"))
        for raw_entry in raw_volumes:
            parsed = _parse_volume_entry(raw_entry)
            source = parsed.get("source")
            target = parsed.get("target")
            kind = parsed.get("kind")
            if kind == "anonymous":
                # no stable host path exists for anonymous volumes
                entries.append(
                    MountEntry(
                        source=Path("/__anonymous_volume__"),
                        service=service_name,
                        target=target or "unknown",
                        classification="review",
                        reason="anonymous volume cannot be safely mapped to host path",
                        exists=False,
                    )
                )
                continue
            if kind == "bind" and isinstance(source, str):
                host_path = _normalize_bind_path(source, compose_file)
                classification, base_reason = _classify_target(target, image_name)
                reason = f"{base_reason}; bind mount source '{source}' -> '{host_path}'"
                entries.append(
                    _make_entry(
                        source=host_path,
                        service=service_name,
                        target=target,
                        classification=classification,
                        reason=reason,
                    )
                )
                continue
            if kind == "named" and isinstance(source, str):
                mountpoint, volume_reason = resolve_named_volume(source, compose_file, compose_data)
                if mountpoint is None:
                    # keep the entry visible for manual review instead of dropping it
                    entries.append(
                        MountEntry(
                            source=Path(f"/__named_volume_unresolved__/{source}"),
                            service=service_name,
                            target=target or "unknown",
                            classification="review",
                            reason=volume_reason,
                            exists=False,
                        )
                    )
                    continue
                classification, base_reason = _classify_target(target, image_name)
                reason = f"{base_reason}; {volume_reason}; mountpoint '{mountpoint}'"
                entries.append(
                    _make_entry(
                        source=mountpoint,
                        service=service_name,
                        target=target,
                        classification=classification,
                        reason=reason,
                    )
                )
                continue
            # unrecognized shape (e.g. mapping without usable fields)
            entries.append(
                MountEntry(
                    source=Path("/__unknown_volume__"),
                    service=service_name,
                    target=target or "unknown",
                    classification="review",
                    reason="unrecognized volume entry",
                    exists=False,
                )
            )
    return _dedupe_entries(entries)
# 🤔 fallback
return {**mount, "class": "review"}

0
dockervault/cli Normal file
View file

View file

@ -1,383 +1,190 @@
from __future__ import annotations
import argparse
import json
import logging
import shlex
import shutil
import socket
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable
from typing import List
from concurrent.futures import ThreadPoolExecutor
import socket
from . import __version__
from .classifier import classify_compose
from .discovery import find_compose_files
LOGGER = logging.getLogger("dockervault")
from dockervault.discovery import discover_compose_files
from dockervault.analyzer import analyse_compose_file
from dockervault.classifier import classify_mount
def setup_logging(verbose: bool = False) -> None:
    """Configure root logging: DEBUG when verbose, INFO otherwise."""
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s: %(message)s",
    )
# 🔥 NEW: validation for max-depth
def non_negative_int(value: str) -> int:
    """argparse `type=` callable: parse a non-negative integer.

    Raises argparse.ArgumentTypeError for non-numeric input as well as
    negative values, so argparse reports a clean usage error either way
    (a bare ValueError would previously leak for input like "abc").
    """
    try:
        parsed = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if parsed < 0:
        raise argparse.ArgumentTypeError("must be 0 or greater")
    return parsed
def safe_get(obj: Any, key: str, default: Any = None) -> Any:
    """Look up `key` on `obj`: dict item, object attribute, or `default` for None."""
    if isinstance(obj, dict):
        return obj.get(key, default)
    if obj is None:
        return default
    return getattr(obj, key, default)
def normalize_entries(entries: Any) -> list[dict[str, Any]]:
    """Coerce heterogeneous mount entries (dicts or objects) into uniform dicts.

    Accepts a single entry or a list/tuple; returns a list of dicts with the
    canonical keys (path, priority, service, target, source_type, reason,
    exists, compose_file).
    """
    if not entries:
        return []
    if not isinstance(entries, (list, tuple)):
        entries = [entries]

    def _from_dict(item: dict[str, Any]) -> dict[str, Any]:
        # dict entries may use legacy key names (source/host_path, ...)
        return {
            "path": item.get("path") or item.get("source") or item.get("host_path"),
            "priority": item.get("priority") or item.get("classification"),
            "service": item.get("service"),
            "target": item.get("target") or item.get("container_path"),
            "source_type": item.get("source_type"),
            "reason": item.get("reason"),
            "exists": item.get("exists"),
            "compose_file": item.get("compose_file"),
        }

    def _from_object(item: Any) -> dict[str, Any]:
        # object entries (e.g. MountEntry) are read via attribute access
        return {
            "path": safe_get(item, "path", safe_get(item, "source")),
            "priority": safe_get(item, "priority", safe_get(item, "classification")),
            "service": safe_get(item, "service"),
            "target": safe_get(item, "target", safe_get(item, "container_path")),
            "source_type": safe_get(item, "source_type"),
            "reason": safe_get(item, "reason"),
            "exists": safe_get(item, "exists"),
            "compose_file": safe_get(item, "compose_file"),
        }

    return [
        _from_dict(item) if isinstance(item, dict) else _from_object(item)
        for item in entries
    ]
def classify_entries(
    raw_entries: Any,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Bucket normalized entries into (include, review, skip) by priority."""
    include: list[dict[str, Any]] = []
    review: list[dict[str, Any]] = []
    skip: list[dict[str, Any]] = []
    for item in normalize_entries(raw_entries):
        label = str(item.get("priority") or "").strip().lower()
        if label == "critical":
            include.append(item)
        elif label in {"optional", "skip", "ignored"}:
            skip.append(item)
        else:
            # unknown/missing priorities default to manual review
            review.append(item)
    return include, review, skip
def dedupe_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop entries with a missing path or an already-seen path, keeping order."""
    kept: list[dict[str, Any]] = []
    seen_paths: set[str] = set()
    for item in entries:
        raw_path = item.get("path")
        if not raw_path:
            continue
        path_key = str(raw_path)
        if path_key not in seen_paths:
            seen_paths.add(path_key)
            kept.append(item)
    return kept
def extract_paths(entries: Iterable[dict[str, Any]]) -> list[str]:
    """Return the unique entry paths, as strings, in first-seen order."""
    paths: list[str] = []
    for item in dedupe_entries(entries):
        if item.get("path"):
            paths.append(str(item["path"]))
    return paths
def find_missing_entries(entries: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Entries whose host path is known to be absent (exists is exactly False)."""
    missing: list[dict[str, Any]] = []
    for item in entries:
        # None means "unknown", which is deliberately not reported as missing
        if item.get("exists") is False:
            missing.append(item)
    return missing
def entry_to_line(entry: dict[str, Any]) -> str:
    """Format one entry as a single human-readable plan line."""
    path = entry.get("path") or "(unknown)"
    priority = entry.get("priority") or "unknown"
    service = entry.get("service") or "unknown"
    target = entry.get("target") or "unknown"

    details: list[str] = []
    compose_file = entry.get("compose_file")
    if compose_file:
        details.append(f"compose={compose_file}")
    if entry.get("source_type"):
        details.append(f"type={entry['source_type']}")
    exists = entry.get("exists")
    # exists=False is meaningful, so only None is suppressed
    if exists is not None:
        details.append(f"exists={exists}")
    if entry.get("reason"):
        details.append(f"reason={entry['reason']}")

    tail = f" ({', '.join(details)})" if details else ""
    return f" - {path} [{priority}] service={service} target={target}{tail}"
def default_archive_name() -> str:
    """Archive name `<hostname>-YYYY-MM-DD_HH-MM` based on the current local time."""
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    return f"{socket.gethostname()}-{stamp}"
def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -> list[str]:
    """Assemble the borg create argv; empty list when repo or paths are missing."""
    if not repo or not include_paths:
        return []
    return [
        "borg",
        "create",
        "--stats",
        "--progress",
        f"{repo}::{archive_name}",
        *include_paths,
    ]
def ensure_borg_available() -> bool:
    """Check that the borg binary is on PATH, logging an error when it is not."""
    if shutil.which("borg") is not None:
        return True
    LOGGER.error("Borg binary not found in PATH")
    return False
def scan_projects(
    scan_root: Path,
    max_depth: int | None = None,
    excludes: list[str] | None = None,
) -> tuple[list[Path], list[dict[str, Any]]]:
    """Discover compose files under `scan_root` and classify their mounts.

    Returns (compose_files, entries); each normalized entry is tagged with
    the absolute path of the compose file it came from.
    """
    compose_files = find_compose_files(
        scan_root,
        excludes=excludes,
        max_depth=max_depth,
    )
    all_entries: list[dict[str, Any]] = []
    for compose_file in compose_files:
        raw_entries = classify_compose(compose_file)
        normalized = normalize_entries(raw_entries)
        for entry in normalized:
            # record provenance so plan output can show the source compose file
            entry["compose_file"] = str(compose_file.resolve())
            all_entries.append(entry)
    return compose_files, all_entries
def print_human_plan(
raw_entries: Any,
label: str,
root_path: Path,
compose_files: list[Path] | None = None,
) -> None:
include_entries, review_entries, skip_entries = classify_entries(raw_entries)
include_entries = dedupe_entries(include_entries)
review_entries = dedupe_entries(review_entries)
skip_entries = dedupe_entries(skip_entries)
missing_include = find_missing_entries(include_entries)
def print_plan(scan_root: Path, classified_mounts: List[dict], quiet: bool):
if quiet:
return [m for m in classified_mounts if m["class"] == "critical"]
print()
print("DockerVault Backup Plan")
print("=======================")
print(f"{label}: {root_path.resolve()}")
if compose_files is not None:
print(f"Compose files found: {len(compose_files)}")
print(f"Scan root: {scan_root}")
print()
include = []
review = []
skip = []
for m in classified_mounts:
cls = m["class"]
if cls == "critical":
include.append(m)
elif cls == "review":
review.append(m)
else:
skip.append(m)
print("INCLUDE PATHS:")
if include_entries:
for entry in include_entries:
print(entry_to_line(entry))
if include:
for m in include:
print(
f" - {m['source']} "
f"[{m['class']}] "
f"service={m['service']} "
f"target={m['target']} "
f"(exists={m['exists']})"
)
else:
print(" - (none)")
print()
print(" (none)")
if missing_include:
print("WARNING: Missing critical paths detected")
for entry in missing_include:
print(f" - {entry.get('path')} (service={entry.get('service')})")
print()
print("REVIEW PATHS:")
if review_entries:
for entry in review_entries:
print(entry_to_line(entry))
if review:
for m in review:
print(
f" - {m['source']} "
f"[{m['class']}] "
f"service={m['service']} "
f"target={m['target']} "
f"(exists={m['exists']})"
)
else:
print(" - (none)")
print(" (none)")
print()
print("SKIP PATHS:")
if skip_entries:
for entry in skip_entries:
print(entry_to_line(entry))
if skip:
for m in skip:
print(
f" - {m['source']} "
f"[{m['class']}] "
f"service={m['service']} "
f"target={m['target']} "
f"(exists={m['exists']})"
)
else:
print(" - (none)")
print(" (none)")
return include
def print_borg_command(include: List[dict], repo: str, quiet: bool):
if not repo:
return
valid_paths = sorted({
m["source"] for m in include if m["exists"]
})
if not valid_paths:
if not quiet:
print()
print("No valid paths for borg backup")
print("Reason: all critical paths are missing (exists=False)")
return
hostname = socket.gethostname()
if quiet:
print(" ".join(valid_paths))
return
print()
print("Suggested borg create command")
print("=============================")
print("borg create --stats --progress \\")
print(f" {repo}::{{hostname}}-{{now:%Y-%m-%d_%H-%M}} \\")
for p in valid_paths:
print(f" {p} \\")
def print_automation_output(
raw_entries: Any,
root_path: Path,
repo: str | None = None,
compose_files: list[Path] | None = None,
) -> None:
include_entries, review_entries, skip_entries = classify_entries(raw_entries)
def build_parser():
parser = argparse.ArgumentParser()
include_entries = dedupe_entries(include_entries)
review_entries = dedupe_entries(review_entries)
skip_entries = dedupe_entries(skip_entries)
subparsers = parser.add_subparsers(dest="command")
include_paths = extract_paths(include_entries)
missing_include = find_missing_entries(include_entries)
scan = subparsers.add_parser("scan")
payload: dict[str, Any] = {
"root": str(root_path.resolve()),
"include_paths": include_paths,
"review_paths": extract_paths(review_entries),
"skip_paths": extract_paths(skip_entries),
"missing_critical_paths": [str(entry["path"]) for entry in missing_include if entry.get("path")],
}
scan.add_argument("path")
scan.add_argument("--repo")
scan.add_argument("--max-depth", type=int, default=None)
scan.add_argument("--exclude", action="append", default=[])
if compose_files is not None:
payload["compose_files"] = [str(path.resolve()) for path in compose_files]
if repo:
archive_name = default_archive_name()
payload["repo"] = repo
payload["archive_name"] = archive_name
payload["borg_command"] = build_borg_command(repo, archive_name, include_paths)
print(json.dumps(payload, indent=2))
def run_borg_command(cmd: list[str], dry_run: bool = False, quiet: bool = False) -> int:
    """Execute (or, for dry runs, just display) a borg command.

    Returns the process exit code; 0 for dry runs, 1 for an empty command
    or a missing borg binary.
    """
    if not cmd:
        LOGGER.error("No Borg command to run")
        return 1
    rendered = " ".join(shlex.quote(part) for part in cmd)
    if dry_run:
        if not quiet:
            print("Dry run - Borg command:")
            print(rendered)
        return 0
    if not ensure_borg_available():
        return 1
    if not quiet:
        print("Running Borg command:")
        print(rendered)
    return subprocess.run(cmd, check=False).returncode
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="dockervault")
parser.add_argument("--repo")
parser.add_argument("--run-borg", action="store_true")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--automation", action="store_true")
parser.add_argument("--quiet", action="store_true")
parser.add_argument("--verbose", action="store_true")
parser.add_argument("--version", action="version", version=__version__)
subparsers = parser.add_subparsers(dest="command", required=True)
plan_parser = subparsers.add_parser("plan")
plan_parser.add_argument("path")
scan_parser = subparsers.add_parser("scan")
scan_parser.add_argument("path")
scan_parser.add_argument(
"--max-depth",
type=non_negative_int,
default=None,
help="Maximum directory depth to scan",
)
scan_parser.add_argument(
"--exclude",
action="append",
default=[],
help="Additional directory name to exclude (can be used multiple times)",
)
scan.add_argument("--quiet", action="store_true")
scan.add_argument("--automation", action="store_true")
return parser
def main() -> int:
def main():
parser = build_parser()
args = parser.parse_args()
setup_logging(args.verbose)
if args.command == "scan":
root = Path(args.path)
scan_root = Path(args.path).resolve()
compose_files, entries = scan_projects(
root,
if not scan_root.exists():
if not args.quiet:
print(f"ERROR: Path does not exist: {scan_root}")
return 2
compose_files = discover_compose_files(
root=scan_root,
max_depth=args.max_depth,
excludes=args.exclude,
)
print_human_plan(entries, "Scan root", root, compose_files)
with ThreadPoolExecutor() as executor:
results = list(executor.map(analyse_compose_file, compose_files))
classified_mounts = []
for r in results:
for m in r["mounts"]:
classified = classify_mount(m)
compose_dir = r["compose"].parent
source_path = (compose_dir / classified["source"]).resolve()
classified["source"] = str(source_path)
classified["exists"] = source_path.exists()
classified_mounts.append(classified)
missing_critical = [
m for m in classified_mounts
if m["class"] == "critical" and not m["exists"]
]
if missing_critical and not args.quiet:
print()
print("WARNING: Missing critical paths detected")
for m in missing_critical:
print(f" - {m['source']} (service={m['service']})")
print()
include = print_plan(scan_root, classified_mounts, args.quiet)
print_borg_command(include, args.repo, args.quiet)
if missing_critical:
return 1
return 0
if args.command == "plan":
root = Path(args.path)
entries = normalize_entries(classify_compose(root))
print_human_plan(entries, "Compose file", root, [root])
return 0
return 0
return 1
if __name__ == "__main__":

View file

@ -1,28 +1,8 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Iterable
DEFAULT_SCAN_EXCLUDES = {
".git",
".hg",
".svn",
".venv",
"venv",
"env",
"node_modules",
"__pycache__",
".pytest_cache",
".mypy_cache",
".tox",
".cache",
".idea",
".vscode",
}
from typing import List, Optional
# Known compose filenames
COMPOSE_FILENAMES = {
"docker-compose.yml",
"docker-compose.yaml",
@ -31,34 +11,66 @@ COMPOSE_FILENAMES = {
}
def find_compose_files(
root: Path | str,
excludes: Iterable[str] | None = None,
max_depth: int | None = None,
) -> list[Path]:
root_path = Path(root).resolve()
root_depth = len(root_path.parts)
# Default directories we always ignore (noise + performance)
DEFAULT_EXCLUDES = {
".git",
".venv",
"node_modules",
"__pycache__",
}
exclude_set = set(DEFAULT_SCAN_EXCLUDES)
if excludes:
exclude_set.update(x.strip() for x in excludes if x and x.strip())
found: set[Path] = set()
def should_exclude(path: Path, excludes: Optional[List[str]]) -> bool:
    """Return True when any path component matches an excluded directory name.

    Matches whole path parts, never substrings.
    """
    blocked = DEFAULT_EXCLUDES | set(excludes or [])
    return not blocked.isdisjoint(path.parts)
for current_root, dirnames, filenames in os.walk(root_path, topdown=True):
current_path = Path(current_root)
current_depth = len(current_path.parts) - root_depth
if max_depth is not None and current_depth >= max_depth:
dirnames[:] = []
def discover_compose_files(
root: Path,
max_depth: Optional[int] = None,
excludes: Optional[List[str]] = None,
) -> List[Path]:
"""
Finder docker-compose filer i et directory tree.
dirnames[:] = sorted(
d for d in dirnames
if d not in exclude_set
)
Args:
root: start directory
max_depth: max dybde (0 = kun root)
excludes: liste af directory navne der skal ignoreres
for filename in filenames:
if filename in COMPOSE_FILENAMES:
found.add((current_path / filename).resolve())
Returns:
Liste af fundne compose filer (sorteret)
"""
root = root.resolve()
results: List[Path] = []
return sorted(found)
def walk(current: Path, depth: int):
# Stop hvis vi er for dybt
if max_depth is not None and depth > max_depth:
return
try:
for entry in current.iterdir():
# 🔥 vigtig: skip før traversal (performance + korrekthed)
if should_exclude(entry, excludes):
continue
if entry.is_dir():
walk(entry, depth + 1)
elif entry.is_file() and entry.name in COMPOSE_FILENAMES:
results.append(entry)
except PermissionError:
# Ignorer mapper vi ikke har adgang til
pass
walk(root, depth=0)
# Stabil rækkefølge (vigtigt for tests og CLI output)
return sorted(results)

View file

@ -0,0 +1,65 @@
from pathlib import Path
from dockervault.discovery import discover_compose_files
def write_compose(base: Path, rel: str):
    """Create a minimal compose file at base/rel (making parent dirs) and return its path."""
    target = base / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("services: {}\n")
    return target
def test_basic_discovery(tmp_path: Path):
    # Two compose files in separate project directories are both found.
    write_compose(tmp_path, "app1/docker-compose.yml")
    write_compose(tmp_path, "app2/docker-compose.yml")
    results = discover_compose_files(tmp_path)
    assert len(results) == 2
def test_max_depth(tmp_path: Path):
    # max_depth=1 keeps first-level hits but prunes deeper directories.
    write_compose(tmp_path, "root.yml")
    write_compose(tmp_path, "a/docker-compose.yml")
    write_compose(tmp_path, "a/b/docker-compose.yml")
    results = discover_compose_files(tmp_path, max_depth=1)
    paths = [str(p) for p in results]
    assert any("a/docker-compose.yml" in p for p in paths)
    assert not any("a/b/docker-compose.yml" in p for p in paths)
def test_exclude_directory(tmp_path: Path):
    # An explicitly excluded directory name is filtered out of the results.
    write_compose(tmp_path, "app1/docker-compose.yml")
    write_compose(tmp_path, "app2/docker-compose.yml")
    results = discover_compose_files(tmp_path, excludes=["app2"])
    paths = [str(p) for p in results]
    assert any("app1" in p for p in paths)
    assert not any("app2" in p for p in paths)
def test_default_excludes(tmp_path: Path):
    # Built-in excludes (.git, .venv, node_modules) are skipped without any flags.
    write_compose(tmp_path, ".git/test/docker-compose.yml")
    write_compose(tmp_path, ".venv/test/docker-compose.yml")
    write_compose(tmp_path, "node_modules/test/docker-compose.yml")
    write_compose(tmp_path, "app/docker-compose.yml")
    results = discover_compose_files(tmp_path)
    paths = [str(p) for p in results]
    assert len(paths) == 1
    assert "app/docker-compose.yml" in paths[0]
def test_exclude_prevents_traversal(tmp_path: Path):
    # Excluding a directory also hides everything nested beneath it.
    write_compose(tmp_path, "skipme/a/docker-compose.yml")
    results = discover_compose_files(tmp_path, excludes=["skipme"])
    assert len(results) == 0