feat: add borg path validation, mkdir apply, and fail-on-missing

This commit is contained in:
Eddie Nielsen 2026-03-22 13:53:26 +00:00
parent b81b29e674
commit f6b0521c34
3 changed files with 388 additions and 237 deletions

35
docker-compose.yml Normal file
View file

@@ -0,0 +1,35 @@
version: "3.9"
services:
  db:
    image: mariadb:10.11
    container_name: dv-db
    restart: unless-stopped
    environment:
      MYSQL_ROOT_PASSWORD: example
      MYSQL_DATABASE: testdb
      MYSQL_USER: test
      MYSQL_PASSWORD: test
    volumes:
      - ./db:/var/lib/mysql
  mc:
    image: itzg/minecraft-server:latest
    container_name: dv-mc
    restart: unless-stopped
    environment:
      EULA: "TRUE"
      MEMORY: "1G"
    ports:
      - "25565:25565"
    volumes:
      - ./mc-missing:/data  # <-- this host dir is intentionally missing
  nginx:
    image: nginx:latest
    container_name: dv-nginx
    restart: unless-stopped
    ports:
      - "8080:80"
    volumes:
      - ./logs:/var/log/nginx

291
dockervault/classifier.py Normal file
View file

@@ -0,0 +1,291 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
# Container target paths that hold persistent application state (databases,
# app data, web roots).  Mounts whose target matches one of these are
# classified into the "include" bucket with "critical" priority.
CRITICAL_TARGET_PATTERNS = (
    "/var/lib/mysql",
    "/var/lib/postgresql",
    "/var/lib/postgres",
    "/var/lib/mariadb",
    "/data",
    "/config",
    "/var/www",
    "/srv",
    "/app/data",
    "/bitnami",
    "/var/opt",
    "/var/lib/redis",
    "/redis",
    "/var/lib/mongodb",
    "/mongodb",
)

# Container targets for disposable data (logs, caches, temp files);
# matching mounts land in the "skip" bucket.
OPTIONAL_TARGET_PATTERNS = (
    "/var/log",
    "/logs",
    "/log",
    "/tmp",
    "/cache",
    "/var/cache",
    "/run",
)

# Substrings of the host source directory *name* that mark disposable
# data when the target gave no stronger signal.
OPTIONAL_SOURCE_PATTERNS = (
    "logs",
    "log",
    "cache",
    "tmp",
    "temp",
)

# Runtime/system container targets that are never useful to back up;
# checked before the critical patterns, so e.g. "/run" always skips.
SKIP_TARGET_PATTERNS = (
    "/dev",
    "/proc",
    "/sys",
    "/run",
    "/tmp",
)

# Host source substrings identifying the Docker control socket; these are
# skipped before any target-based classification.
SKIP_SOURCE_PATTERNS = (
    "/var/run/docker.sock",
    "docker.sock",
)
def load_compose(compose_file: Path) -> dict[str, Any]:
    """Parse a compose YAML file and return its top-level mapping.

    An empty file parses to {}.  Raises ValueError when the document's
    root is not a mapping.
    """
    with compose_file.open("r", encoding="utf-8") as handle:
        parsed = yaml.safe_load(handle) or {}
    if not isinstance(parsed, dict):
        raise ValueError("Compose file did not parse into a dictionary")
    return parsed
def is_bind_mount(volume: Any) -> bool:
    """Return True when *volume* denotes a bind mount.

    String entries count when they contain a ':' separator; dict entries
    count when their explicit type is "bind".  Anything else is not a
    bind mount.
    """
    if isinstance(volume, dict):
        return volume.get("type") == "bind"
    return isinstance(volume, str) and ":" in volume
def parse_volume_entry(
volume: Any,
compose_file: Path,
) -> dict[str, str] | None:
project_root = compose_file.parent.resolve()
if isinstance(volume, str):
parts = volume.split(":")
if len(parts) < 2:
return None
source_raw = parts[0].strip()
target = parts[1].strip()
if not source_raw or not target:
return None
# Named volumes should not be treated as bind mounts
if not source_raw.startswith(("/", ".", "~")):
return None
source = resolve_source_path(source_raw, project_root)
return {
"source": str(source),
"target": target,
}
if isinstance(volume, dict):
if volume.get("type") != "bind":
return None
source_raw = str(volume.get("source", "")).strip()
target = str(volume.get("target", "")).strip()
if not source_raw or not target:
return None
source = resolve_source_path(source_raw, project_root)
return {
"source": str(source),
"target": target,
}
return None
def resolve_source_path(source_raw: str, project_root: Path) -> Path:
    """Expand ~ and resolve *source_raw*; relatives are anchored at *project_root*."""
    candidate = Path(source_raw).expanduser()
    if candidate.is_absolute():
        return candidate.resolve()
    return (project_root / candidate).resolve()
def classify_mount(
    service_name: str,
    source: str,
    target: str,
) -> tuple[str, str, str]:
    """Return (bucket, priority, reason) for one bind mount.

    Evaluation order: skip-by-source (docker socket), skip-by-target
    (runtime/system paths), critical targets, optional targets, optional
    source names; anything left is flagged for review.  *service_name*
    is accepted for call symmetry but not consulted here.
    """
    src = source.lower()
    dst = target.lower()

    def target_matches(prefix: str) -> bool:
        # Exact target or a path nested beneath the prefix.
        return dst == prefix or dst.startswith(prefix + "/")

    if any(pattern in src for pattern in SKIP_SOURCE_PATTERNS):
        return "skip", "optional", "docker runtime socket"
    if any(target_matches(pattern) for pattern in SKIP_TARGET_PATTERNS):
        return "skip", "optional", "runtime/system path"
    if any(target_matches(pattern) for pattern in CRITICAL_TARGET_PATTERNS):
        return "include", "critical", "persistent app data"
    if any(target_matches(pattern) for pattern in OPTIONAL_TARGET_PATTERNS):
        return "skip", "optional", "logs/cache/temp path"

    source_name = Path(source).name.lower()
    if any(pattern in source_name for pattern in OPTIONAL_SOURCE_PATTERNS):
        return "skip", "optional", "logs/cache/temp source"
    return "review", "medium", "unknown bind mount"
def classify_service_mounts(
    service_name: str,
    service_data: dict[str, Any],
    compose_file: Path,
) -> list[dict[str, str]]:
    """Classify every bind-mount volume declared by one service.

    Each result carries bucket/priority/reason plus the service name and
    the resolved source/target pair.  A malformed (non-list) volumes
    section yields an empty list.
    """
    classified: list[dict[str, str]] = []
    volumes = service_data.get("volumes", [])
    if not isinstance(volumes, list):
        return classified
    for entry in volumes:
        if not is_bind_mount(entry):
            continue
        if (parsed := parse_volume_entry(entry, compose_file)) is None:
            continue
        bucket, priority, reason = classify_mount(
            service_name=service_name,
            source=parsed["source"],
            target=parsed["target"],
        )
        classified.append(
            {
                "bucket": bucket,
                "priority": priority,
                "reason": reason,
                "service": service_name,
                "source": parsed["source"],
                "target": parsed["target"],
            }
        )
    return classified
def deduplicate_items(items: list[dict[str, str]]) -> list[dict[str, str]]:
    """Drop repeated (service, source, target, bucket) entries, keeping the first.

    Input order of the surviving items is preserved.
    """
    unique: dict[tuple[str, str, str, str], dict[str, str]] = {}
    for item in items:
        fingerprint = (
            item["service"],
            item["source"],
            item["target"],
            item["bucket"],
        )
        # setdefault keeps the first occurrence; dicts preserve insertion order.
        unique.setdefault(fingerprint, item)
    return list(unique.values())
def sort_items(items: list[dict[str, str]]) -> list[dict[str, str]]:
    """Return items ordered by priority rank, then service, source, target.

    Unknown priorities rank last (99).
    """
    rank = {"critical": 0, "high": 1, "medium": 2, "low": 3, "optional": 4}

    def sort_key(item: dict[str, str]) -> tuple[int, str, str, str]:
        return (
            rank.get(item["priority"], 99),
            item["service"],
            item["source"],
            item["target"],
        )

    return sorted(items, key=sort_key)
def classify_compose(compose_file: str | Path) -> dict[str, Any]:
    """Build the full backup plan for a compose file.

    Returns {"include": [...], "review": [...], "skip": [...]} where each
    entry is a stripped item (no internal bucket field), deduplicated and
    sorted by priority.  Raises ValueError for a malformed services section.
    """
    compose_path = Path(compose_file).resolve()
    data = load_compose(compose_path)
    services = data.get("services", {})
    if not isinstance(services, dict):
        raise ValueError("Compose file does not contain a valid 'services' section")

    collected: list[dict[str, str]] = []
    for name, definition in services.items():
        # Non-mapping service definitions are silently ignored.
        if isinstance(definition, dict):
            collected.extend(
                classify_service_mounts(
                    service_name=name,
                    service_data=definition,
                    compose_file=compose_path,
                )
            )

    ordered = sort_items(deduplicate_items(collected))
    plan: dict[str, Any] = {"include": [], "review": [], "skip": []}
    for item in ordered:
        plan[item["bucket"]].append(strip_bucket(item))
    return plan
def strip_bucket(item: dict[str, str]) -> dict[str, str]:
    """Copy *item*'s public fields, leaving the internal 'bucket' out."""
    return {
        key: item[key]
        for key in ("service", "source", "target", "priority", "reason")
    }

View file

@@ -15,6 +15,16 @@ def check_path_exists(path: str) -> bool:
return Path(path).exists()
def create_missing_paths(paths: list[str]) -> list[str]:
    """mkdir -p each missing path and return the sorted list of paths created.

    Paths that already exist are left untouched and omitted from the result;
    duplicates in the input are collapsed.
    """
    created: list[str] = []
    for candidate in sorted(set(paths)):
        directory = Path(candidate)
        if directory.exists():
            continue
        directory.mkdir(parents=True, exist_ok=True)
        created.append(str(directory))
    return created
def build_mkdir_suggestion(paths: list[str]) -> str:
unique_paths = sorted(set(paths))
lines = ["mkdir -p \\"]
@@ -37,11 +47,7 @@ def render_borg_archive(template: str, project: str, compose_path: Path) -> str:
)
def build_borg_command(
repo: str,
archive_name: str,
include_paths: list[str],
) -> str:
def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -> str:
lines = [
"borg create --stats --progress \\",
f" {repo}::{archive_name} \\",
@@ -54,91 +60,6 @@ def build_borg_command(
return "\n".join(lines)
def plan_to_json_dict(
compose_file: Path,
project_root: Path,
plan: dict[str, Any],
borg_repo: str | None = None,
borg_archive: str | None = None,
borg_command: str | None = None,
missing_include: list[dict[str, Any]] | None = None,
missing_review: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
return {
"compose_file": str(compose_file.resolve()),
"project_root": str(project_root.resolve()),
"include": plan.get("include", []),
"review": plan.get("review", []),
"skip": plan.get("skip", []),
"missing": {
"include": missing_include or [],
"review": missing_review or [],
},
"borg": {
"repo": borg_repo,
"archive": borg_archive,
"command": borg_command,
}
if borg_repo or borg_archive or borg_command
else None,
}
def print_human_summary(compose_file: Path, project_root: Path, plan: dict[str, Any]) -> None:
    """Print the backup plan as a human-readable report.

    One section per bucket (INCLUDE/REVIEW/SKIP); each entry shows the
    host source path, priority, whether the path exists on disk, and the
    owning service/target.  Empty sections print "(none)".

    Fix: the original repeated the identical section-printing loop three
    times; this iterates the bucket names instead.  Output is unchanged,
    including the blank line between (but not after) sections.
    """
    print("DockerVault Backup Plan")
    print("=======================")
    print(f"Compose file: {compose_file.resolve()}")
    print(f"Project root: {project_root.resolve()}")
    print()
    for index, section in enumerate(("include", "review", "skip")):
        if index:
            # Blank separator between sections, matching the original layout.
            print()
        print(f"{section.upper()} PATHS:")
        items = plan.get(section, [])
        if items:
            for item in items:
                # Flag sources missing on the host so the operator can
                # create them before running a backup.
                status = "✔ exists" if Path(item["source"]).exists() else "❌ missing"
                print(
                    f" - {item['source']} "
                    f"[{item['priority']}] {status} "
                    f"service={item['service']} target={item['target']}"
                )
        else:
            print(" - (none)")
def find_missing_paths(plan: dict[str, Any]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
missing_include = [
item for item in plan.get("include", [])
@@ -153,179 +74,83 @@ def find_missing_paths(plan: dict[str, Any]) -> tuple[list[dict[str, Any]], list
return missing_include, missing_review
def print_missing_paths_report(
missing_include: list[dict[str, Any]],
missing_review: list[dict[str, Any]],
) -> None:
all_missing = missing_include + missing_review
if not all_missing:
return
def print_human_summary(compose_file: Path, project_root: Path, plan: dict[str, Any]) -> None:
print("DockerVault Backup Plan")
print("=======================")
print(f"Compose file: {compose_file.resolve()}")
print(f"Project root: {project_root.resolve()}")
print()
print("WARNING: Missing paths detected:")
for item in all_missing:
bucket = "include" if item in missing_include else "review"
print(f" - {item['source']} (service={item['service']}, bucket={bucket})")
mkdir_paths = [item["source"] for item in all_missing]
for section in ["include", "review", "skip"]:
print(f"{section.upper()} PATHS:")
items = plan.get(section, [])
if items:
for item in items:
exists = check_path_exists(item["source"])
status = "✔ exists" if exists else "❌ missing"
print(
f" - {item['source']} "
f"[{item['priority']}] {status} "
f"service={item['service']} target={item['target']}"
)
else:
print(" - (none)")
print()
print("Suggested fix for missing paths")
print("================================")
print(build_mkdir_suggestion(mkdir_paths))
def main() -> None:
parser = argparse.ArgumentParser(
description="DockerVault - intelligent Docker backup discovery"
)
parser = argparse.ArgumentParser(description="DockerVault")
parser.add_argument(
"compose_file",
nargs="?",
default="docker-compose.yml",
help="Path to docker-compose.yml",
)
parser.add_argument(
"--summary-only",
action="store_true",
help="Print human-readable summary only",
)
parser.add_argument(
"--json",
action="store_true",
help="Print backup plan as JSON",
)
parser.add_argument(
"--borg",
action="store_true",
help="Generate borg backup command output",
)
parser.add_argument(
"--borg-json",
action="store_true",
help="Print borg-related output as JSON",
)
parser.add_argument(
"--borg-repo",
default="/backup-repo",
help="Borg repository path or URI (default: /backup-repo)",
)
parser.add_argument(
"--borg-archive",
default="{hostname}-{now:%Y-%m-%d_%H-%M}",
help=(
"Archive naming template. Supported fields: "
"{hostname}, {project}, {compose_stem}, {now:...}"
),
)
parser.add_argument(
"--fail-on-missing",
action="store_true",
help="Exit with status 2 if include/review paths are missing",
)
parser.add_argument("compose_file", nargs="?", default="docker-compose.yml")
parser.add_argument("--borg", action="store_true")
parser.add_argument("--borg-repo", default="/backup-repo")
parser.add_argument("--borg-archive", default="{hostname}-{now:%Y-%m-%d_%H-%M}")
parser.add_argument("--fail-on-missing", action="store_true")
parser.add_argument("--apply-mkdir", action="store_true")
args = parser.parse_args()
compose_file = Path(args.compose_file).resolve()
if not compose_file.exists():
raise SystemExit(f"Compose file not found: {compose_file}")
project_root = compose_file.parent
project_name = project_root.name or compose_file.stem
project_name = project_root.name
plan = classify_compose(compose_file)
missing_include, missing_review = find_missing_paths(plan)
all_missing = missing_include + missing_review
if args.json:
print(
json.dumps(
plan_to_json_dict(
compose_file,
project_root,
plan,
missing_include=missing_include,
missing_review=missing_review,
),
indent=2,
)
)
if args.fail_on_missing and all_missing:
sys.exit(2)
return
if args.borg or args.borg_json:
include_items = plan.get("include", [])
include_paths = [item["source"] for item in include_items]
try:
archive_name = render_borg_archive(
args.borg_archive,
project_name,
compose_file,
)
except KeyError as exc:
raise SystemExit(
f"Invalid borg archive template field: {exc}. "
"Allowed: hostname, project, compose_stem, now"
) from exc
borg_command = build_borg_command(
repo=args.borg_repo,
archive_name=archive_name,
include_paths=include_paths,
)
if args.borg_json:
print(
json.dumps(
plan_to_json_dict(
compose_file,
project_root,
plan,
borg_repo=args.borg_repo,
borg_archive=archive_name,
borg_command=borg_command,
missing_include=missing_include,
missing_review=missing_review,
),
indent=2,
)
)
if args.fail_on_missing and all_missing:
sys.exit(2)
return
print_human_summary(compose_file, project_root, plan)
print_missing_paths_report(missing_include, missing_review)
if all_missing:
print("WARNING: Missing paths detected:")
for item in all_missing:
print(f" - {item['source']} ({item['service']})")
paths = [item["source"] for item in all_missing]
if args.apply_mkdir:
created = create_missing_paths(paths)
print()
print("Suggested borg create command")
print("=============================")
print(borg_command)
print("Created missing paths:")
for p in created:
print(f" - {p}")
else:
print()
print("Suggested fix:")
print(build_mkdir_suggestion(paths))
if args.borg:
archive = render_borg_archive(args.borg_archive, project_name, compose_file)
include_paths = [item["source"] for item in plan.get("include", [])]
print()
print("Suggested borg command:")
print(build_borg_command(args.borg_repo, archive, include_paths))
if args.fail_on_missing and all_missing:
print()
print("ERROR: Failing because include/review paths are missing.")
sys.exit(2)
return
print_human_summary(compose_file, project_root, plan)
print_missing_paths_report(missing_include, missing_review)
if args.fail_on_missing and all_missing:
print()
print("ERROR: Failing because include/review paths are missing.")
print("ERROR: Missing required paths")
sys.exit(2)