feat: add borg backup support and classification improvements

Eddie Nielsen 2026-03-23 14:46:33 +00:00
parent 483e2720f1
commit e5ef50a74a
15 changed files with 1293 additions and 649 deletions

README.md

@@ -1,5 +1,5 @@
<p align="center">
<img src="images/dockervault_logo.png" width="600">
</p>
# DockerVault
@@ -15,172 +15,262 @@ No guesswork. No forgotten volumes. No broken restores.
## 📚 Contents
* [🚀 What is DockerVault?](#-what-is-dockervault)
* [🧠 Why DockerVault?](#-why-dockervault)
* [⚡ Quick Start](#-quick-start)
* [🧠 How it Works](#-how-it-works)
* [🗂 Classification Model](#-classification-model)
* [💾 Borg Integration](#-borg-integration)
* [🤖 Automation Mode](#-automation-mode)
* [🔢 Exit Codes](#-exit-codes)
* [🛠 Tech Stack](#-tech-stack)
* [🔍 Example](#-example)
* [🧱 Current Features](#-current-features)
* [🔥 Roadmap](#-roadmap)
* [🧠 Philosophy](#-philosophy)
* [📜 License](#-license)
* [🤝 Contributing](#-contributing)
* [❤️ Credits](#-credits)
---
## 🚀 What is DockerVault?
DockerVault analyzes your `docker-compose.yml` and identifies:
* What **must** be backed up
* What can be **ignored**
* What needs **human review**
Built for people running real systems, not toy setups.
---
## 🧠 Why DockerVault?
Most backup setups fail because:
* You forget a volume
* You miss an `.env` file
* You back up cache instead of data
* You don't know what actually matters
DockerVault solves this by **thinking like an operator**.
It bridges the gap between:
👉 “everything looks fine”
and
👉 “restore just failed”
---
## ⚡ Quick Start
```bash
git clone https://github.com/YOUR-USER/dockervault.git
cd dockervault
python3 -m venv .venv
source .venv/bin/activate
pip install -e .
```
Run analysis:
```bash
python -m dockervault.cli docker-compose.yml --borg --repo /backup-repo
```
Run backup:
```bash
python -m dockervault.cli docker-compose.yml \
--run-borg \
--repo /backup-repo
```
---
## 🧠 How it Works
DockerVault parses your compose file and inspects:
* bind mounts
* volume targets
* known data paths
It then classifies them using heuristics:
* database paths → critical
* logs/cache → optional
* unknown → review
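The heuristics above can be sketched in a few lines. This is a minimal illustration, not DockerVault's actual API; the constant and function names are made up for the example:

```python
# Illustrative keyword lists, loosely based on the heuristics described above.
DB_HINTS = ("/var/lib/mysql", "/var/lib/postgresql", "/data/db")
NOISE_HINTS = ("/var/log", "/tmp", "/cache")

def classify_target(target: str) -> str:
    """Map a container mount target to a rough backup category."""
    t = target.lower()
    if any(h in t for h in DB_HINTS):
        return "critical"   # database paths
    if any(h in t for h in NOISE_HINTS):
        return "optional"   # logs / cache
    return "review"         # unknown: needs a human

print(classify_target("/var/lib/mysql"))  # critical
print(classify_target("/var/log/nginx"))  # optional
print(classify_target("/opt/unknown"))    # review
```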
---
## 🗂 Classification Model
DockerVault divides everything into three categories:
### ✅ INCLUDE
Must be backed up.
Example:
```
/var/lib/mysql
/data
/config
```
### ⚠️ REVIEW
Needs human decision.
Triggered when:
* path does not exist
* path exists but is empty
* named volumes (Docker-managed)
Example:
```
./mc-missing → /data
```
### ❌ SKIP
Safe to ignore.
Example:
```
/var/log
/tmp
/cache
```
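Two of the REVIEW triggers (missing path, empty path) are simple filesystem checks. A standalone sketch, with a hypothetical helper name; named-volume detection is omitted:

```python
import tempfile
from pathlib import Path

def needs_review(source: str) -> bool:
    """True when a bind-mount source is missing or empty (hypothetical helper)."""
    p = Path(source)
    if not p.exists():
        return True                                  # path does not exist
    return p.is_dir() and not any(p.iterdir())       # path exists but is empty

with tempfile.TemporaryDirectory() as d:
    print(needs_review(d))               # True: empty directory
    print(needs_review(d + "/missing"))  # True: missing path
print(needs_review("/"))                 # False: exists and non-empty
```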
---
## 💾 Borg Integration
DockerVault can generate and run Borg backups directly.
Example:
```bash
python -m dockervault.cli docker-compose.yml \
--run-borg \
--repo /mnt/backups/borg/dockervault
```
Generated command:
```bash
borg create --stats --progress \
/repo::hostname-2026-03-23_12-44-19 \
/path/to/data
```
### Features
* automatic archive naming (with seconds precision)
* deduplicated paths
* safe command generation
* subprocess execution
* optional passphrase support (via `BORG_PASSPHRASE`)
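The command assembly can be sketched standalone. This mirrors the generated command shown above; the helper names here are illustrative, not DockerVault's internals:

```python
import socket
from datetime import datetime

def archive_name() -> str:
    # hostname-YYYY-MM-DD_HH-MM-SS, matching the naming shown above
    return f"{socket.gethostname()}-{datetime.now():%Y-%m-%d_%H-%M-%S}"

def borg_create_command(repo: str, paths: list[str]) -> list[str]:
    unique = list(dict.fromkeys(paths))  # dedupe while preserving order
    return ["borg", "create", "--stats", "--progress",
            f"{repo}::{archive_name()}", *unique]

cmd = borg_create_command("/repo", ["/data", "/config", "/data"])
print(cmd[:4])   # ['borg', 'create', '--stats', '--progress']
print(cmd[-2:])  # ['/data', '/config']
```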
---
## 🤖 Automation Mode
Designed for cron / scripts / servers.
```bash
python -m dockervault.cli docker-compose.yml \
--run-borg \
--quiet \
--automation \
--repo /backup-repo
```
### Behavior
* no plan output
* no interactive prompts
* minimal output
* suitable for logs / CI
---
## 🔢 Exit Codes
| Code | Meaning |
| ---- | ------------------------------------ |
| 0 | Success |
| 1 | General error |
| 2 | Missing required args |
| 3 | No include paths |
| 4 | Review required (`--fail-on-review`) |
### Fail on review
```bash
--fail-on-review
```
Stops automation if something needs human attention.
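A wrapper script can branch on these codes. A minimal sketch (the meanings come from the table above; the wrapper itself is hypothetical):

```python
# Exit-code meanings from the table above, as a lookup for a cron wrapper.
EXIT_MEANINGS = {
    0: "success",
    1: "general error",
    2: "missing required args",
    3: "no include paths",
    4: "review required",
}

def should_alert(code: int) -> bool:
    # any non-zero exit deserves operator attention
    return code != 0

print(EXIT_MEANINGS[4], should_alert(4))  # review required True
```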
---
## 🛠 Tech Stack
DockerVault is built using simple, reliable components:
* **Python 3.10+**: core language
* **PyYAML**: parsing Docker Compose files
* **argparse**: CLI interface
* **BorgBackup**: backup engine
* **pip / venv**: environment management
---
### 🔧 Designed for
* Linux systems (Ubuntu, Debian, Unraid environments)
* Docker Compose based setups
* CLI-first workflows
---
## 🔍 Example
Input:
```yaml
services:
  db:
    volumes:
      - ./db:/var/lib/mysql
  mc:
    volumes:
      - ./mc-missing:/data
  nginx:
    volumes:
      - ./logs:/var/log/nginx
```
Output:
```
INCLUDE:
  db
REVIEW:
  mc-missing
SKIP:
  logs
```
---
## 🧱 Current Features
* CLI interface
* Docker Compose parsing (YAML)
* Service detection
* Volume + bind mount detection
* Environment file detection
* Intelligent classification
* Borg backup integration
* Automation mode
* Exit codes for scripting
* Safe path handling
* Deduplication
---
## 🔥 Roadmap
### ✅ Phase 1: Discovery
* [x] CLI
* [x] Scan command
* [x] YAML parsing
---
### 🚧 Phase 2: Intelligence
* [ ] Classify mounts (data / config / cache)
* [ ] Detect backup candidates
* [ ] Generate backup plan
---
### 🔜 Phase 3: Storage
* [ ] SQLite inventory
* [ ] Historical tracking
* [ ] Change detection
---
### 🔜 Phase 4: Execution
* [ ] Borg integration
* [ ] Backup automation
* [ ] Named volume inspection (`docker volume inspect`)
* [ ] Docker API integration
* [ ] Multiple compose files support
* [ ] Email / ntfy notifications
* [ ] Web interface
* [ ] Backup history tracking
* [ ] Restore validation
---
### 🔔 Phase 5: Notifications & Monitoring
* [ ] Email notifications
* [ ] ntfy.sh integration
* [ ] Webhook support
* [ ] Alerts on:
* missing backups
* new volumes
* changed data paths
* [ ] Daily/weekly reports
---
### 🧠 Future Ideas
* [ ] Auto-detect Docker hosts on network
* [ ] Multi-node backup coordination (Lanx-style)
* [ ] Backup simulation ("what would be backed up?")
* [ ] Restore dry-run validation
* [ ] Tagging system (critical / optional / ignore)
* [ ] Scheduling integration
---
@@ -188,43 +278,30 @@
DockerVault is built on a simple idea:
> Backups should reflect reality, not assumptions.
* No blind backups
* No hidden data
* No silent failures
Just clarity.
---
## 📜 License
This project is licensed under the **GNU General Public License v3.0 (GPL-3.0)**.
You are free to:
* Use the software
* Study and modify it
* Share and redistribute it
Under the condition that:
* Any derivative work must also be licensed under GPL-3.0
* Source code must be made available when distributed
See the full license in the [LICENSE](LICENSE) file.
---
## 🤝 Contributing
Feel free to contribute, suggest improvements or fork the project.
---
## ❤️ Credits
Created by **Ed & NodeFox 🦊**
Maintained by Eddie Nielsen
---
<p align="center">
Built with ❤️ for Lanx by NodeFox 🦊
</p>

dockervault/borg.py (new file)

@@ -0,0 +1,115 @@
from __future__ import annotations

import os
import shlex
import socket
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Iterable


def borg_env(passphrase: str | None = None) -> dict[str, str]:
    env = os.environ.copy()
    if passphrase:
        env["BORG_PASSPHRASE"] = passphrase
    return env


def build_archive_name(prefix: str | None = None) -> str:
    """
    Build a borg archive name.

    Default format:
        hostname-YYYY-MM-DD_HH-MM-SS

    With prefix:
        prefix-hostname-YYYY-MM-DD_HH-MM-SS
    """
    hostname = socket.gethostname()
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    if prefix:
        return f"{prefix}-{hostname}-{timestamp}"
    return f"{hostname}-{timestamp}"


def normalize_include_paths(include_paths: Iterable[str | Path]) -> list[str]:
    normalized: list[str] = []
    seen: set[str] = set()
    for path in include_paths:
        resolved = str(Path(path))
        if resolved not in seen:
            seen.add(resolved)
            normalized.append(resolved)
    return normalized


def build_borg_create_command(
    repo: str,
    include_paths: Iterable[str | Path],
    archive_name: str | None = None,
    stats: bool = True,
    progress: bool = True,
) -> list[str]:
    normalized_paths = normalize_include_paths(include_paths)
    if not normalized_paths:
        raise ValueError("No include paths provided for borg backup.")
    if archive_name is None:
        archive_name = build_archive_name()
    command = ["borg", "create"]
    if stats:
        command.append("--stats")
    if progress:
        command.append("--progress")
    command.append(f"{repo}::{archive_name}")
    command.extend(normalized_paths)
    return command


def command_to_shell(command: list[str]) -> str:
    return " ".join(shlex.quote(part) for part in command)


def run_borg_create(
    repo: str,
    include_paths: Iterable[str | Path],
    passphrase: str | None = None,
    archive_name: str | None = None,
    stats: bool = True,
    progress: bool = True,
    quiet: bool = False,
) -> int:
    command = build_borg_create_command(
        repo=repo,
        include_paths=include_paths,
        archive_name=archive_name,
        stats=stats,
        progress=progress,
    )
    stdout = subprocess.DEVNULL if quiet else None
    stderr = subprocess.DEVNULL if quiet else None
    result = subprocess.run(
        command,
        env=borg_env(passphrase),
        stdout=stdout,
        stderr=stderr,
        check=False,
    )
    return result.returncode


@@ -0,0 +1,15 @@
from .engine import ClassificationEngine
from .models import (
    Classification,
    ClassificationResult,
    MountCandidate,
    RuleEvidence,
)

__all__ = [
    "ClassificationEngine",
    "Classification",
    "ClassificationResult",
    "MountCandidate",
    "RuleEvidence",
]


@@ -0,0 +1,80 @@
DATABASE_PATH_KEYWORDS = [
    "/var/lib/mysql",
    "/var/lib/mariadb",
    "/var/lib/postgresql",
    "/var/lib/postgresql/data",
    "/data/db",
]

CONFIG_PATH_KEYWORDS = [
    "/config",
    "/app/config",
    "/settings",
    "/etc",
]

DATA_PATH_KEYWORDS = [
    "/data",
    "/app/data",
    "/srv/data",
    "/var/lib",
]

EPHEMERAL_PATH_KEYWORDS = [
    "/tmp",
    "/var/tmp",
    "/cache",
    "/var/cache",
    "/transcode",
    "/run",
    "/var/run",
]

LOG_PATH_KEYWORDS = [
    "/logs",
    "/log",
    "/var/log",
]

DATABASE_IMAGE_HINTS = [
    "mysql",
    "mariadb",
    "postgres",
    "postgresql",
    "mongo",
    "mongodb",
    "redis",
]

KNOWN_IMPORTANT_IMAGE_HINTS = [
    "nextcloud",
    "grafana",
    "vaultwarden",
    "gitea",
    "portainer",
    "paperless",
    "immich",
    "wordpress",
    "nginx",
    "traefik",
    "minecraft",
    "itzg",
]

MINECRAFT_IMAGE_HINTS = [
    "minecraft",
    "itzg",
]

MINECRAFT_CRITICAL_PATHS = [
    "/data",
    "/server",
    "/minecraft",
]

MINECRAFT_IMPORTANT_PATHS = [
    "/plugins",
    "/config",
    "/mods",
    "/world",
]


@@ -0,0 +1,52 @@
from collections import defaultdict

from .models import ClassificationResult, Classification
from .rules import DEFAULT_RULES
from .utils import unique_preserve_order


class ClassificationEngine:
    def __init__(self, rules=None):
        self.rules = rules or DEFAULT_RULES

    def classify(self, candidate):
        scores = defaultdict(int)
        reasons = []
        tags = []
        matched = []
        for rule in self.rules:
            results = rule(candidate)
            for result in results:
                scores[result.classification] += result.score
                reasons.extend(result.reasons)
                tags.extend(result.tags)
                matched.append(result.rule_name)
        if not scores:
            return ClassificationResult(
                candidate=candidate,
                classification=Classification.UNKNOWN,
                confidence=0.0,
                score=0,
                reasons=["No rules matched"],
                tags=["unknown"],
                matched_rules=[],
                score_breakdown={},
            )
        classification, score = max(scores.items(), key=lambda item: item[1])
        total_score = sum(scores.values())
        confidence = score / total_score if total_score else 0.0
        return ClassificationResult(
            candidate=candidate,
            classification=classification,
            confidence=round(confidence, 2),
            score=score,
            reasons=reasons,
            tags=unique_preserve_order(tags),
            matched_rules=unique_preserve_order(matched),
            score_breakdown={cls.value: value for cls, value in scores.items()},
        )
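The scoring logic above can be checked standalone: the winning classification is the one with the highest summed score, and confidence is that score divided by the total. A minimal sketch (the `pick` helper is illustrative, not part of the module):

```python
from collections import defaultdict

def pick(evidence):
    # evidence: list of (classification, score) pairs from matched rules
    scores = defaultdict(int)
    for cls, score in evidence:
        scores[cls] += score
    cls, score = max(scores.items(), key=lambda kv: kv[1])
    total = sum(scores.values())
    confidence = round(score / total, 2) if total else 0.0
    return cls, confidence

print(pick([("critical", 40), ("important", 20)]))  # ('critical', 0.67)
```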


@@ -0,0 +1,44 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional


class Classification(str, Enum):
    CRITICAL = "critical"
    IMPORTANT = "important"
    OPTIONAL = "optional"
    EPHEMERAL = "ephemeral"
    UNKNOWN = "unknown"


@dataclass
class MountCandidate:
    service_name: str
    image: str
    source: str
    target: str
    mount_type: str
    read_only: bool = False
    env: Dict[str, str] = field(default_factory=dict)
    compose_project: Optional[str] = None


@dataclass
class RuleEvidence:
    rule_name: str
    classification: Classification
    score: int
    reasons: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)


@dataclass
class ClassificationResult:
    candidate: MountCandidate
    classification: Classification
    confidence: float
    score: int
    reasons: List[str]
    tags: List[str]
    matched_rules: List[str]
    score_breakdown: Dict[str, int] = field(default_factory=dict)


@@ -0,0 +1,73 @@
from typing import List

from .models import Classification, MountCandidate, RuleEvidence
from .defaults import *
from .utils import norm, path_contains, text_contains


def rule_minecraft(candidate: MountCandidate) -> List[RuleEvidence]:
    image = norm(candidate.image)
    target = norm(candidate.target)
    if any(h in image for h in MINECRAFT_IMAGE_HINTS):
        if any(p in target for p in MINECRAFT_CRITICAL_PATHS):
            return [RuleEvidence("minecraft_critical", Classification.CRITICAL, 45,
                                 [f"{candidate.target} looks like Minecraft world data"], ["minecraft"])]
        if any(p in target for p in MINECRAFT_IMPORTANT_PATHS):
            return [RuleEvidence("minecraft_important", Classification.IMPORTANT, 25,
                                 [f"{candidate.target} looks like Minecraft config/plugins"], ["minecraft"])]
    return []


def rule_database(candidate: MountCandidate) -> List[RuleEvidence]:
    if path_contains(candidate.target, DATABASE_PATH_KEYWORDS):
        return [RuleEvidence("db_path", Classification.CRITICAL, 40,
                             [f"{candidate.target} is database path"], ["database"])]
    if text_contains(candidate.image, DATABASE_IMAGE_HINTS):
        return [RuleEvidence("db_image", Classification.CRITICAL, 25,
                             [f"{candidate.image} looks like DB"], ["database"])]
    return []


def rule_config(candidate: MountCandidate) -> List[RuleEvidence]:
    if path_contains(candidate.target, CONFIG_PATH_KEYWORDS):
        return [RuleEvidence("config", Classification.IMPORTANT, 20,
                             [f"{candidate.target} is config"], ["config"])]
    return []


def rule_data(candidate: MountCandidate) -> List[RuleEvidence]:
    if path_contains(candidate.target, DATA_PATH_KEYWORDS):
        return [RuleEvidence("data", Classification.IMPORTANT, 20,
                             [f"{candidate.target} is data"], ["data"])]
    return []


def rule_ephemeral(candidate: MountCandidate) -> List[RuleEvidence]:
    if path_contains(candidate.target, EPHEMERAL_PATH_KEYWORDS):
        return [RuleEvidence("ephemeral", Classification.EPHEMERAL, 35,
                             [f"{candidate.target} is temp/cache"], ["ephemeral"])]
    return []


def rule_logs(candidate: MountCandidate) -> List[RuleEvidence]:
    if path_contains(candidate.target, LOG_PATH_KEYWORDS):
        return [RuleEvidence("logs", Classification.OPTIONAL, 15,
                             [f"{candidate.target} is logs"], ["logs"])]
    return []


DEFAULT_RULES = [
    rule_minecraft,
    rule_database,
    rule_config,
    rule_data,
    rule_ephemeral,
    rule_logs,
]


@@ -0,0 +1,22 @@
def norm(value: str) -> str:
    return (value or "").strip().lower()


def path_contains(target: str, keywords):
    target = norm(target)
    return any(k in target for k in keywords)


def text_contains(value: str, keywords):
    value = norm(value)
    return any(k in value for k in keywords)


def unique_preserve_order(values):
    seen = set()
    result = []
    for v in values:
        if v not in seen:
            seen.add(v)
            result.append(v)
    return result


@@ -1,334 +1,292 @@
from __future__ import annotations

import argparse
import sys
from pathlib import Path
from typing import Any, Iterable

from .borg import (
    build_borg_create_command,
    command_to_shell,
    run_borg_create,
)
from .classifier import classify_compose


def _get_value(obj: Any, *names: str, default: Any = None) -> Any:
    for name in names:
        if isinstance(obj, dict) and name in obj:
            return obj[name]
        if hasattr(obj, name):
            return getattr(obj, name)
    return default


def _normalize_entries(entries: Any) -> list[dict[str, Any]]:
    if not entries:
        return []
    normalized: list[dict[str, Any]] = []
    for entry in entries:
        if isinstance(entry, dict):
            normalized.append(
                {
                    "source": (
                        entry.get("source")
                        or entry.get("path")
                        or entry.get("host_path")
                        or entry.get("src")
                    ),
                    "service": entry.get("service"),
                    "target": (
                        entry.get("target")
                        or entry.get("mount_target")
                        or entry.get("container_path")
                        or entry.get("destination")
                    ),
                    "classification": (
                        entry.get("classification")
                        or entry.get("priority")
                        or entry.get("category")
                        or entry.get("kind")
                    ),
                    "reason": entry.get("reason"),
                }
            )
            continue
        normalized.append(
            {
                "source": _get_value(entry, "source", "path", "host_path", "src"),
                "service": _get_value(entry, "service"),
                "target": _get_value(
                    entry, "target", "mount_target", "container_path", "destination"
                ),
                "classification": _get_value(
                    entry, "classification", "priority", "category", "kind"
                ),
                "reason": _get_value(entry, "reason"),
            }
        )
    return normalized


def _extract_plan_sections(
    plan: Any,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    include_entries = _normalize_entries(
        _get_value(plan, "include", "include_paths", "includes", default=[])
    )
    review_entries = _normalize_entries(
        _get_value(plan, "review", "review_paths", "reviews", default=[])
    )
    skip_entries = _normalize_entries(
        _get_value(plan, "skip", "skip_paths", "skips", default=[])
    )
    return include_entries, review_entries, skip_entries


def _entry_path(entry: dict[str, Any]) -> str:
    return str(entry.get("source") or "(unknown)")


def _entry_label(entry: dict[str, Any]) -> str:
    classification = entry.get("classification") or "unknown"
    service = entry.get("service") or "unknown"
    target = entry.get("target") or "unknown"
    reason = entry.get("reason")
    label = f"[{classification}] service={service} target={target}"
    if reason:
        label += f" reason={reason}"
    return label


def _print_section(title: str, entries: Iterable[dict[str, Any]]) -> None:
    entries = list(entries)
    print(f"{title}:")
    if not entries:
        print("  - (none)")
        return
    for entry in entries:
        print(f"  - {_entry_path(entry):<40} {_entry_label(entry)}")


def _collect_include_paths(include_entries: Iterable[dict[str, Any]]) -> list[str]:
    paths: list[str] = []
    seen: set[str] = set()
    for entry in include_entries:
        path = _entry_path(entry)
        if path == "(unknown)" or path in seen:
            continue
        seen.add(path)
        paths.append(path)
    return paths


def _print_borg_plan(
    compose_path: Path,
    project_root: Path,
    include_entries: list[dict[str, Any]],
    review_entries: list[dict[str, Any]],
    skip_entries: list[dict[str, Any]],
    repo: str | None,
) -> None:
    print()
    print("Borg Backup Plan")
    print("================")
    print(f"Compose file: {compose_path}")
    print(f"Project root: {project_root}")
    print()
    _print_section("INCLUDE PATHS", include_entries)
    print()
    _print_section("REVIEW PATHS", review_entries)
    print()
    _print_section("SKIP PATHS", skip_entries)

    include_paths = _collect_include_paths(include_entries)
    if repo and include_paths:
        command = build_borg_create_command(
            repo=repo,
            include_paths=include_paths,
        )
        print()
        print("Suggested borg create command")
        print("=============================")
        print(command_to_shell(command))


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="dockervault",
        description="DockerVault - intelligent Docker backup discovery",
    )
    parser.add_argument("compose", help="Path to docker-compose.yml")
    parser.add_argument(
        "--borg",
        action="store_true",
        help="Show borg backup plan and suggested command",
    )
    parser.add_argument(
        "--run-borg",
        action="store_true",
        help="Run borg create using discovered include paths",
    )
    parser.add_argument(
        "--repo",
        help="Borg repository path, e.g. /mnt/backups/borg/dockervault",
    )
    parser.add_argument(
        "--passphrase",
        help="Optional borg passphrase",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress borg stdout/stderr output during execution",
    )
    parser.add_argument(
        "--automation",
        action="store_true",
        help="Automation mode: minimal output, non-interactive behavior",
    )
    parser.add_argument(
        "--fail-on-review",
        action="store_true",
        help="Exit with code 4 if review paths are present",
    )
    return parser


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()

    compose_path = Path(args.compose).expanduser().resolve()
    if not compose_path.exists():
        print(f"Error: compose file not found: {compose_path}", file=sys.stderr)
        sys.exit(1)

    try:
        plan = classify_compose(compose_path)
    except Exception as exc:
        print(f"Error: failed to classify compose file: {exc}", file=sys.stderr)
        sys.exit(1)

    include_entries, review_entries, skip_entries = _extract_plan_sections(plan)
    include_paths = _collect_include_paths(include_entries)
    project_root = compose_path.parent

    should_show_plan = args.borg or (not args.automation and not args.quiet)
    if should_show_plan:
        _print_borg_plan(
            compose_path=compose_path,
            project_root=project_root,
            include_entries=include_entries,
            review_entries=review_entries,
            skip_entries=skip_entries,
            repo=args.repo,
        )

    if args.fail_on_review and review_entries:
        if args.automation or args.quiet:
            print("REVIEW required", file=sys.stderr)
        else:
            print("Review required before automated backup can proceed.", file=sys.stderr)
        sys.exit(4)

    if args.run_borg:
        if not args.repo:
            print("Error: --run-borg requires --repo", file=sys.stderr)
            sys.exit(2)
        if not include_paths:
            print("Error: no include paths found for borg backup", file=sys.stderr)
            sys.exit(3)
        if not args.quiet:
            print()
            print("Running borg backup...")
            print("======================")
        exit_code = run_borg_create(
            repo=args.repo,
            include_paths=include_paths,
            passphrase=args.passphrase,
            quiet=args.quiet,
            stats=not args.quiet,
            progress=not args.quiet,
        )
        if exit_code != 0:
            print(f"Error: borg exited with status {exit_code}", file=sys.stderr)
            sys.exit(exit_code)
        if not args.quiet:
            print()
            print("Borg backup completed successfully.")
        sys.exit(0)


if __name__ == "__main__":
    main()


@@ -1,63 +1,31 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


@dataclass
class MountEntry:
    source: Path
    service: str = "unknown"
    target: str = "unknown"
    classification: str = "unknown"
    reason: str = ""
    exists: bool = False


@dataclass
class ValidationResult:
    missing: list[MountEntry]
    present: list[MountEntry]


@dataclass
class BorgSettings:
    repo: str
    archive_name: str
    passphrase_present: bool
    automation: bool
    auto_init_repo: bool
    encryption: str
    quiet: bool = False


@@ -1,222 +1,165 @@
from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List

import yaml

from dockervault.classification.models import MountCandidate


class DockerComposeScanner:
    def __init__(self, compose_file: str | Path):
        self.compose_file = Path(compose_file)
        self.base_dir = self.compose_file.parent

    def load_compose(self) -> Dict[str, Any]:
        with self.compose_file.open("r", encoding="utf-8") as f:
            return yaml.safe_load(f) or {}

    def scan(self) -> List[MountCandidate]:
        compose = self.load_compose()
        services = compose.get("services", {})
        project_name = compose.get("name") or self.base_dir.name
        candidates: List[MountCandidate] = []
        for service_name, service_def in services.items():
            image = service_def.get("image", "")
            env = self._normalize_environment(service_def.get("environment", {}))
            volumes = service_def.get("volumes", [])
            for volume in volumes:
                candidate = self._parse_volume(
                    service_name=service_name,
                    image=image,
                    volume=volume,
                    env=env,
                    compose_project=project_name,
                )
                if candidate:
                    candidates.append(candidate)
        return candidates

    def _normalize_environment(self, env: Any) -> Dict[str, str]:
        if isinstance(env, dict):
            return {str(k): str(v) for k, v in env.items()}
        if isinstance(env, list):
            parsed: Dict[str, str] = {}
            for item in env:
                if isinstance(item, str) and "=" in item:
                    key, value = item.split("=", 1)
                    parsed[key] = value
            return parsed
        return {}
read_only = bool(volume.get("read_only") or volume.get("readonly"))
return MountMapping(
source=str(source),
target=target,
kind=str(kind),
read_only=read_only,
def _parse_volume(
self,
service_name: str,
image: str,
volume: Any,
env: Dict[str, str],
compose_project: str,
) -> MountCandidate | None:
if isinstance(volume, str):
return self._parse_short_syntax(
service_name=service_name,
image=image,
volume=volume,
env=env,
compose_project=compose_project,
)
if isinstance(volume, dict):
return self._parse_long_syntax(
service_name=service_name,
image=image,
volume=volume,
env=env,
compose_project=compose_project,
)
return None
def _parse_short_syntax(
self,
service_name: str,
image: str,
volume: str,
env: Dict[str, str],
compose_project: str,
) -> MountCandidate | None:
parts = volume.split(":")
def normalize_volume_string(value: str) -> MountMapping | None:
parts = value.split(":")
if len(parts) == 1:
return MountMapping(source="", target=parts[0], kind="anonymous", read_only=False)
# Anonymous volume style: "/data"
return MountCandidate(
service_name=service_name,
image=image,
source="",
target=parts[0],
mount_type="volume",
read_only=False,
env=env,
compose_project=compose_project,
)
if len(parts) >= 2:
source = parts[0]
target = parts[1]
options = parts[2:]
read_only = any(option == "ro" for option in options)
options = parts[2:] if len(parts) > 2 else []
read_only = "ro" in options
if source.startswith(("/", ".", "~")):
kind = "bind"
else:
kind = "volume"
mount_type = self._guess_mount_type(source)
return MountMapping(source=source, target=target, kind=kind, read_only=read_only)
return MountCandidate(
service_name=service_name,
image=image,
source=source,
target=target,
mount_type=mount_type,
read_only=read_only,
env=env,
compose_project=compose_project,
)
return None
def _parse_long_syntax(
self,
service_name: str,
image: str,
volume: Dict[str, Any],
env: Dict[str, str],
compose_project: str,
) -> MountCandidate | None:
source = volume.get("source", "") or volume.get("src", "")
target = volume.get("target", "") or volume.get("dst", "") or volume.get("destination", "")
mount_type = volume.get("type", self._guess_mount_type(str(source)))
read_only = bool(volume.get("read_only", False))
def parse_mounts(value: Any) -> list[MountMapping]:
mounts: list[MountMapping] = []
if not target:
return None
if not isinstance(value, list):
return mounts
for item in value:
mapping: MountMapping | None = None
if isinstance(item, str):
mapping = normalize_volume_string(item)
elif isinstance(item, dict):
mapping = normalize_volume_dict(item)
if mapping:
mounts.append(mapping)
return mounts
def parse_service_definition(name: str, data: Any) -> ServiceDefinition:
if not isinstance(data, dict):
return ServiceDefinition(name=name)
mounts = parse_mounts(data.get("volumes", []))
env_files = parse_env_files(data.get("env_file"))
return ServiceDefinition(
name=name,
image=data.get("image") if isinstance(data.get("image"), str) else None,
restart=data.get("restart") if isinstance(data.get("restart"), str) else None,
env_files=env_files,
mounts=mounts,
return MountCandidate(
service_name=service_name,
image=image,
source=str(source),
target=str(target),
mount_type=str(mount_type),
read_only=read_only,
env=env,
compose_project=compose_project,
)
def _guess_mount_type(self, source: str) -> str:
if not source:
return "volume"
def merge_service(existing: ServiceDefinition, incoming: ServiceDefinition) -> ServiceDefinition:
mounts_by_key: dict[tuple[str, str, str, bool], MountMapping] = {
(mount.source, mount.target, mount.kind, mount.read_only): mount
for mount in existing.mounts
}
for mount in incoming.mounts:
mounts_by_key[(mount.source, mount.target, mount.kind, mount.read_only)] = mount
if source.startswith("/") or source.startswith("./") or source.startswith("../"):
return "bind"
env_files = sorted(set(existing.env_files) | set(incoming.env_files))
return ServiceDefinition(
name=existing.name,
image=incoming.image or existing.image,
restart=incoming.restart or existing.restart,
env_files=env_files,
mounts=sorted(mounts_by_key.values(), key=lambda item: (item.target, item.source, item.kind)),
)
def extract_project_from_compose(folder: Path, compose_files: list[Path]) -> ComposeProject:
services_by_name: dict[str, ServiceDefinition] = {}
named_volumes: set[str] = set()
backup_paths: set[str] = set()
for compose_file in sorted(compose_files):
data = load_yaml_file(compose_file)
for volume_name in (data.get("volumes") or {}).keys() if isinstance(data.get("volumes"), dict) else []:
if isinstance(volume_name, str):
named_volumes.add(volume_name)
raw_services = data.get("services") or {}
if not isinstance(raw_services, dict):
continue
for service_name, service_data in raw_services.items():
if not isinstance(service_name, str):
continue
incoming = parse_service_definition(service_name, service_data)
if service_name in services_by_name:
services_by_name[service_name] = merge_service(services_by_name[service_name], incoming)
else:
services_by_name[service_name] = incoming
for service in services_by_name.values():
for mount in service.mounts:
if mount.kind == "bind" and mount.source:
candidate = Path(mount.source).expanduser()
if not candidate.is_absolute():
candidate = (folder / candidate).resolve()
backup_paths.add(str(candidate))
for env_file in service.env_files:
candidate = Path(env_file).expanduser()
if not candidate.is_absolute():
candidate = (folder / candidate).resolve()
backup_paths.add(str(candidate))
return ComposeProject(
name=folder.name,
root_path=normalize_path(folder),
compose_files=[file.name for file in sorted(compose_files)],
services=sorted(services_by_name.values(), key=lambda item: item.name),
named_volumes=sorted(named_volumes),
backup_paths=sorted(backup_paths),
)
def group_projects_by_folder(compose_files: list[Path]) -> list[ComposeProject]:
grouped: dict[Path, list[Path]] = {}
for compose_file in compose_files:
grouped.setdefault(compose_file.parent, []).append(compose_file)
projects: list[ComposeProject] = []
for folder, files in sorted(grouped.items()):
projects.append(extract_project_from_compose(folder, files))
return projects
def scan_projects(base_path: Path) -> list[ComposeProject]:
if not base_path.exists():
raise FileNotFoundError(f"Path does not exist: {base_path}")
if not base_path.is_dir():
raise NotADirectoryError(f"Path is not a directory: {base_path}")
compose_files = find_compose_files(base_path)
return group_projects_by_folder(compose_files)
return "volume"
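The short-syntax rules the scanner settles on — split the spec on `:`, treat `/`, `./` and `../` prefixes as bind mounts, and look for `ro` among the trailing options — can be sketched stand-alone. `ParsedMount` and `parse_short_volume` below are illustrative stand-ins, not the committed API:

```python
from dataclasses import dataclass

@dataclass
class ParsedMount:
    source: str
    target: str
    mount_type: str
    read_only: bool

def parse_short_volume(spec: str) -> ParsedMount:
    # Mirrors _parse_short_syntax: "<source>:<target>[:options]"
    parts = spec.split(":")
    if len(parts) == 1:
        # Anonymous volume style, e.g. "/data": no host source to back up
        return ParsedMount(source="", target=parts[0], mount_type="volume", read_only=False)
    source, target = parts[0], parts[1]
    options = parts[2:]
    is_bind = source.startswith(("/", "./", "../"))
    return ParsedMount(
        source=source,
        target=target,
        mount_type="bind" if is_bind else "volume",
        read_only="ro" in options,
    )

demo = parse_short_volume("./db:/var/lib/mysql:ro")
# For the fixture's db mount this yields a read-only bind mount.
```

Named volumes ("data:/data") deliberately stay `volume`, since resolving them to a host path needs the Docker CLI (see the resolver below in this commit).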

@@ -0,0 +1,47 @@
from dockervault.classification.engine import ClassificationEngine
from dockervault.classification.models import MountCandidate, Classification
def test_minecraft():
engine = ClassificationEngine()
c = MountCandidate(
service_name="mc",
image="itzg/minecraft-server",
source="data",
target="/data",
mount_type="bind"
)
result = engine.classify(c)
assert result.classification == Classification.CRITICAL
def test_database():
engine = ClassificationEngine()
c = MountCandidate(
service_name="db",
image="mysql",
source="db",
target="/var/lib/mysql",
mount_type="bind"
)
result = engine.classify(c)
assert result.classification == Classification.CRITICAL
def test_logs():
engine = ClassificationEngine()
c = MountCandidate(
service_name="nginx",
image="nginx",
source="logs",
target="/var/log/nginx",
mount_type="bind"
)
result = engine.classify(c)
assert result.classification == Classification.OPTIONAL
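The `ClassificationEngine` internals are not part of this hunk, so the rule table below is only a hypothetical sketch that stays consistent with the three tests above; the `Classification` stand-in and the target-prefix rules are assumptions, not the engine's real implementation:

```python
from enum import Enum

class Classification(Enum):
    CRITICAL = "critical"
    OPTIONAL = "optional"
    REVIEW = "review"

# Hypothetical prefix rules, ordered most-specific-first; chosen only to
# match the test expectations above (db data and /data critical, logs optional).
TARGET_RULES = [
    ("/var/log", Classification.OPTIONAL),       # logs are regenerable
    ("/var/lib/mysql", Classification.CRITICAL), # database data directory
    ("/data", Classification.CRITICAL),          # app data, e.g. a minecraft world
]

def classify_target(target: str) -> Classification:
    for prefix, verdict in TARGET_RULES:
        if target.startswith(prefix):
            return verdict
    return Classification.REVIEW  # unknown mounts need a human decision
```

A fallback of `REVIEW` rather than `OPTIONAL` matches the project's stated goal: anything the rules cannot place goes to human review instead of being silently skipped.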

44
dockervault/validation.py Normal file
@@ -0,0 +1,44 @@
from __future__ import annotations
from pathlib import Path
from .models import MountEntry, ValidationResult
def validate_paths(include_entries: list[MountEntry], review_entries: list[MountEntry]) -> ValidationResult:
missing: list[MountEntry] = []
present: list[MountEntry] = []
for entry in [*include_entries, *review_entries]:
entry.exists = entry.source.exists()
if entry.exists:
present.append(entry)
else:
missing.append(entry)
return ValidationResult(missing=missing, present=present)
def mkdir_target_for_missing(entry: MountEntry) -> Path:
"""
Heuristic:
- If path looks like a file path (has suffix), create parent directory.
- Otherwise create the directory path itself.
"""
source = entry.source
if source.suffix and not source.name.startswith("."):
return source.parent
return source
def apply_mkdir_for_missing(missing: list[MountEntry]) -> list[Path]:
created: list[Path] = []
for entry in missing:
target = mkdir_target_for_missing(entry)
if target.exists():
continue
target.mkdir(parents=True, exist_ok=True)
created.append(target)
return created
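The file-vs-directory heuristic in `mkdir_target_for_missing` is easy to verify in isolation. This is a stand-alone copy of the same logic (`mkdir_target` is a local stand-in, not the committed helper); note that in `pathlib`, dot-files like `.env` have an empty `suffix`, while `.env.backup` does not — hence the extra `startswith(".")` guard:

```python
from pathlib import Path

def mkdir_target(source: Path) -> Path:
    # A suffix suggests a file path, so create its parent directory;
    # dot-files are treated as directories-or-files to create directly,
    # since e.g. Path(".env.backup").suffix == ".backup" would otherwise
    # wrongly send us to the parent.
    if source.suffix and not source.name.startswith("."):
        return source.parent
    return source
```

So a missing `config/app.yml` mount gets `config/` created, while a missing `db` mount is created as a directory itself.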

@@ -0,0 +1,171 @@
from __future__ import annotations
import json
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass
class NamedVolumeResolution:
compose_name: str
docker_name: str | None
mountpoint: Path | None
available: bool
reason: str | None = None
def docker_available() -> bool:
return shutil.which("docker") is not None
def run_docker_volume_inspect(volume_name: str) -> dict[str, Any] | None:
if not docker_available():
return None
try:
result = subprocess.run(
["docker", "volume", "inspect", volume_name],
capture_output=True,
text=True,
check=False,
)
except OSError:
return None
if result.returncode != 0:
return None
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return None
if not isinstance(data, list) or not data:
return None
item = data[0]
if not isinstance(item, dict):
return None
return item
def infer_project_name(compose_path: Path, compose_data: dict[str, Any]) -> str:
top_level_name = compose_data.get("name")
if isinstance(top_level_name, str) and top_level_name.strip():
return top_level_name.strip()
return compose_path.parent.name
def normalize_top_level_volume_name(
volume_key: str,
compose_data: dict[str, Any],
) -> tuple[str | None, bool]:
"""
Returns:
(explicit_name_or_none, is_external)
"""
volumes = compose_data.get("volumes", {})
if not isinstance(volumes, dict):
return None, False
cfg = volumes.get(volume_key)
if not isinstance(cfg, dict):
return None, False
explicit_name = cfg.get("name")
if not isinstance(explicit_name, str):
explicit_name = None
external = cfg.get("external", False)
is_external = False
if isinstance(external, bool):
is_external = external
elif isinstance(external, dict):
is_external = True
ext_name = external.get("name")
if isinstance(ext_name, str) and ext_name.strip():
explicit_name = ext_name.strip()
return explicit_name, is_external
def build_volume_candidates(
compose_name: str,
compose_path: Path,
compose_data: dict[str, Any],
) -> list[str]:
"""
Try likely Docker volume names in a sensible order.
"""
candidates: list[str] = []
project_name = infer_project_name(compose_path, compose_data)
explicit_name, is_external = normalize_top_level_volume_name(compose_name, compose_data)
# 1) explicit external/name override
if explicit_name:
candidates.append(explicit_name)
# 2) external volumes often use raw name directly
if is_external:
candidates.append(compose_name)
# 3) raw compose source
candidates.append(compose_name)
# 4) compose-created default name: <project>_<volume>
candidates.append(f"{project_name}_{compose_name}")
# de-dup while preserving order
unique: list[str] = []
seen: set[str] = set()
for c in candidates:
if c not in seen:
unique.append(c)
seen.add(c)
return unique
def resolve_named_volume(
compose_name: str,
compose_path: Path,
compose_data: dict[str, Any],
) -> NamedVolumeResolution:
if not docker_available():
return NamedVolumeResolution(
compose_name=compose_name,
docker_name=None,
mountpoint=None,
available=False,
reason="docker CLI not available",
)
for candidate in build_volume_candidates(compose_name, compose_path, compose_data):
inspected = run_docker_volume_inspect(candidate)
if not inspected:
continue
mountpoint = inspected.get("Mountpoint")
if isinstance(mountpoint, str) and mountpoint.strip():
return NamedVolumeResolution(
compose_name=compose_name,
docker_name=candidate,
mountpoint=Path(mountpoint),
available=True,
reason=None,
)
return NamedVolumeResolution(
compose_name=compose_name,
docker_name=None,
mountpoint=None,
available=False,
reason="docker volume not found or not inspectable",
)

@@ -0,0 +1,35 @@
version: "3.9"
services:
db:
image: mariadb:10.11
container_name: dv-db
restart: unless-stopped
environment:
MYSQL_ROOT_PASSWORD: example
MYSQL_DATABASE: testdb
MYSQL_USER: test
MYSQL_PASSWORD: test
volumes:
- ./db:/var/lib/mysql
mc:
image: itzg/minecraft-server:latest
container_name: dv-mc
restart: unless-stopped
environment:
EULA: "TRUE"
MEMORY: "1G"
ports:
- "25565:25565"
volumes:
      - ./mc-missing:/data # <-- intentionally missing
nginx:
image: nginx:latest
container_name: dv-nginx
restart: unless-stopped
ports:
- "8080:80"
volumes:
- ./logs:/var/log/nginx