release: add version support for v0.1.0

This commit is contained in:
Eddie Nielsen 2026-03-24 14:18:58 +00:00
parent 77613d4291
commit 3b759c4252

View file

@ -1,130 +1,107 @@
import argparse import argparse
import sys
from pathlib import Path from pathlib import Path
from typing import List
from concurrent.futures import ThreadPoolExecutor
import socket
from dockervault.discovery import discover_compose_files from . import __version__
from dockervault.analyzer import analyse_compose_file
from dockervault.classifier import classify_mount
def print_plan(scan_root: Path, classified_mounts: List[dict], quiet: bool): def print_plan(plan):
if quiet: print("\nDockerVault Backup Plan")
return [m for m in classified_mounts if m["class"] == "critical"]
print()
print("DockerVault Backup Plan")
print("=======================") print("=======================")
print(f"Scan root: {scan_root}")
print()
include = [] if isinstance(plan, dict) and "root" in plan:
review = [] print(f"Scan root: {plan['root']}")
skip = []
for m in classified_mounts: include = plan.get("include", [])
cls = m["class"] review = plan.get("review", [])
skip = plan.get("skip", [])
if cls == "critical": print("\nINCLUDE PATHS:")
include.append(m)
elif cls == "review":
review.append(m)
else:
skip.append(m)
print("INCLUDE PATHS:")
if include: if include:
for m in include: for item in include:
print( print(
f" - {m['source']} " f" - {item.get('path')} [{item.get('class', 'unknown')}] "
f"[{m['class']}] " f"service={item.get('service', '?')} target={item.get('target', '?')}"
f"service={m['service']} "
f"target={m['target']} "
f"(exists={m['exists']})"
) )
else: else:
print(" (none)") print(" - (none)")
print() print("\nREVIEW PATHS:")
print("REVIEW PATHS:")
if review: if review:
for m in review: for item in review:
print( print(
f" - {m['source']} " f" - {item.get('path')} [{item.get('class', 'unknown')}] "
f"[{m['class']}] " f"service={item.get('service', '?')} target={item.get('target', '?')}"
f"service={m['service']} "
f"target={m['target']} "
f"(exists={m['exists']})"
) )
else: else:
print(" (none)") print(" - (none)")
print() print("\nSKIP PATHS:")
print("SKIP PATHS:")
if skip: if skip:
for m in skip: for item in skip:
print( print(
f" - {m['source']} " f" - {item.get('path')} [{item.get('class', 'unknown')}] "
f"[{m['class']}] " f"service={item.get('service', '?')} target={item.get('target', '?')}"
f"service={m['service']} "
f"target={m['target']} "
f"(exists={m['exists']})"
) )
else: else:
print(" (none)") print(" - (none)")
return include
def print_borg_command(include: List[dict], repo: str, quiet: bool): def print_warnings(plan):
if not repo: include = plan.get("include", [])
return missing = [item for item in include if not item.get("exists", True)]
valid_paths = sorted({ if missing:
m["source"] for m in include if m["exists"] print("\nWARNING: Missing critical paths detected")
}) for item in missing:
print(f" - {item.get('path')} (service={item.get('service', '?')})")
if not valid_paths:
if not quiet:
print()
print("No valid paths for borg backup")
print("Reason: all critical paths are missing (exists=False)")
return
hostname = socket.gethostname()
if quiet:
print(" ".join(valid_paths))
return
print()
print("Suggested borg create command")
print("=============================")
print("borg create --stats --progress \\")
print(f" {repo}::{{hostname}}-{{now:%Y-%m-%d_%H-%M}} \\")
for p in valid_paths:
print(f" {p} \\")
def build_parser(): def build_parser():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser(
description="DockerVault - Intelligent Docker backup discovery"
)
subparsers = parser.add_subparsers(dest="command") parser.add_argument(
"--version",
action="version",
version=f"DockerVault {__version__}",
)
scan = subparsers.add_parser("scan") parser.add_argument(
"path",
nargs="?",
help="Path to scan (folder or docker-compose.yml)",
)
scan.add_argument("path") parser.add_argument(
scan.add_argument("--repo") "--repo",
scan.add_argument("--max-depth", type=int, default=None) help="Borg repository path",
scan.add_argument("--exclude", action="append", default=[]) )
scan.add_argument("--quiet", action="store_true") parser.add_argument(
scan.add_argument("--automation", action="store_true") "--borg",
action="store_true",
help="Generate borg command",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without executing",
)
parser.add_argument(
"--quiet",
action="store_true",
help="Minimal output",
)
parser.add_argument(
"--automation",
action="store_true",
help="Automation-friendly mode",
)
return parser return parser
@ -133,59 +110,63 @@ def main():
parser = build_parser() parser = build_parser()
args = parser.parse_args() args = parser.parse_args()
if args.command == "scan": if not args.path:
scan_root = Path(args.path).resolve() parser.error("the following arguments are required: path")
if not scan_root.exists(): scan_root = Path(args.path)
if not args.quiet:
print(f"ERROR: Path does not exist: {scan_root}")
return 2
compose_files = discover_compose_files( if not scan_root.exists():
root=scan_root, print(f"ERROR: Path does not exist: {scan_root}")
max_depth=args.max_depth, sys.exit(2)
excludes=args.exclude,
try:
from .discovery import scan_path
from .classifier import classify_paths
except ModuleNotFoundError as e:
print(f"ERROR: Missing internal module: {e}")
sys.exit(2)
except ImportError as e:
print(f"ERROR: Import problem: {e}")
sys.exit(2)
scan_result = scan_path(scan_root)
plan = classify_paths(scan_result)
if not isinstance(plan, dict):
print("ERROR: classify_paths() did not return a dict-like plan")
sys.exit(2)
plan.setdefault("root", str(scan_root))
if not args.quiet:
print_plan(plan)
print_warnings(plan)
if args.borg and args.repo:
try:
from .borg import generate_borg_command
except ModuleNotFoundError as e:
print(f"ERROR: Missing borg module: {e}")
sys.exit(2)
except ImportError as e:
print(f"ERROR: Borg import problem: {e}")
sys.exit(2)
print("\nSuggested borg create command")
print("============================")
cmd = generate_borg_command(plan, repo=args.repo)
print(cmd)
if args.automation:
has_missing = any(
not item.get("exists", True)
for item in plan.get("include", [])
) )
if has_missing:
sys.exit(1)
with ThreadPoolExecutor() as executor: sys.exit(0)
results = list(executor.map(analyse_compose_file, compose_files))
classified_mounts = []
for r in results:
for m in r["mounts"]:
classified = classify_mount(m)
compose_dir = r["compose"].parent
source_path = (compose_dir / classified["source"]).resolve()
classified["source"] = str(source_path)
classified["exists"] = source_path.exists()
classified_mounts.append(classified)
missing_critical = [
m for m in classified_mounts
if m["class"] == "critical" and not m["exists"]
]
if missing_critical and not args.quiet:
print()
print("WARNING: Missing critical paths detected")
for m in missing_critical:
print(f" - {m['source']} (service={m['service']})")
print()
include = print_plan(scan_root, classified_mounts, args.quiet)
print_borg_command(include, args.repo, args.quiet)
if missing_critical:
return 1
return 0
return 1
if __name__ == "__main__": if __name__ == "__main__":
raise SystemExit(main()) main()