diff --git a/README.md b/README.md index d6d149e..8e06b00 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@
-
+
-Built with ❤️ for Lanx by NodeFox 🦊 -Maintained by Eddie Nielsen & NodeFox 🦊 -Feel free to contribute, suggest improvements or fork the project. -
diff --git a/dockervault/borg.py b/dockervault/borg.py new file mode 100644 index 0000000..3645f98 --- /dev/null +++ b/dockervault/borg.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import os +import shlex +import socket +import subprocess +from datetime import datetime +from pathlib import Path +from typing import Iterable + + +def borg_env(passphrase: str | None = None) -> dict[str, str]: + env = os.environ.copy() + + if passphrase: + env["BORG_PASSPHRASE"] = passphrase + + return env + + +def build_archive_name(prefix: str | None = None) -> str: + """ + Build a borg archive name. + + Default format: + hostname-YYYY-MM-DD_HH-MM-SS + + With prefix: + prefix-hostname-YYYY-MM-DD_HH-MM-SS + """ + hostname = socket.gethostname() + timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + + if prefix: + return f"{prefix}-{hostname}-{timestamp}" + + return f"{hostname}-{timestamp}" + + +def normalize_include_paths(include_paths: Iterable[str | Path]) -> list[str]: + normalized: list[str] = [] + seen: set[str] = set() + + for path in include_paths: + resolved = str(Path(path)) + + if resolved not in seen: + seen.add(resolved) + normalized.append(resolved) + + return normalized + + +def build_borg_create_command( + repo: str, + include_paths: Iterable[str | Path], + archive_name: str | None = None, + stats: bool = True, + progress: bool = True, +) -> list[str]: + normalized_paths = normalize_include_paths(include_paths) + + if not normalized_paths: + raise ValueError("No include paths provided for borg backup.") + + if archive_name is None: + archive_name = build_archive_name() + + command = ["borg", "create"] + + if stats: + command.append("--stats") + + if progress: + command.append("--progress") + + command.append(f"{repo}::{archive_name}") + command.extend(normalized_paths) + + return command + + +def command_to_shell(command: list[str]) -> str: + return " ".join(shlex.quote(part) for part in command) + + +def run_borg_create( + repo: str, + 
include_paths: Iterable[str | Path], + passphrase: str | None = None, + archive_name: str | None = None, + stats: bool = True, + progress: bool = True, + quiet: bool = False, +) -> int: + command = build_borg_create_command( + repo=repo, + include_paths=include_paths, + archive_name=archive_name, + stats=stats, + progress=progress, + ) + + stdout = subprocess.DEVNULL if quiet else None + stderr = subprocess.DEVNULL if quiet else None + + result = subprocess.run( + command, + env=borg_env(passphrase), + stdout=stdout, + stderr=stderr, + check=False, + ) + + return result.returncode diff --git a/dockervault/classification/__init__.py b/dockervault/classification/__init__.py new file mode 100644 index 0000000..ceebe03 --- /dev/null +++ b/dockervault/classification/__init__.py @@ -0,0 +1,15 @@ +from .engine import ClassificationEngine +from .models import ( + Classification, + ClassificationResult, + MountCandidate, + RuleEvidence, +) + +__all__ = [ + "ClassificationEngine", + "Classification", + "ClassificationResult", + "MountCandidate", + "RuleEvidence", +] diff --git a/dockervault/classification/defaults.py b/dockervault/classification/defaults.py new file mode 100644 index 0000000..212443d --- /dev/null +++ b/dockervault/classification/defaults.py @@ -0,0 +1,80 @@ +DATABASE_PATH_KEYWORDS = [ + "/var/lib/mysql", + "/var/lib/mariadb", + "/var/lib/postgresql", + "/var/lib/postgresql/data", + "/data/db", +] + +CONFIG_PATH_KEYWORDS = [ + "/config", + "/app/config", + "/settings", + "/etc", +] + +DATA_PATH_KEYWORDS = [ + "/data", + "/app/data", + "/srv/data", + "/var/lib", +] + +EPHEMERAL_PATH_KEYWORDS = [ + "/tmp", + "/var/tmp", + "/cache", + "/var/cache", + "/transcode", + "/run", + "/var/run", +] + +LOG_PATH_KEYWORDS = [ + "/logs", + "/log", + "/var/log", +] + +DATABASE_IMAGE_HINTS = [ + "mysql", + "mariadb", + "postgres", + "postgresql", + "mongo", + "mongodb", + "redis", +] + +KNOWN_IMPORTANT_IMAGE_HINTS = [ + "nextcloud", + "grafana", + "vaultwarden", + "gitea", + 
"portainer", + "paperless", + "immich", + "wordpress", + "nginx", + "traefik", + "minecraft", + "itzg", +] + +MINECRAFT_IMAGE_HINTS = [ + "minecraft", + "itzg", +] + +MINECRAFT_CRITICAL_PATHS = [ + "/data", + "/server", + "/minecraft", +] + +MINECRAFT_IMPORTANT_PATHS = [ + "/plugins", + "/config", + "/mods", + "/world", +] diff --git a/dockervault/classification/engine.py b/dockervault/classification/engine.py new file mode 100644 index 0000000..bf2df65 --- /dev/null +++ b/dockervault/classification/engine.py @@ -0,0 +1,52 @@ +from collections import defaultdict + +from .models import ClassificationResult, Classification +from .rules import DEFAULT_RULES +from .utils import unique_preserve_order + + +class ClassificationEngine: + def __init__(self, rules=None): + self.rules = rules or DEFAULT_RULES + + def classify(self, candidate): + scores = defaultdict(int) + reasons = [] + tags = [] + matched = [] + + for rule in self.rules: + results = rule(candidate) + for result in results: + scores[result.classification] += result.score + reasons.extend(result.reasons) + tags.extend(result.tags) + matched.append(result.rule_name) + + if not scores: + return ClassificationResult( + candidate=candidate, + classification=Classification.UNKNOWN, + confidence=0.0, + score=0, + reasons=["No rules matched"], + tags=["unknown"], + matched_rules=[], + score_breakdown={}, + ) + + classification, score = max(scores.items(), key=lambda item: item[1]) + + total_score = sum(scores.values()) + confidence = score / total_score if total_score else 0.0 + + return ClassificationResult( + candidate=candidate, + classification=classification, + confidence=round(confidence, 2), + score=score, + reasons=reasons, + tags=unique_preserve_order(tags), + matched_rules=unique_preserve_order(matched), + score_breakdown={cls.value: value for cls, value in scores.items()}, + ) diff --git a/dockervault/classification/models.py b/dockervault/classification/models.py new file mode 100644 index 
0000000..e16d07d --- /dev/null +++ b/dockervault/classification/models.py @@ -0,0 +1,44 @@ +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, List, Optional + + +class Classification(str, Enum): + CRITICAL = "critical" + IMPORTANT = "important" + OPTIONAL = "optional" + EPHEMERAL = "ephemeral" + UNKNOWN = "unknown" + + +@dataclass +class MountCandidate: + service_name: str + image: str + source: str + target: str + mount_type: str + read_only: bool = False + env: Dict[str, str] = field(default_factory=dict) + compose_project: Optional[str] = None + + +@dataclass +class RuleEvidence: + rule_name: str + classification: Classification + score: int + reasons: List[str] = field(default_factory=list) + tags: List[str] = field(default_factory=list) + + +@dataclass +class ClassificationResult: + candidate: MountCandidate + classification: Classification + confidence: float + score: int + reasons: List[str] + tags: List[str] + matched_rules: List[str] + score_breakdown: Dict[str, int] = field(default_factory=dict) diff --git a/dockervault/classification/rules.py b/dockervault/classification/rules.py new file mode 100644 index 0000000..f3321c1 --- /dev/null +++ b/dockervault/classification/rules.py @@ -0,0 +1,73 @@ +from typing import List +from .models import Classification, MountCandidate, RuleEvidence +from .defaults import * +from .utils import norm, path_contains, text_contains + + +def rule_minecraft(candidate: MountCandidate) -> List[RuleEvidence]: + image = norm(candidate.image) + target = norm(candidate.target) + + if any(h in image for h in MINECRAFT_IMAGE_HINTS): + if any(p in target for p in MINECRAFT_CRITICAL_PATHS): + return [RuleEvidence("minecraft_critical", Classification.CRITICAL, 45, + [f"{candidate.target} looks like Minecraft world data"], ["minecraft"])] + if any(p in target for p in MINECRAFT_IMPORTANT_PATHS): + return [RuleEvidence("minecraft_important", Classification.IMPORTANT, 25, + [f"{candidate.target} 
looks like Minecraft config/plugins"], ["minecraft"])] + + return [] + + +def rule_database(candidate: MountCandidate) -> List[RuleEvidence]: + if path_contains(candidate.target, DATABASE_PATH_KEYWORDS): + return [RuleEvidence("db_path", Classification.CRITICAL, 40, + [f"{candidate.target} is database path"], ["database"])] + + if text_contains(candidate.image, DATABASE_IMAGE_HINTS): + return [RuleEvidence("db_image", Classification.CRITICAL, 25, + [f"{candidate.image} looks like DB"], ["database"])] + + return [] + + +def rule_config(candidate: MountCandidate) -> List[RuleEvidence]: + if path_contains(candidate.target, CONFIG_PATH_KEYWORDS): + return [RuleEvidence("config", Classification.IMPORTANT, 20, + [f"{candidate.target} is config"], ["config"])] + + return [] + + +def rule_data(candidate: MountCandidate) -> List[RuleEvidence]: + if path_contains(candidate.target, DATA_PATH_KEYWORDS): + return [RuleEvidence("data", Classification.IMPORTANT, 20, + [f"{candidate.target} is data"], ["data"])] + + return [] + + +def rule_ephemeral(candidate: MountCandidate) -> List[RuleEvidence]: + if path_contains(candidate.target, EPHEMERAL_PATH_KEYWORDS): + return [RuleEvidence("ephemeral", Classification.EPHEMERAL, 35, + [f"{candidate.target} is temp/cache"], ["ephemeral"])] + + return [] + + +def rule_logs(candidate: MountCandidate) -> List[RuleEvidence]: + if path_contains(candidate.target, LOG_PATH_KEYWORDS): + return [RuleEvidence("logs", Classification.OPTIONAL, 15, + [f"{candidate.target} is logs"], ["logs"])] + + return [] + + +DEFAULT_RULES = [ + rule_minecraft, + rule_database, + rule_config, + rule_data, + rule_ephemeral, + rule_logs, +] diff --git a/dockervault/classification/utils.py b/dockervault/classification/utils.py new file mode 100644 index 0000000..7949432 --- /dev/null +++ b/dockervault/classification/utils.py @@ -0,0 +1,22 @@ +def norm(value: str) -> str: + return (value or "").strip().lower() + + +def path_contains(target: str, keywords): + target = 
norm(target) + return any(k in target for k in keywords) + + +def text_contains(value: str, keywords): + value = norm(value) + return any(k in value for k in keywords) + + +def unique_preserve_order(values): + seen = set() + result = [] + for v in values: + if v not in seen: + seen.add(v) + result.append(v) + return result diff --git a/dockervault/cli.py b/dockervault/cli.py index 3e88ab3..75100b0 100644 --- a/dockervault/cli.py +++ b/dockervault/cli.py @@ -1,334 +1,292 @@ from __future__ import annotations import argparse -import json -import shlex -import socket -import subprocess import sys -from datetime import datetime from pathlib import Path -from typing import Any +from typing import Any, Iterable -from dockervault.classifier import classify_compose +from .borg import ( + build_borg_create_command, + command_to_shell, + run_borg_create, +) +from .classifier import classify_compose -def check_path_exists(path: str) -> bool: - return Path(path).exists() +def _get_value(obj: Any, *names: str, default: Any = None) -> Any: + for name in names: + if isinstance(obj, dict) and name in obj: + return obj[name] + if hasattr(obj, name): + return getattr(obj, name) + return default -def create_missing_paths(paths: list[str]) -> list[str]: - created: list[str] = [] - for path in sorted(set(paths)): - p = Path(path) - if not p.exists(): - p.mkdir(parents=True, exist_ok=True) - created.append(str(p)) - return created +def _normalize_entries(entries: Any) -> list[dict[str, Any]]: + if not entries: + return [] + + normalized: list[dict[str, Any]] = [] + + for entry in entries: + if isinstance(entry, dict): + normalized.append( + { + "source": ( + entry.get("source") + or entry.get("path") + or entry.get("host_path") + or entry.get("src") + ), + "service": entry.get("service"), + "target": ( + entry.get("target") + or entry.get("mount_target") + or entry.get("container_path") + or entry.get("destination") + ), + "classification": ( + entry.get("classification") + or 
entry.get("priority") + or entry.get("category") + or entry.get("kind") + ), + "reason": entry.get("reason"), + } + ) + continue + + normalized.append( + { + "source": _get_value(entry, "source", "path", "host_path", "src"), + "service": _get_value(entry, "service"), + "target": _get_value( + entry, "target", "mount_target", "container_path", "destination" + ), + "classification": _get_value( + entry, "classification", "priority", "category", "kind" + ), + "reason": _get_value(entry, "reason"), + } + ) + + return normalized -def build_mkdir_suggestion(paths: list[str]) -> str: - unique_paths = sorted(set(paths)) - lines = ["mkdir -p \\"] - for index, path in enumerate(unique_paths): - suffix = " \\" if index < len(unique_paths) - 1 else "" - lines.append(f" {path}{suffix}") - return "\n".join(lines) - - -def render_borg_archive(template: str, project: str, compose_path: Path) -> str: - now = datetime.now() - hostname = socket.gethostname() - compose_stem = compose_path.stem - - return template.format( - hostname=hostname, - project=project, - compose_stem=compose_stem, - now=now, +def _extract_plan_sections( + plan: Any, +) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: + include_entries = _normalize_entries( + _get_value(plan, "include", "include_paths", "includes", default=[]) + ) + review_entries = _normalize_entries( + _get_value(plan, "review", "review_paths", "reviews", default=[]) + ) + skip_entries = _normalize_entries( + _get_value(plan, "skip", "skip_paths", "skips", default=[]) ) - -def build_borg_command(repo: str, archive_name: str, include_paths: list[str]) -> str: - lines = [ - "borg create --stats --progress \\", - f" {repo}::{archive_name} \\", - ] - - for index, path in enumerate(include_paths): - suffix = " \\" if index < len(include_paths) - 1 else "" - lines.append(f" {path}{suffix}") - - return "\n".join(lines) + return include_entries, review_entries, skip_entries -def build_borg_argv(repo: str, archive_name: str, 
include_paths: list[str]) -> list[str]: - return [ - "borg", - "create", - "--stats", - "--progress", - f"{repo}::{archive_name}", - *include_paths, - ] +def _entry_path(entry: dict[str, Any]) -> str: + return str(entry.get("source") or "(unknown)") -def find_missing_paths( - plan: dict[str, Any], -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: - missing_include = [ - item for item in plan.get("include", []) - if not check_path_exists(item["source"]) - ] +def _entry_label(entry: dict[str, Any]) -> str: + classification = entry.get("classification") or "unknown" + service = entry.get("service") or "unknown" + target = entry.get("target") or "unknown" + reason = entry.get("reason") - missing_review = [ - item for item in plan.get("review", []) - if not check_path_exists(item["source"]) - ] - - return missing_include, missing_review + label = f"[{classification}] service={service} target={target}" + if reason: + label += f" reason={reason}" + return label -def print_human_summary(compose_file: Path, project_root: Path, plan: dict[str, Any]) -> None: - print("DockerVault Backup Plan") - print("=======================") - print(f"Compose file: {compose_file.resolve()}") - print(f"Project root: {project_root.resolve()}") - print() - - for section in ["include", "review", "skip"]: - print(f"{section.upper()} PATHS:") - items = plan.get(section, []) - if items: - for item in items: - exists = check_path_exists(item["source"]) - status = "β exists" if exists else "β missing" - print( - f" - {item['source']} " - f"[{item['priority']}] {status} " - f"service={item['service']} target={item['target']}" - ) - else: - print(" - (none)") - print() - - -def print_missing_paths_report( - missing_include: list[dict[str, Any]], - missing_review: list[dict[str, Any]], -) -> None: - all_missing = missing_include + missing_review - if not all_missing: +def _print_section(title: str, entries: Iterable[dict[str, Any]]) -> None: + entries = list(entries) + print(f"{title}:") + if 
not entries: + print(" - (none)") return - print("WARNING: Missing paths detected:") - for item in all_missing: - bucket = "include" if item in missing_include else "review" - print(f" - {item['source']} (service={item['service']}, bucket={bucket})") - print() + for entry in entries: + print(f" - {_entry_path(entry):<40} {_entry_label(entry)}") -def print_created_paths(created_paths: list[str]) -> None: - if not created_paths: - return +def _collect_include_paths(include_entries: Iterable[dict[str, Any]]) -> list[str]: + paths: list[str] = [] + seen: set[str] = set() - print("Created missing paths:") - for path in created_paths: - print(f" - {path}") - print() + for entry in include_entries: + path = _entry_path(entry) + if path == "(unknown)" or path in seen: + continue + seen.add(path) + paths.append(path) + + return paths -def plan_to_json_dict( - compose_file: Path, +def _print_borg_plan( + compose_path: Path, project_root: Path, - plan: dict[str, Any], - borg_repo: str | None = None, - borg_archive: str | None = None, - borg_command: str | None = None, - missing_include: list[dict[str, Any]] | None = None, - missing_review: list[dict[str, Any]] | None = None, -) -> dict[str, Any]: - return { - "compose_file": str(compose_file.resolve()), - "project_root": str(project_root.resolve()), - "include": plan.get("include", []), - "review": plan.get("review", []), - "skip": plan.get("skip", []), - "missing": { - "include": missing_include or [], - "review": missing_review or [], - }, - "borg": { - "repo": borg_repo, - "archive": borg_archive, - "command": borg_command, - } - if borg_repo or borg_archive or borg_command - else None, - } + include_entries: list[dict[str, Any]], + review_entries: list[dict[str, Any]], + skip_entries: list[dict[str, Any]], + repo: str | None, +) -> None: + print() + print("Borg Backup Plan") + print("================") + print(f"Compose file: {compose_path}") + print(f"Project root: {project_root}") + print() + + _print_section("INCLUDE 
PATHS", include_entries) + print() + _print_section("REVIEW PATHS", review_entries) + print() + _print_section("SKIP PATHS", skip_entries) + + include_paths = _collect_include_paths(include_entries) + + if repo and include_paths: + command = build_borg_create_command( + repo=repo, + include_paths=include_paths, + ) + print() + print("Suggested borg create command") + print("=============================") + print(command_to_shell(command)) -def main() -> None: +def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( - description="DockerVault - intelligent Docker backup discovery" + prog="dockervault", + description="DockerVault - intelligent Docker backup discovery", ) - parser.add_argument( - "compose_file", - nargs="?", - default="docker-compose.yml", - help="Path to docker-compose.yml", - ) - - parser.add_argument( - "--json", - action="store_true", - help="Print plan as JSON", - ) + parser.add_argument("compose", help="Path to docker-compose.yml") parser.add_argument( "--borg", action="store_true", - help="Show suggested borg create command", + help="Show borg backup plan and suggested command", ) parser.add_argument( "--run-borg", action="store_true", - help="Execute borg create", + help="Run borg create using discovered include paths", ) parser.add_argument( - "--borg-repo", - default="/backup-repo", - help="Borg repository path or URI (default: /backup-repo)", + "--repo", + help="Borg repository path, e.g. /mnt/backups/borg/dockervault", ) parser.add_argument( - "--borg-archive", - default="{hostname}-{now:%Y-%m-%d_%H-%M}", - help=( - "Archive naming template. 
Supported fields: " - "{hostname}, {project}, {compose_stem}, {now:...}" - ), + "--passphrase", + help="Optional borg passphrase", ) parser.add_argument( - "--fail-on-missing", + "--quiet", action="store_true", - help="Exit with status 2 if include/review paths are missing", + help="Suppress borg stdout/stderr output during execution", ) parser.add_argument( - "--apply-mkdir", + "--automation", action="store_true", - help="Create missing include/review paths", + help="Automation mode: minimal output, non-interactive behavior", ) + parser.add_argument( + "--fail-on-review", + action="store_true", + help="Exit with code 4 if review paths are present", + ) + + return parser + + +def main() -> None: + parser = build_parser() args = parser.parse_args() - compose_file = Path(args.compose_file).resolve() - if not compose_file.exists(): - raise SystemExit(f"Compose file not found: {compose_file}") + compose_path = Path(args.compose).expanduser().resolve() - project_root = compose_file.parent - project_name = project_root.name or compose_file.stem + if not compose_path.exists(): + print(f"Error: compose file not found: {compose_path}", file=sys.stderr) + sys.exit(1) - plan = classify_compose(compose_file) + try: + plan = classify_compose(compose_path) + except Exception as exc: + print(f"Error: failed to classify compose file: {exc}", file=sys.stderr) + sys.exit(1) - missing_include, missing_review = find_missing_paths(plan) - all_missing = missing_include + missing_review + include_entries, review_entries, skip_entries = _extract_plan_sections(plan) + include_paths = _collect_include_paths(include_entries) + project_root = compose_path.parent - created_paths: list[str] = [] - if args.apply_mkdir and all_missing: - created_paths = create_missing_paths([item["source"] for item in all_missing]) - missing_include, missing_review = find_missing_paths(plan) - all_missing = missing_include + missing_review + should_show_plan = args.borg or (not args.automation and not args.quiet) 
- borg_command: str | None = None - borg_argv: list[str] | None = None - archive_name: str | None = None - - if args.borg or args.run_borg: - include_paths = [item["source"] for item in plan.get("include", [])] - - try: - archive_name = render_borg_archive( - args.borg_archive, - project_name, - compose_file, - ) - except KeyError as exc: - raise SystemExit( - f"Invalid borg archive template field: {exc}. " - "Allowed: hostname, project, compose_stem, now" - ) from exc - - borg_command = build_borg_command( - repo=args.borg_repo, - archive_name=archive_name, - include_paths=include_paths, + if should_show_plan: + _print_borg_plan( + compose_path=compose_path, + project_root=project_root, + include_entries=include_entries, + review_entries=review_entries, + skip_entries=skip_entries, + repo=args.repo, ) - borg_argv = build_borg_argv( - repo=args.borg_repo, - archive_name=archive_name, - include_paths=include_paths, - ) - - if args.json: - print( - json.dumps( - plan_to_json_dict( - compose_file=compose_file, - project_root=project_root, - plan=plan, - borg_repo=args.borg_repo if (args.borg or args.run_borg) else None, - borg_archive=archive_name, - borg_command=borg_command, - missing_include=missing_include, - missing_review=missing_review, - ), - indent=2, - ) - ) - if args.fail_on_missing and all_missing: - sys.exit(2) - return - - print_human_summary(compose_file, project_root, plan) - print_missing_paths_report(missing_include, missing_review) - print_created_paths(created_paths) - - if all_missing and not args.apply_mkdir: - print("Suggested fix:") - print(build_mkdir_suggestion([item["source"] for item in all_missing])) - print() - - if borg_command: - print("Suggested borg command:") - print(borg_command) - print() - - if args.fail_on_missing and all_missing: - print("ERROR: Failing because include/review paths are missing.") - sys.exit(2) + if args.fail_on_review and review_entries: + if args.automation or args.quiet: + print("REVIEW required", 
file=sys.stderr) + else: + print() + print("Review required before automated backup can proceed.", file=sys.stderr) + sys.exit(4) if args.run_borg: - if borg_argv is None: - raise SystemExit("Internal error: borg command was not prepared") + if not args.repo: + print("Error: --run-borg requires --repo", file=sys.stderr) + sys.exit(2) - print("Running borg create...") - print(" ".join(shlex.quote(part) for part in borg_argv)) - print() + if not include_paths: + print("Error: no include paths found for borg backup", file=sys.stderr) + sys.exit(3) - try: - completed = subprocess.run(borg_argv, check=False) - except FileNotFoundError as exc: - raise SystemExit("borg executable not found in PATH") from exc + if not args.quiet: + print() + print("Running borg backup...") + print("======================") - if completed.returncode != 0: - raise SystemExit(completed.returncode) + exit_code = run_borg_create( + repo=args.repo, + include_paths=include_paths, + passphrase=args.passphrase, + quiet=args.quiet, + stats=not args.quiet, + progress=not args.quiet, + ) + + if exit_code != 0: + print(f"Error: borg exited with status {exit_code}", file=sys.stderr) + sys.exit(exit_code) + + if not args.quiet: + print() + print("Borg backup completed successfully.") + + sys.exit(0) if __name__ == "__main__": diff --git a/dockervault/models.py b/dockervault/models.py index f8ed090..403dbdf 100644 --- a/dockervault/models.py +++ b/dockervault/models.py @@ -1,63 +1,31 @@ from __future__ import annotations -from dataclasses import asdict, dataclass, field +from dataclasses import dataclass from pathlib import Path -@dataclass(slots=True) -class MountMapping: - source: str - target: str - kind: str - read_only: bool = False - - def to_dict(self) -> dict: - return asdict(self) +@dataclass +class MountEntry: + source: Path + service: str = "unknown" + target: str = "unknown" + classification: str = "unknown" + reason: str = "" + exists: bool = False -@dataclass(slots=True) -class 
ServiceDefinition: - name: str - image: str | None = None - restart: str | None = None - env_files: list[str] = field(default_factory=list) - mounts: list[MountMapping] = field(default_factory=list) - - def to_dict(self) -> dict: - return asdict(self) +@dataclass +class ValidationResult: + missing: list[MountEntry] + present: list[MountEntry] -@dataclass(slots=True) -class ComposeProject: - name: str - root_path: str - compose_files: list[str] = field(default_factory=list) - services: list[ServiceDefinition] = field(default_factory=list) - named_volumes: list[str] = field(default_factory=list) - backup_paths: list[str] = field(default_factory=list) - - def to_dict(self) -> dict: - return { - "name": self.name, - "root_path": self.root_path, - "compose_files": self.compose_files, - "services": [service.to_dict() for service in self.services], - "named_volumes": self.named_volumes, - "backup_paths": self.backup_paths, - } - - @property - def service_names(self) -> list[str]: - return [service.name for service in self.services] - - -DEFAULT_COMPOSE_FILENAMES = { - "docker-compose.yml", - "docker-compose.yaml", - "compose.yml", - "compose.yaml", -} - - -def normalize_path(path: Path) -> str: - return str(path.resolve()) +@dataclass +class BorgSettings: + repo: str + archive_name: str + passphrase_present: bool + automation: bool + auto_init_repo: bool + encryption: str + quiet: bool = False diff --git a/dockervault/scanner.py b/dockervault/scanner.py index d4fbf2c..b3b69ce 100644 --- a/dockervault/scanner.py +++ b/dockervault/scanner.py @@ -1,222 +1,165 @@ from __future__ import annotations from pathlib import Path -from typing import Any +from typing import Any, Dict, List import yaml -from dockervault.models import ( - ComposeProject, - DEFAULT_COMPOSE_FILENAMES, - MountMapping, - ServiceDefinition, - normalize_path, -) +from dockervault.classification.models import MountCandidate -def find_compose_files(base_path: Path) -> list[Path]: - """Find likely Docker Compose 
files under base_path.""" - matches: list[Path] = [] +class DockerComposeScanner: + def __init__(self, compose_file: str | Path): + self.compose_file = Path(compose_file) + self.base_dir = self.compose_file.parent - for path in base_path.rglob("*"): - if path.is_file() and path.name in DEFAULT_COMPOSE_FILENAMES: - matches.append(path) + def load_compose(self) -> Dict[str, Any]: + with self.compose_file.open("r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} - return sorted(matches) + def scan(self) -> List[MountCandidate]: + compose = self.load_compose() + services = compose.get("services", {}) + project_name = compose.get("name") or self.base_dir.name + candidates: List[MountCandidate] = [] -def load_yaml_file(compose_path: Path) -> dict[str, Any]: - try: - content = compose_path.read_text(encoding="utf-8") - except UnicodeDecodeError: - content = compose_path.read_text(encoding="utf-8", errors="ignore") + for service_name, service_def in services.items(): + image = service_def.get("image", "") + env = self._normalize_environment(service_def.get("environment", {})) + volumes = service_def.get("volumes", []) + + for volume in volumes: + candidate = self._parse_volume( + service_name=service_name, + image=image, + volume=volume, + env=env, + compose_project=project_name, + ) + if candidate: + candidates.append(candidate) + + return candidates + + def _normalize_environment(self, env: Any) -> Dict[str, str]: + if isinstance(env, dict): + return {str(k): str(v) for k, v in env.items()} + + if isinstance(env, list): + parsed: Dict[str, str] = {} + for item in env: + if isinstance(item, str) and "=" in item: + key, value = item.split("=", 1) + parsed[key] = value + return parsed - data = yaml.safe_load(content) or {} - if not isinstance(data, dict): return {} - return data + def _parse_volume( + self, + service_name: str, + image: str, + volume: Any, + env: Dict[str, str], + compose_project: str, + ) -> MountCandidate | None: + if isinstance(volume, str): + 
return self._parse_short_syntax( + service_name=service_name, + image=image, + volume=volume, + env=env, + compose_project=compose_project, + ) -def parse_env_files(value: Any) -> list[str]: - if isinstance(value, str): - return [value] + if isinstance(volume, dict): + return self._parse_long_syntax( + service_name=service_name, + image=image, + volume=volume, + env=env, + compose_project=compose_project, + ) - if isinstance(value, list): - items: list[str] = [] - for item in value: - if isinstance(item, str): - items.append(item) - elif isinstance(item, dict): - path = item.get("path") - if isinstance(path, str): - items.append(path) - return sorted(set(items)) - - return [] - - -def normalize_volume_dict(volume: dict[str, Any]) -> MountMapping | None: - source = volume.get("source") or volume.get("src") or "" - target = volume.get("target") or volume.get("dst") or volume.get("destination") or "" - if not isinstance(target, str) or not target: return None - kind = volume.get("type") or ("bind" if source and str(source).startswith(("/", ".", "~")) else "volume") - read_only = bool(volume.get("read_only") or volume.get("readonly")) + def _parse_short_syntax( + self, + service_name: str, + image: str, + volume: str, + env: Dict[str, str], + compose_project: str, + ) -> MountCandidate | None: + parts = volume.split(":") - return MountMapping( - source=str(source), - target=target, - kind=str(kind), - read_only=read_only, - ) + if len(parts) == 1: + # Anonymous volume style: "/data" + return MountCandidate( + service_name=service_name, + image=image, + source="", + target=parts[0], + mount_type="volume", + read_only=False, + env=env, + compose_project=compose_project, + ) + if len(parts) >= 2: + source = parts[0] + target = parts[1] + options = parts[2:] if len(parts) > 2 else [] + read_only = "ro" in options -def normalize_volume_string(value: str) -> MountMapping | None: - parts = value.split(":") - if len(parts) == 1: - return MountMapping(source="", 
target=parts[0], kind="anonymous", read_only=False) + mount_type = self._guess_mount_type(source) - if len(parts) >= 2: - source = parts[0] - target = parts[1] - options = parts[2:] - read_only = any(option == "ro" for option in options) + return MountCandidate( + service_name=service_name, + image=image, + source=source, + target=target, + mount_type=mount_type, + read_only=read_only, + env=env, + compose_project=compose_project, + ) - if source.startswith(("/", ".", "~")): - kind = "bind" - else: - kind = "volume" + return None - return MountMapping(source=source, target=target, kind=kind, read_only=read_only) + def _parse_long_syntax( + self, + service_name: str, + image: str, + volume: Dict[str, Any], + env: Dict[str, str], + compose_project: str, + ) -> MountCandidate | None: + source = volume.get("source", "") or volume.get("src", "") + target = volume.get("target", "") or volume.get("dst", "") or volume.get("destination", "") + mount_type = volume.get("type", self._guess_mount_type(str(source))) + read_only = bool(volume.get("read_only", False)) - return None + if not target: + return None + return MountCandidate( + service_name=service_name, + image=image, + source=str(source), + target=str(target), + mount_type=str(mount_type), + read_only=read_only, + env=env, + compose_project=compose_project, + ) -def parse_mounts(value: Any) -> list[MountMapping]: - mounts: list[MountMapping] = [] + def _guess_mount_type(self, source: str) -> str: + if not source: + return "volume" - if not isinstance(value, list): - return mounts + if source.startswith("/") or source.startswith("./") or source.startswith("../"): + return "bind" - for item in value: - mapping: MountMapping | None = None - if isinstance(item, str): - mapping = normalize_volume_string(item) - elif isinstance(item, dict): - mapping = normalize_volume_dict(item) - - if mapping: - mounts.append(mapping) - - return mounts - - -def parse_service_definition(name: str, data: Any) -> ServiceDefinition: - if not 
isinstance(data, dict): - return ServiceDefinition(name=name) - - mounts = parse_mounts(data.get("volumes", [])) - env_files = parse_env_files(data.get("env_file")) - - return ServiceDefinition( - name=name, - image=data.get("image") if isinstance(data.get("image"), str) else None, - restart=data.get("restart") if isinstance(data.get("restart"), str) else None, - env_files=env_files, - mounts=mounts, - ) - - -def merge_service(existing: ServiceDefinition, incoming: ServiceDefinition) -> ServiceDefinition: - mounts_by_key: dict[tuple[str, str, str, bool], MountMapping] = { - (mount.source, mount.target, mount.kind, mount.read_only): mount - for mount in existing.mounts - } - for mount in incoming.mounts: - mounts_by_key[(mount.source, mount.target, mount.kind, mount.read_only)] = mount - - env_files = sorted(set(existing.env_files) | set(incoming.env_files)) - - return ServiceDefinition( - name=existing.name, - image=incoming.image or existing.image, - restart=incoming.restart or existing.restart, - env_files=env_files, - mounts=sorted(mounts_by_key.values(), key=lambda item: (item.target, item.source, item.kind)), - ) - - -def extract_project_from_compose(folder: Path, compose_files: list[Path]) -> ComposeProject: - services_by_name: dict[str, ServiceDefinition] = {} - named_volumes: set[str] = set() - backup_paths: set[str] = set() - - for compose_file in sorted(compose_files): - data = load_yaml_file(compose_file) - - for volume_name in (data.get("volumes") or {}).keys() if isinstance(data.get("volumes"), dict) else []: - if isinstance(volume_name, str): - named_volumes.add(volume_name) - - raw_services = data.get("services") or {} - if not isinstance(raw_services, dict): - continue - - for service_name, service_data in raw_services.items(): - if not isinstance(service_name, str): - continue - - incoming = parse_service_definition(service_name, service_data) - if service_name in services_by_name: - services_by_name[service_name] = 
merge_service(services_by_name[service_name], incoming) - else: - services_by_name[service_name] = incoming - - for service in services_by_name.values(): - for mount in service.mounts: - if mount.kind == "bind" and mount.source: - candidate = Path(mount.source).expanduser() - if not candidate.is_absolute(): - candidate = (folder / candidate).resolve() - backup_paths.add(str(candidate)) - - for env_file in service.env_files: - candidate = Path(env_file).expanduser() - if not candidate.is_absolute(): - candidate = (folder / candidate).resolve() - backup_paths.add(str(candidate)) - - return ComposeProject( - name=folder.name, - root_path=normalize_path(folder), - compose_files=[file.name for file in sorted(compose_files)], - services=sorted(services_by_name.values(), key=lambda item: item.name), - named_volumes=sorted(named_volumes), - backup_paths=sorted(backup_paths), - ) - - -def group_projects_by_folder(compose_files: list[Path]) -> list[ComposeProject]: - grouped: dict[Path, list[Path]] = {} - - for compose_file in compose_files: - grouped.setdefault(compose_file.parent, []).append(compose_file) - - projects: list[ComposeProject] = [] - - for folder, files in sorted(grouped.items()): - projects.append(extract_project_from_compose(folder, files)) - - return projects - - -def scan_projects(base_path: Path) -> list[ComposeProject]: - if not base_path.exists(): - raise FileNotFoundError(f"Path does not exist: {base_path}") - - if not base_path.is_dir(): - raise NotADirectoryError(f"Path is not a directory: {base_path}") - - compose_files = find_compose_files(base_path) - return group_projects_by_folder(compose_files) + return "volume" diff --git a/dockervault/tests/test_classification_engine.py b/dockervault/tests/test_classification_engine.py new file mode 100644 index 0000000..36d651f --- /dev/null +++ b/dockervault/tests/test_classification_engine.py @@ -0,0 +1,47 @@ +from dockervault.classification.engine import ClassificationEngine +from 
dockervault.classification.models import MountCandidate, Classification + + +def test_minecraft(): + engine = ClassificationEngine() + + c = MountCandidate( + service_name="mc", + image="itzg/minecraft-server", + source="data", + target="/data", + mount_type="bind" + ) + + result = engine.classify(c) + assert result.classification == Classification.CRITICAL + + +def test_database(): + engine = ClassificationEngine() + + c = MountCandidate( + service_name="db", + image="mysql", + source="db", + target="/var/lib/mysql", + mount_type="bind" + ) + + result = engine.classify(c) + assert result.classification == Classification.CRITICAL + + +def test_logs(): + engine = ClassificationEngine() + + c = MountCandidate( + service_name="nginx", + image="nginx", + source="logs", + target="/var/log/nginx", + mount_type="bind" + ) + + result = engine.classify(c) + assert result.classification == Classification.OPTIONAL diff --git a/dockervault/validation.py b/dockervault/validation.py new file mode 100644 index 0000000..e876a7f --- /dev/null +++ b/dockervault/validation.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from pathlib import Path + +from .models import MountEntry, ValidationResult + + +def validate_paths(include_entries: list[MountEntry], review_entries: list[MountEntry]) -> ValidationResult: + missing: list[MountEntry] = [] + present: list[MountEntry] = [] + + for entry in [*include_entries, *review_entries]: + entry.exists = entry.source.exists() + if entry.exists: + present.append(entry) + else: + missing.append(entry) + + return ValidationResult(missing=missing, present=present) + + +def mkdir_target_for_missing(entry: MountEntry) -> Path: + """ + Heuristic: + - If path looks like a file path (has suffix), create parent directory. + - Otherwise create the directory path itself. 
+ """ + source = entry.source + if source.suffix and not source.name.startswith("."): + return source.parent + return source + + +def apply_mkdir_for_missing(missing: list[MountEntry]) -> list[Path]: + created: list[Path] = [] + + for entry in missing: + target = mkdir_target_for_missing(entry) + if target.exists(): + continue + target.mkdir(parents=True, exist_ok=True) + created.append(target) + + return created diff --git a/dockervault/volume_inspector.py b/dockervault/volume_inspector.py new file mode 100644 index 0000000..9cbdb01 --- /dev/null +++ b/dockervault/volume_inspector.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import json +import shutil +import subprocess +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +@dataclass +class NamedVolumeResolution: + compose_name: str + docker_name: str | None + mountpoint: Path | None + available: bool + reason: str | None = None + + +def docker_available() -> bool: + return shutil.which("docker") is not None + + +def run_docker_volume_inspect(volume_name: str) -> dict[str, Any] | None: + if not docker_available(): + return None + + try: + result = subprocess.run( + ["docker", "volume", "inspect", volume_name], + capture_output=True, + text=True, + check=False, + ) + except OSError: + return None + + if result.returncode != 0: + return None + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + return None + + if not isinstance(data, list) or not data: + return None + + item = data[0] + if not isinstance(item, dict): + return None + + return item + + +def infer_project_name(compose_path: Path, compose_data: dict[str, Any]) -> str: + top_level_name = compose_data.get("name") + if isinstance(top_level_name, str) and top_level_name.strip(): + return top_level_name.strip() + + return compose_path.parent.name + + +def normalize_top_level_volume_name( + volume_key: str, + compose_data: dict[str, Any], +) -> tuple[str | None, bool]: + """ + Returns: 
+ (explicit_name_or_none, is_external) + """ + volumes = compose_data.get("volumes", {}) + if not isinstance(volumes, dict): + return None, False + + cfg = volumes.get(volume_key) + if not isinstance(cfg, dict): + return None, False + + explicit_name = cfg.get("name") + if not isinstance(explicit_name, str): + explicit_name = None + + external = cfg.get("external", False) + is_external = False + + if isinstance(external, bool): + is_external = external + elif isinstance(external, dict): + is_external = True + ext_name = external.get("name") + if isinstance(ext_name, str) and ext_name.strip(): + explicit_name = ext_name.strip() + + return explicit_name, is_external + + +def build_volume_candidates( + compose_name: str, + compose_path: Path, + compose_data: dict[str, Any], +) -> list[str]: + """ + Try likely Docker volume names in a sensible order. + """ + candidates: list[str] = [] + project_name = infer_project_name(compose_path, compose_data) + + explicit_name, is_external = normalize_top_level_volume_name(compose_name, compose_data) + + # 1) explicit external/name override + if explicit_name: + candidates.append(explicit_name) + + # 2) external volumes often use raw name directly + if is_external: + candidates.append(compose_name) + + # 3) raw compose source + candidates.append(compose_name) + + # 4) compose-created default name: