feat: add YAML parsing and backup detection

This commit is contained in:
Eddie Nielsen 2026-03-22 12:04:11 +00:00
parent cf630318db
commit 21906ff37e
9 changed files with 249 additions and 45 deletions

4
.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
.venv/
*.egg-info/
__pycache__/
*.pyc

View file

@ -46,7 +46,31 @@ def render_text(projects: list) -> str:
lines.append(f"- {project.name}")
lines.append(f" Path: {project.root_path}")
lines.append(f" Compose files: {', '.join(project.compose_files) or '-'}")
lines.append(f" Services: {', '.join(project.services) or '-'}")
lines.append(f" Services: {', '.join(project.service_names) or '-'}")
lines.append(f" Named volumes: {', '.join(project.named_volumes) or '-'}")
if project.backup_paths:
lines.append(" Backup candidates:")
for backup_path in project.backup_paths:
lines.append(f" - {backup_path}")
else:
lines.append(" Backup candidates: -")
for service in project.services:
lines.append(f" Service: {service.name}")
lines.append(f" Image: {service.image or '-'}")
lines.append(f" Restart: {service.restart or '-'}")
lines.append(f" Env files: {', '.join(service.env_files) or '-'}")
if service.mounts:
lines.append(" Mounts:")
for mount in service.mounts:
ro = " (ro)" if mount.read_only else ""
lines.append(
f" - {mount.kind}: {mount.source or '[anonymous]'} -> {mount.target}{ro}"
)
else:
lines.append(" Mounts: -")
return "\n".join(lines)
@ -61,6 +85,12 @@ def main() -> int:
except (FileNotFoundError, NotADirectoryError) as exc:
print(f"Error: {exc}", file=sys.stderr)
return 2
except json.JSONDecodeError as exc:
print(f"Error: invalid JSON/YAML data: {exc}", file=sys.stderr)
return 2
except Exception as exc:
print(f"Error: {exc}", file=sys.stderr)
return 2
if args.json:
print(json.dumps([project.to_dict() for project in projects], indent=2))

View file

@ -4,15 +4,51 @@ from dataclasses import asdict, dataclass, field
from pathlib import Path
@dataclass(slots=True)
class MountMapping:
source: str
target: str
kind: str
read_only: bool = False
def to_dict(self) -> dict:
return asdict(self)
@dataclass(slots=True)
class ServiceDefinition:
    """Parsed view of one entry under a compose file's `services:` mapping.

    Scalar fields default to None when absent; list fields default to empty
    lists via default_factory so instances never share mutable state.
    """

    name: str
    image: str | None = None
    restart: str | None = None
    env_files: list[str] = field(default_factory=list)
    mounts: list[MountMapping] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict; asdict recurses into `mounts`."""
        return asdict(self)
@dataclass(slots=True)
class ComposeProject:
name: str
root_path: str
compose_files: list[str] = field(default_factory=list)
services: list[str] = field(default_factory=list)
services: list[ServiceDefinition] = field(default_factory=list)
named_volumes: list[str] = field(default_factory=list)
backup_paths: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
return asdict(self)
return {
"name": self.name,
"root_path": self.root_path,
"compose_files": self.compose_files,
"services": [service.to_dict() for service in self.services],
"named_volumes": self.named_volumes,
"backup_paths": self.backup_paths,
}
@property
def service_names(self) -> list[str]:
return [service.name for service in self.services]
DEFAULT_COMPOSE_FILENAMES = {

View file

@ -1,11 +1,17 @@
from __future__ import annotations
import re
from pathlib import Path
from typing import Any
from dockervault.models import ComposeProject, DEFAULT_COMPOSE_FILENAMES, normalize_path
import yaml
SERVICE_LINE_RE = re.compile(r"^\s{2}([A-Za-z0-9_.-]+):\s*$")
from dockervault.models import (
ComposeProject,
DEFAULT_COMPOSE_FILENAMES,
MountMapping,
ServiceDefinition,
normalize_path,
)
def find_compose_files(base_path: Path) -> list[Path]:
@ -19,41 +25,176 @@ def find_compose_files(base_path: Path) -> list[Path]:
return sorted(matches)
def load_yaml_file(compose_path: Path) -> dict[str, Any]:
    """Read and parse a YAML file, returning {} unless the root is a mapping.

    Fix: this span carried commit-diff residue — lines of the removed
    line-based parser (`lines = ...splitlines()`, `in_services = False`,
    `services: list[str] = []`, the old docstring) interleaved with the new
    function, referencing names the new code never defines. Only the
    new-side implementation remains.

    Falls back to lossy decoding on UnicodeDecodeError so one bad byte does
    not abort the scan; yaml.safe_load is used because compose files are
    untrusted input.
    """
    try:
        content = compose_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # Best effort: drop undecodable bytes rather than failing the scan.
        content = compose_path.read_text(encoding="utf-8", errors="ignore")
    data = yaml.safe_load(content) or {}
    if not isinstance(data, dict):
        # A compose root must be a mapping; treat anything else as empty.
        return {}
    return data
def parse_env_files(value: Any) -> list[str]:
    """Normalize a compose `env_file` value into a sorted, de-duplicated list.

    Accepts the scalar form (a single path string), the list form (strings
    and/or long-syntax dicts carrying a `path` key). Anything else yields
    an empty list; non-string entries are ignored.
    """
    if isinstance(value, str):
        return [value]
    if not isinstance(value, list):
        return []
    collected: set[str] = set()
    for entry in value:
        if isinstance(entry, str):
            collected.add(entry)
        elif isinstance(entry, dict):
            candidate = entry.get("path")
            if isinstance(candidate, str):
                collected.add(candidate)
    return sorted(collected)
def normalize_volume_dict(volume: dict[str, Any]) -> MountMapping | None:
    """Convert a long-syntax volume mapping into a MountMapping.

    Returns None when no usable string target is present. The mount kind
    comes from an explicit `type` key when given; otherwise it is inferred:
    a source looking like a path (starts with /, . or ~) means "bind",
    anything else "volume".
    """
    target = volume.get("target") or volume.get("dst") or volume.get("destination") or ""
    if not (isinstance(target, str) and target):
        return None
    source = volume.get("source") or volume.get("src") or ""
    explicit_kind = volume.get("type")
    if explicit_kind:
        kind = explicit_kind
    elif source and str(source).startswith(("/", ".", "~")):
        kind = "bind"
    else:
        kind = "volume"
    # Accept both spellings seen in the wild for the read-only flag.
    read_only = bool(volume.get("read_only") or volume.get("readonly"))
    return MountMapping(
        source=str(source),
        target=target,
        kind=str(kind),
        read_only=read_only,
    )
def normalize_volume_string(value: str) -> MountMapping | None:
    """Convert a short-syntax volume string into a MountMapping.

    Handles `TARGET` (anonymous volume) and `SOURCE:TARGET[:OPTIONS...]`.
    A source that starts with /, . or ~ is a bind mount, otherwise a named
    volume; the "ro" option marks the mount read-only.
    """
    parts = value.split(":")
    if len(parts) == 1:
        # Bare target with no source: an anonymous volume.
        return MountMapping(source="", target=parts[0], kind="anonymous", read_only=False)
    source, target, *options = parts
    mount_kind = "bind" if source.startswith(("/", ".", "~")) else "volume"
    return MountMapping(
        source=source,
        target=target,
        kind=mount_kind,
        read_only="ro" in options,
    )
def parse_mounts(value: Any) -> list[MountMapping]:
    """Parse a compose `volumes` value (short and long syntax) into mappings.

    Non-list input and entries that normalize to None are ignored.
    """
    if not isinstance(value, list):
        return []
    mounts: list[MountMapping] = []
    for entry in value:
        if isinstance(entry, str):
            mapping = normalize_volume_string(entry)
        elif isinstance(entry, dict):
            mapping = normalize_volume_dict(entry)
        else:
            mapping = None
        if mapping is not None:
            mounts.append(mapping)
    return mounts
def parse_service_definition(name: str, data: Any) -> ServiceDefinition:
    """Build a ServiceDefinition from one entry under `services:`.

    A malformed body (anything but a mapping) yields a definition carrying
    only the name. `image` and `restart` are kept only when they are
    strings.
    """
    if not isinstance(data, dict):
        return ServiceDefinition(name=name)
    image = data.get("image")
    restart = data.get("restart")
    return ServiceDefinition(
        name=name,
        image=image if isinstance(image, str) else None,
        restart=restart if isinstance(restart, str) else None,
        env_files=parse_env_files(data.get("env_file")),
        mounts=parse_mounts(data.get("volumes", [])),
    )
def merge_service(existing: ServiceDefinition, incoming: ServiceDefinition) -> ServiceDefinition:
    """Merge two definitions of the same service (e.g. from override files).

    Scalars from `incoming` win when set; env files are unioned; mounts are
    de-duplicated on (source, target, kind, read_only) and returned sorted
    by target, then source, then kind.
    """
    def mount_key(mount: MountMapping) -> tuple[str, str, str, bool]:
        return (mount.source, mount.target, mount.kind, mount.read_only)

    merged_mounts: dict[tuple[str, str, str, bool], MountMapping] = {}
    # Later (incoming) entries overwrite equal-keyed earlier ones.
    for mount in existing.mounts + incoming.mounts:
        merged_mounts[mount_key(mount)] = mount
    return ServiceDefinition(
        name=existing.name,
        image=incoming.image or existing.image,
        restart=incoming.restart or existing.restart,
        env_files=sorted(set(existing.env_files) | set(incoming.env_files)),
        mounts=sorted(merged_mounts.values(), key=lambda m: (m.target, m.source, m.kind)),
    )
def extract_project_from_compose(folder: Path, compose_files: list[Path]) -> ComposeProject:
    """Aggregate services, named volumes and backup candidates for one folder.

    Fix: this span carried commit-diff residue — lines of the removed
    line-based parser (`if not in_services:`, the "Leaving top-level
    services block" break, the SERVICE_LINE_RE match, and the stray
    `return sorted(set(services))`) interleaved with the new function,
    referencing names the new code never defines. Only the new-side
    implementation remains.

    Every compose file in the folder is parsed; duplicate service names are
    merged via merge_service. Bind-mount sources and env files become
    backup candidate paths, resolved against the folder when relative.
    """
    services_by_name: dict[str, ServiceDefinition] = {}
    named_volumes: set[str] = set()
    backup_paths: set[str] = set()
    for compose_file in sorted(compose_files):
        data = load_yaml_file(compose_file)
        top_level_volumes = data.get("volumes")
        if isinstance(top_level_volumes, dict):
            for volume_name in top_level_volumes:
                if isinstance(volume_name, str):
                    named_volumes.add(volume_name)
        raw_services = data.get("services") or {}
        if not isinstance(raw_services, dict):
            continue
        for service_name, service_data in raw_services.items():
            if not isinstance(service_name, str):
                continue
            incoming = parse_service_definition(service_name, service_data)
            if service_name in services_by_name:
                services_by_name[service_name] = merge_service(services_by_name[service_name], incoming)
            else:
                services_by_name[service_name] = incoming
    for service in services_by_name.values():
        # Only bind mounts with a source are host paths worth backing up;
        # named/anonymous volumes are not filesystem paths.
        for mount in service.mounts:
            if mount.kind == "bind" and mount.source:
                candidate = Path(mount.source).expanduser()
                if not candidate.is_absolute():
                    candidate = (folder / candidate).resolve()
                backup_paths.add(str(candidate))
        for env_file in service.env_files:
            candidate = Path(env_file).expanduser()
            if not candidate.is_absolute():
                candidate = (folder / candidate).resolve()
            backup_paths.add(str(candidate))
    return ComposeProject(
        name=folder.name,
        root_path=normalize_path(folder),
        compose_files=[file.name for file in sorted(compose_files)],
        services=sorted(services_by_name.values(), key=lambda item: item.name),
        named_volumes=sorted(named_volumes),
        backup_paths=sorted(backup_paths),
    )
def group_projects_by_folder(compose_files: list[Path]) -> list[ComposeProject]:
@ -65,19 +206,7 @@ def group_projects_by_folder(compose_files: list[Path]) -> list[ComposeProject]:
projects: list[ComposeProject] = []
for folder, files in sorted(grouped.items()):
service_names: set[str] = set()
for compose_file in files:
service_names.update(parse_services_from_compose(compose_file))
projects.append(
ComposeProject(
name=folder.name,
root_path=normalize_path(folder),
compose_files=[file.name for file in sorted(files)],
services=sorted(service_names),
)
)
projects.append(extract_project_from_compose(folder, files))
return projects

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "dockervault"
version = "0.1.0"
version = "0.2.0"
description = "CLI backup discovery tool for Docker environments"
readme = "README.md"
requires-python = ">=3.10"
@ -16,11 +16,16 @@ keywords = ["docker", "backup", "cli", "borg", "inventory"]
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent"
]
dependencies = [
"PyYAML>=6.0",
]
[project.scripts]
dockervault = "dockervault.cli:main"