feat: add named volume detection and image-aware classification

This commit is contained in:
Eddie Nielsen 2026-03-23 13:27:12 +00:00
parent 932c668e65
commit 483e2720f1
2 changed files with 770 additions and 221 deletions

View file

@ -1,291 +1,546 @@
from __future__ import annotations
import json
import shutil
import subprocess
from pathlib import Path
from typing import Any
import yaml
from .models import MountEntry
CRITICAL_TARGET_PATTERNS = (
"/var/lib/mysql",
"/var/lib/postgresql",
"/var/lib/postgres",
"/var/lib/mariadb",
"/data",
# ----------------------------
# Image-aware rules
# ----------------------------
IMAGE_RULES = {
"mariadb": {
"/var/lib/mysql": "critical",
},
"mysql": {
"/var/lib/mysql": "critical",
},
"postgres": {
"/var/lib/postgresql/data": "critical",
},
"redis": {
"/data": "critical",
},
"grafana": {
"/var/lib/grafana": "critical",
},
"prometheus": {
"/prometheus": "critical",
},
"influxdb": {
"/var/lib/influxdb": "critical",
},
"nginx": {
"/var/log/nginx": "optional",
},
}
# ----------------------------
# Generic rules
# ----------------------------
CRITICAL_TARGETS = {
"/config",
"/var/www",
"/srv",
"/app/data",
"/bitnami",
"/var/opt",
"/data",
"/var/lib/mysql",
"/var/lib/mariadb",
"/var/lib/postgresql/data",
"/bitnami/postgresql",
"/var/lib/redis",
"/redis",
"/var/lib/mongodb",
"/mongodb",
"/data/db",
"/var/lib/grafana",
"/var/lib/influxdb",
"/var/lib/prometheus",
"/etc/letsencrypt",
"/acme.sh",
"/app/data",
"/srv",
}
REVIEW_TARGET_KEYWORDS = {
"backup",
"uploads",
"media",
"www",
"html",
"content",
"storage",
"files",
"database",
"db",
"config",
}
SKIP_TARGET_PREFIXES = (
"/tmp",
"/var/tmp",
"/run",
"/var/run",
"/dev",
)
OPTIONAL_TARGET_PATTERNS = (
SKIP_TARGET_EXACT = {
"/var/log",
"/var/log/nginx",
"/logs",
"/log",
"/tmp",
"/cache",
"/var/cache",
"/run",
)
OPTIONAL_SOURCE_PATTERNS = (
"logs",
"log",
"cache",
"tmp",
"temp",
)
SKIP_TARGET_PATTERNS = (
"/dev",
"/proc",
"/sys",
"/run",
"/tmp",
)
SKIP_SOURCE_PATTERNS = (
"/var/run/docker.sock",
"docker.sock",
)
}
def load_compose(compose_file: Path) -> dict[str, Any]:
CLASS_PRIORITY = {
"critical": 3,
"review": 2,
"optional": 1,
"unknown": 0,
}
# ----------------------------
# Compose loader
# ----------------------------
def load_compose(compose_path: str | Path) -> dict[str, Any]:
compose_file = Path(compose_path).expanduser().resolve()
with compose_file.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if not isinstance(data, dict):
raise ValueError("Compose file did not parse into a dictionary")
raise ValueError(f"Compose file did not parse as a mapping: {compose_file}")
return data
def is_bind_mount(volume: Any) -> bool:
if isinstance(volume, str):
return ":" in volume
# ----------------------------
# Docker helpers
# ----------------------------
if isinstance(volume, dict):
return volume.get("type") == "bind"
return False
def docker_available() -> bool:
    """Report whether a ``docker`` executable is reachable on PATH."""
    # shutil.which yields the executable's path (truthy) or None (falsy).
    return bool(shutil.which("docker"))
def parse_volume_entry(
volume: Any,
compose_file: Path,
) -> dict[str, str] | None:
project_root = compose_file.parent.resolve()
def run_docker_volume_inspect(volume_name: str) -> dict[str, Any] | None:
if not docker_available():
return None
if isinstance(volume, str):
parts = volume.split(":")
if len(parts) < 2:
return None
try:
result = subprocess.run(
["docker", "volume", "inspect", volume_name],
capture_output=True,
text=True,
check=False,
)
except OSError:
return None
source_raw = parts[0].strip()
target = parts[1].strip()
if result.returncode != 0:
return None
if not source_raw or not target:
return None
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return None
# Named volumes should not be treated as bind mounts
if not source_raw.startswith(("/", ".", "~")):
return None
if not isinstance(data, list) or not data:
return None
source = resolve_source_path(source_raw, project_root)
first = data[0]
if not isinstance(first, dict):
return None
return {
"source": str(source),
"target": target,
}
if isinstance(volume, dict):
if volume.get("type") != "bind":
return None
source_raw = str(volume.get("source", "")).strip()
target = str(volume.get("target", "")).strip()
if not source_raw or not target:
return None
source = resolve_source_path(source_raw, project_root)
return {
"source": str(source),
"target": target,
}
return None
return first
def resolve_source_path(source_raw: str, project_root: Path) -> Path:
source_path = Path(source_raw).expanduser()
# ----------------------------
# Volume resolution
# ----------------------------
if not source_path.is_absolute():
source_path = (project_root / source_path).resolve()
else:
source_path = source_path.resolve()
def infer_project_name(compose_path: Path, compose_data: dict[str, Any]) -> str:
top_level_name = compose_data.get("name")
if isinstance(top_level_name, str) and top_level_name.strip():
return top_level_name.strip()
return source_path
return compose_path.parent.name
def classify_mount(
service_name: str,
source: str,
target: str,
) -> tuple[str, str, str]:
source_lower = source.lower()
target_lower = target.lower()
def normalize_top_level_volume_name(
volume_key: str,
compose_data: dict[str, Any],
) -> tuple[str | None, bool]:
volumes = compose_data.get("volumes", {})
if not isinstance(volumes, dict):
return None, False
for pattern in SKIP_SOURCE_PATTERNS:
if pattern in source_lower:
return "skip", "optional", "docker runtime socket"
cfg = volumes.get(volume_key)
if not isinstance(cfg, dict):
return None, False
for pattern in SKIP_TARGET_PATTERNS:
if target_lower == pattern or target_lower.startswith(pattern + "/"):
return "skip", "optional", "runtime/system path"
explicit_name = cfg.get("name")
if not isinstance(explicit_name, str) or not explicit_name.strip():
explicit_name = None
for pattern in CRITICAL_TARGET_PATTERNS:
if target_lower == pattern or target_lower.startswith(pattern + "/"):
return "include", "critical", "persistent app data"
external = cfg.get("external", False)
is_external = False
for pattern in OPTIONAL_TARGET_PATTERNS:
if target_lower == pattern or target_lower.startswith(pattern + "/"):
return "skip", "optional", "logs/cache/temp path"
if isinstance(external, bool):
is_external = external
elif isinstance(external, dict):
is_external = True
ext_name = external.get("name")
if isinstance(ext_name, str) and ext_name.strip():
explicit_name = ext_name.strip()
source_name = Path(source).name.lower()
for pattern in OPTIONAL_SOURCE_PATTERNS:
if pattern in source_name:
return "skip", "optional", "logs/cache/temp source"
return "review", "medium", "unknown bind mount"
return explicit_name, is_external
def classify_service_mounts(
service_name: str,
service_data: dict[str, Any],
compose_file: Path,
) -> list[dict[str, str]]:
results: list[dict[str, str]] = []
def build_volume_candidates(
compose_name: str,
compose_path: Path,
compose_data: dict[str, Any],
) -> list[str]:
project_name = infer_project_name(compose_path, compose_data)
explicit_name, is_external = normalize_top_level_volume_name(compose_name, compose_data)
volumes = service_data.get("volumes", [])
if not isinstance(volumes, list):
return results
candidates: list[str] = []
for volume in volumes:
if not is_bind_mount(volume):
if explicit_name:
candidates.append(explicit_name)
if is_external:
candidates.append(compose_name)
candidates.append(compose_name)
candidates.append(f"{project_name}_{compose_name}")
unique: list[str] = []
seen: set[str] = set()
for candidate in candidates:
if candidate not in seen:
unique.append(candidate)
seen.add(candidate)
return unique
def resolve_named_volume(
compose_name: str,
compose_path: Path,
compose_data: dict[str, Any],
) -> tuple[Path | None, str]:
if not docker_available():
return None, "docker CLI not available"
for candidate in build_volume_candidates(compose_name, compose_path, compose_data):
inspected = run_docker_volume_inspect(candidate)
if not inspected:
continue
parsed = parse_volume_entry(volume, compose_file)
if not parsed:
continue
mountpoint = inspected.get("Mountpoint")
if isinstance(mountpoint, str) and mountpoint.strip():
return Path(mountpoint), f"named volume '{compose_name}' -> docker volume '{candidate}'"
bucket, priority, reason = classify_mount(
service_name=service_name,
source=parsed["source"],
target=parsed["target"],
)
results.append(
{
"bucket": bucket,
"priority": priority,
"reason": reason,
"service": service_name,
"source": parsed["source"],
"target": parsed["target"],
}
)
return results
return None, f"named volume '{compose_name}' could not be resolved"
def deduplicate_items(items: list[dict[str, str]]) -> list[dict[str, str]]:
seen: set[tuple[str, str, str, str]] = set()
deduped: list[dict[str, str]] = []
# ----------------------------
# Parsing helpers
# ----------------------------
for item in items:
key = (
item["service"],
item["source"],
item["target"],
item["bucket"],
)
if key in seen:
continue
seen.add(key)
deduped.append(item)
def _extract_image_name(image: str | None) -> str | None:
if not image or not isinstance(image, str):
return None
return deduped
if "/" in image:
image = image.split("/")[-1]
if ":" in image:
image = image.split(":")[0]
return image.lower()
def sort_items(items: list[dict[str, str]]) -> list[dict[str, str]]:
priority_order = {
"critical": 0,
"high": 1,
"medium": 2,
"low": 3,
"optional": 4,
}
return sorted(
items,
key=lambda item: (
priority_order.get(item["priority"], 99),
item["service"],
item["source"],
item["target"],
),
def _is_bind_source(source: str) -> bool:
return (
source.startswith("/")
or source.startswith("./")
or source.startswith("../")
or source.startswith("~/")
)
def classify_compose(compose_file: str | Path) -> dict[str, Any]:
compose_path = Path(compose_file).resolve()
data = load_compose(compose_path)
def _normalize_bind_path(source: str, compose_file: Path) -> Path:
path = Path(source).expanduser()
if path.is_absolute():
return path.resolve()
return (compose_file.parent / path).resolve()
services = data.get("services", {})
if not isinstance(services, dict):
raise ValueError("Compose file does not contain a valid 'services' section")
all_items: list[dict[str, str]] = []
def _parse_volume_string(spec: str) -> dict[str, str | None]:
parts = spec.split(":")
for service_name, service_data in services.items():
if not isinstance(service_data, dict):
if len(parts) == 1:
return {
"source": None,
"target": parts[0],
"mode": None,
"kind": "anonymous",
}
source = parts[0]
target = parts[1]
mode = ":".join(parts[2:]) if len(parts) > 2 else None
kind = "bind" if _is_bind_source(source) else "named"
return {
"source": source,
"target": target,
"mode": mode,
"kind": kind,
}
def _parse_volume_entry(entry: Any) -> dict[str, str | None]:
if isinstance(entry, str):
return _parse_volume_string(entry)
if isinstance(entry, dict):
entry_type = entry.get("type")
source = entry.get("source") or entry.get("src")
target = entry.get("target") or entry.get("dst") or entry.get("destination")
if entry_type == "bind":
kind = "bind"
elif entry_type == "volume":
kind = "named" if source else "anonymous"
else:
if isinstance(source, str) and source:
kind = "bind" if _is_bind_source(source) else "named"
else:
kind = "anonymous"
return {
"source": source,
"target": target,
"mode": None,
"kind": kind,
}
return {
"source": None,
"target": None,
"mode": None,
"kind": "unknown",
}
# ----------------------------
# Classification logic
# ----------------------------
def _classify_target(target_path: str | None, image_name: str | None = None) -> tuple[str, str]:
if not target_path:
return "review", "missing container target path"
if image_name and image_name in IMAGE_RULES:
rules = IMAGE_RULES[image_name]
if target_path in rules:
level = rules[target_path]
if level == "critical":
return "critical", f"{image_name} rule for {target_path}"
if level == "optional":
return "optional", f"{image_name} rule for {target_path}"
if target_path in CRITICAL_TARGETS:
return "critical", f"critical target path {target_path}"
if target_path in SKIP_TARGET_EXACT:
return "optional", f"non-essential target path {target_path}"
if target_path.startswith(SKIP_TARGET_PREFIXES):
return "optional", f"ephemeral target path {target_path}"
lowered = target_path.lower()
for keyword in REVIEW_TARGET_KEYWORDS:
if keyword in lowered:
return "review", f"data-like target path {target_path} requires review"
return "review", f"unknown target path {target_path}"
def _merge_reason(existing: str, new: str) -> str:
if not existing:
return new
if not new or new == existing:
return existing
parts = [p.strip() for p in existing.split(" | ") if p.strip()]
if new not in parts:
parts.append(new)
return " | ".join(parts)
def _prefer_entry(existing: MountEntry, new: MountEntry) -> MountEntry:
    """Merge two entries for the same host path, keeping the stronger classification.

    Ties keep the first-seen (existing) entry.  The survivor absorbs the
    other's reason, a note about the other service/target, and its
    ``exists`` flag.
    """
    if CLASS_PRIORITY.get(new.classification, 0) > CLASS_PRIORITY.get(existing.classification, 0):
        keep, drop = new, existing
    else:
        keep, drop = existing, new
    keep.reason = _merge_reason(keep.reason, drop.reason)
    # Record the losing entry's service unless it is already mentioned.
    if drop.service and drop.service not in keep.reason:
        keep.reason = _merge_reason(keep.reason, f"also used by service={drop.service} target={drop.target}")
    keep.exists = keep.exists or drop.exists
    return keep
def _dedupe_entries(entries: list[MountEntry]) -> list[MountEntry]:
deduped: dict[str, MountEntry] = {}
for entry in entries:
key = str(entry.source.resolve()) if entry.source.is_absolute() else str(entry.source)
if key not in deduped:
deduped[key] = entry
continue
all_items.extend(
classify_service_mounts(
service_name=service_name,
service_data=service_data,
compose_file=compose_path,
deduped[key] = _prefer_entry(deduped[key], entry)
return list(deduped.values())
def _make_entry(
    source: Path,
    service: str,
    target: str | None,
    classification: str,
    reason: str,
) -> MountEntry:
    """Build a MountEntry, defaulting a missing target and probing the source on disk."""
    effective_target = target if target else "unknown"
    return MountEntry(
        source=source,
        service=service,
        target=effective_target,
        classification=classification,
        reason=reason,
        exists=source.exists(),
    )
# ----------------------------
# Main classifier
# ----------------------------
def classify_compose(compose_path: str | Path) -> list[MountEntry]:
compose_file = Path(compose_path).expanduser().resolve()
compose_data = load_compose(compose_file)
services = compose_data.get("services", {})
if not isinstance(services, dict):
return []
entries: list[MountEntry] = []
for service_name, service_cfg in services.items():
if not isinstance(service_cfg, dict):
continue
raw_volumes = service_cfg.get("volumes", [])
if not isinstance(raw_volumes, list):
continue
image_name = _extract_image_name(service_cfg.get("image"))
for raw_entry in raw_volumes:
parsed = _parse_volume_entry(raw_entry)
source = parsed.get("source")
target = parsed.get("target")
kind = parsed.get("kind")
if kind == "anonymous":
entries.append(
MountEntry(
source=Path("/__anonymous_volume__"),
service=service_name,
target=target or "unknown",
classification="review",
reason="anonymous volume cannot be safely mapped to host path",
exists=False,
)
)
continue
if kind == "bind" and isinstance(source, str):
host_path = _normalize_bind_path(source, compose_file)
classification, base_reason = _classify_target(target, image_name)
reason = f"{base_reason}; bind mount source '{source}' -> '{host_path}'"
entries.append(
_make_entry(
source=host_path,
service=service_name,
target=target,
classification=classification,
reason=reason,
)
)
continue
if kind == "named" and isinstance(source, str):
mountpoint, volume_reason = resolve_named_volume(source, compose_file, compose_data)
if mountpoint is None:
entries.append(
MountEntry(
source=Path(f"/__named_volume_unresolved__/{source}"),
service=service_name,
target=target or "unknown",
classification="review",
reason=volume_reason,
exists=False,
)
)
continue
classification, base_reason = _classify_target(target, image_name)
reason = f"{base_reason}; {volume_reason}; mountpoint '{mountpoint}'"
entries.append(
_make_entry(
source=mountpoint,
service=service_name,
target=target,
classification=classification,
reason=reason,
)
)
continue
entries.append(
MountEntry(
source=Path("/__unknown_volume__"),
service=service_name,
target=target or "unknown",
classification="review",
reason="unrecognized volume entry",
exists=False,
)
)
)
all_items = deduplicate_items(all_items)
all_items = sort_items(all_items)
include = [strip_bucket(item) for item in all_items if item["bucket"] == "include"]
review = [strip_bucket(item) for item in all_items if item["bucket"] == "review"]
skip = [strip_bucket(item) for item in all_items if item["bucket"] == "skip"]
return {
"include": include,
"review": review,
"skip": skip,
}
def strip_bucket(item: dict[str, str]) -> dict[str, str]:
return {
"service": item["service"],
"source": item["source"],
"target": item["target"],
"priority": item["priority"],
"reason": item["reason"],
}
return _dedupe_entries(entries)

View file

@ -0,0 +1,294 @@
from __future__ import annotations
import textwrap
from pathlib import Path
import pytest
from dockervault.classifier import classify_compose
def write_compose(tmp_path: Path, content: str) -> Path:
    """Write *content* (dedented, stripped, newline-terminated) as docker-compose.yml under *tmp_path*."""
    compose_path = tmp_path / "docker-compose.yml"
    body = textwrap.dedent(content).strip() + "\n"
    compose_path.write_text(body, encoding="utf-8")
    return compose_path
def find_entry(entries, service: str, target: str):
    """Return the first entry matching *service* and *target*; fail the test if none exists."""
    match = next(
        (candidate for candidate in entries
         if candidate.service == service and candidate.target == target),
        None,
    )
    if match is None:
        raise AssertionError(f"No entry found for service={service!r} target={target!r}")
    return match
def test_bind_mount_relative_path_is_resolved_and_classified_critical(tmp_path: Path):
    """A relative bind mount resolves against the compose file's directory and
    a mariadb data target is classified critical."""
    data_dir = tmp_path / "db"
    data_dir.mkdir()
    compose_file = write_compose(
        tmp_path,
        """
        services:
          db:
            image: mariadb:11
            volumes:
              - ./db:/var/lib/mysql
        """,
    )
    entries = classify_compose(compose_file)
    assert len(entries) == 1
    entry = find_entry(entries, "db", "/var/lib/mysql")
    # ./db must be resolved relative to the compose file, not the CWD.
    assert entry.source == data_dir.resolve()
    assert entry.classification == "critical"
    assert entry.exists is True
    assert entry.service == "db"
    assert "mariadb" in entry.reason or "critical" in entry.reason
def test_named_volume_resolves_and_is_classified_critical(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """A named volume is mapped to its docker Mountpoint and classified by target path."""
    mountpoint = tmp_path / "docker-volumes" / "project_dbdata" / "_data"
    mountpoint.mkdir(parents=True)
    compose_file = write_compose(
        tmp_path,
        """
        services:
          db:
            image: mariadb:11
            volumes:
              - dbdata:/var/lib/mysql
        volumes:
          dbdata:
        """,
    )
    # Pretend docker is installed and knows exactly one volume: 'dbdata'.
    monkeypatch.setattr("dockervault.classifier.docker_available", lambda: True)
    monkeypatch.setattr(
        "dockervault.classifier.run_docker_volume_inspect",
        lambda volume_name: {"Mountpoint": str(mountpoint)} if volume_name == "dbdata" else None,
    )
    entries = classify_compose(compose_file)
    assert len(entries) == 1
    entry = find_entry(entries, "db", "/var/lib/mysql")
    assert entry.source == mountpoint
    assert entry.classification == "critical"
    assert entry.exists is True
    assert "named volume 'dbdata'" in entry.reason
def test_named_volume_unresolved_falls_back_to_review(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """When docker cannot resolve a named volume, the entry degrades to 'review'
    with a placeholder source path."""
    compose_file = write_compose(
        tmp_path,
        """
        services:
          app:
            image: redis:7
            volumes:
              - cachedata:/data
        volumes:
          cachedata:
        """,
    )
    # Docker is "available" but inspect finds nothing for any candidate name.
    monkeypatch.setattr("dockervault.classifier.docker_available", lambda: True)
    monkeypatch.setattr("dockervault.classifier.run_docker_volume_inspect", lambda volume_name: None)
    entries = classify_compose(compose_file)
    assert len(entries) == 1
    entry = find_entry(entries, "app", "/data")
    assert entry.classification == "review"
    assert entry.exists is False
    assert "__named_volume_unresolved__" in str(entry.source)
    assert "could not be resolved" in entry.reason
def test_named_volume_review_when_docker_not_available(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """Without a docker CLI on PATH, named volumes cannot be resolved and fall back to 'review'."""
    compose_file = write_compose(
        tmp_path,
        """
        services:
          app:
            image: redis:7
            volumes:
              - cachedata:/data
        volumes:
          cachedata:
        """,
    )
    monkeypatch.setattr("dockervault.classifier.docker_available", lambda: False)
    entries = classify_compose(compose_file)
    assert len(entries) == 1
    entry = find_entry(entries, "app", "/data")
    assert entry.classification == "review"
    assert entry.exists is False
    assert "docker CLI not available" in entry.reason
def test_image_rule_overrides_generic_logic_for_nginx_logs(tmp_path: Path):
    """The nginx image rule marks /var/log/nginx optional even though it is a bind mount."""
    logs_dir = tmp_path / "logs"
    logs_dir.mkdir()
    compose_file = write_compose(
        tmp_path,
        """
        services:
          nginx:
            image: nginx:latest
            volumes:
              - ./logs:/var/log/nginx
        """,
    )
    entries = classify_compose(compose_file)
    assert len(entries) == 1
    entry = find_entry(entries, "nginx", "/var/log/nginx")
    assert entry.source == logs_dir.resolve()
    assert entry.classification == "optional"
    assert entry.exists is True
def test_dedupe_prefers_stronger_classification_for_same_source(tmp_path: Path):
    """Two services mounting the same host dir collapse to one entry with the
    stronger (critical) classification."""
    shared_dir = tmp_path / "shared"
    shared_dir.mkdir()
    compose_file = write_compose(
        tmp_path,
        f"""
        services:
          db:
            image: mariadb:11
            volumes:
              - {shared_dir}:/var/lib/mysql
          backup:
            image: busybox
            volumes:
              - {shared_dir}:/backup
        """,
    )
    entries = classify_compose(compose_file)
    # Deduped on the host path, so only one entry survives.
    assert len(entries) == 1
    entry = entries[0]
    assert entry.source == shared_dir.resolve()
    assert entry.classification == "critical"
    assert entry.exists is True
    assert "mariadb" in entry.reason or "/var/lib/mysql" in entry.reason
def test_top_level_volume_name_override_is_used(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """A top-level 'name:' override is tried first when inspecting docker volumes."""
    mountpoint = tmp_path / "docker-volumes" / "real-db-volume" / "_data"
    mountpoint.mkdir(parents=True)
    compose_file = write_compose(
        tmp_path,
        """
        services:
          db:
            image: postgres:16
            volumes:
              - dbdata:/var/lib/postgresql/data
        volumes:
          dbdata:
            name: real-db-volume
        """,
    )
    # Record the order in which candidate volume names are inspected.
    seen = []
    def fake_inspect(volume_name: str):
        seen.append(volume_name)
        if volume_name == "real-db-volume":
            return {"Mountpoint": str(mountpoint)}
        return None
    monkeypatch.setattr("dockervault.classifier.docker_available", lambda: True)
    monkeypatch.setattr("dockervault.classifier.run_docker_volume_inspect", fake_inspect)
    entries = classify_compose(compose_file)
    entry = find_entry(entries, "db", "/var/lib/postgresql/data")
    assert entry.source == mountpoint
    assert entry.classification == "critical"
    assert "real-db-volume" in entry.reason
    # The explicit name must be the very first candidate tried.
    assert seen[0] == "real-db-volume"
def test_external_volume_tries_raw_name(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """An 'external: true' volume is also looked up under its raw compose key."""
    mountpoint = tmp_path / "docker-volumes" / "shared-prod-data" / "_data"
    mountpoint.mkdir(parents=True)
    compose_file = write_compose(
        tmp_path,
        """
        services:
          app:
            image: redis:7
            volumes:
              - shareddata:/data
        volumes:
          shareddata:
            external: true
        """,
    )
    # Track every candidate name passed to docker volume inspect.
    tried = []
    def fake_inspect(volume_name: str):
        tried.append(volume_name)
        if volume_name == "shareddata":
            return {"Mountpoint": str(mountpoint)}
        return None
    monkeypatch.setattr("dockervault.classifier.docker_available", lambda: True)
    monkeypatch.setattr("dockervault.classifier.run_docker_volume_inspect", fake_inspect)
    entries = classify_compose(compose_file)
    entry = find_entry(entries, "app", "/data")
    assert entry.source == mountpoint
    assert entry.classification == "critical"
    assert "shareddata" in tried
def test_anonymous_volume_becomes_review(tmp_path: Path):
    """An anonymous volume (target only) cannot map to a host path and is marked 'review'."""
    compose_file = write_compose(
        tmp_path,
        """
        services:
          app:
            image: busybox
            volumes:
              - /data
        """,
    )
    entries = classify_compose(compose_file)
    assert len(entries) == 1
    entry = find_entry(entries, "app", "/data")
    assert entry.classification == "review"
    assert entry.exists is False
    assert "__anonymous_volume__" in str(entry.source) or "__anonymous__" in str(entry.source)