feat(scan): exclude common non-project directories during discovery

This commit is contained in:
Eddie Nielsen 2026-03-23 15:39:23 +00:00
parent 4d36198bdd
commit 02ff096c6b
2 changed files with 73 additions and 163 deletions

View file

@ -1,28 +1,56 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Iterable
COMPOSE_FILENAMES = (
# Directory names that the compose-file scan prunes from the walk by default.
# Caller-supplied excludes are added on top of this set, never instead of it.
DEFAULT_SCAN_EXCLUDES = {
    # Version-control metadata
    ".git",
    ".hg",
    ".svn",
    # Virtual environments and dependency trees
    ".venv",
    "venv",
    "env",
    ".tox",
    "node_modules",
    # Tool caches
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".cache",
    # Editor / IDE settings
    ".idea",
    ".vscode",
}
COMPOSE_FILENAMES = {
"docker-compose.yml",
"docker-compose.yaml",
"compose.yml",
"compose.yaml",
)
}
def find_compose_files(root: str | Path) -> list[Path]:
def find_compose_files(
root: Path | str,
excludes: Iterable[str] | None = None,
) -> list[Path]:
root_path = Path(root).resolve()
if not root_path.exists():
raise FileNotFoundError(f"Scan root not found: {root_path}")
exclude_set = set(DEFAULT_SCAN_EXCLUDES)
if excludes:
exclude_set.update(x.strip() for x in excludes if x and x.strip())
if not root_path.is_dir():
raise NotADirectoryError(f"Scan root is not a directory: {root_path}")
found: set[Path] = set()
found: list[Path] = []
for current_root, dirnames, filenames in os.walk(root_path, topdown=True):
dirnames[:] = sorted(
d for d in dirnames
if d not in exclude_set
)
for path in root_path.rglob("*"):
if path.is_file() and path.name in COMPOSE_FILENAMES:
found.append(path)
current_path = Path(current_root)
for filename in filenames:
if filename in COMPOSE_FILENAMES:
found.add((current_path / filename).resolve())
return sorted(found)

View file

@ -1,165 +1,47 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, List
from typing import Iterable
import yaml
# Directory names pruned from the walk by default.  Caller-supplied excludes
# are added on top of this set.  The entries match the default exclude list
# used by find_compose_files so that both discovery entry points skip the
# same directories.
DEFAULT_SCAN_EXCLUDES = {
    ".git",
    ".hg",
    ".svn",
    ".venv",
    "venv",
    "env",
    "node_modules",
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".tox",
    ".cache",
    ".idea",
    ".vscode",
}
from dockervault.classification.models import MountCandidate
COMPOSE_FILENAMES = (
"docker-compose.yml",
"docker-compose.yaml",
"compose.yml",
"compose.yaml",
)
class DockerComposeScanner:
def __init__(self, compose_file: str | Path):
    """Remember the compose file path and the directory that contains it."""
    compose_path = Path(compose_file)
    self.compose_file = compose_path
    self.base_dir = compose_path.parent
def discover_compose_files(
root: Path | str,
excludes: Iterable[str] | None = None,
) -> list[Path]:
def load_compose(self) -> Dict[str, Any]:
    """Read ``self.compose_file`` as UTF-8 YAML and return the parsed document.

    A falsy parse result (for example an empty file) is returned as an
    empty dict.
    """
    with self.compose_file.open("r", encoding="utf-8") as handle:
        document = yaml.safe_load(handle)
    return document if document else {}
root = Path(root).resolve()
def scan(self) -> List[MountCandidate]:
compose = self.load_compose()
services = compose.get("services", {})
project_name = compose.get("name") or self.base_dir.name
exclude_set = set(DEFAULT_SCAN_EXCLUDES)
if excludes:
exclude_set.update(x.strip() for x in excludes if x)
candidates: List[MountCandidate] = []
found = set()
for service_name, service_def in services.items():
image = service_def.get("image", "")
env = self._normalize_environment(service_def.get("environment", {}))
volumes = service_def.get("volumes", [])
for volume in volumes:
candidate = self._parse_volume(
service_name=service_name,
image=image,
volume=volume,
env=env,
compose_project=project_name,
)
if candidate:
candidates.append(candidate)
return candidates
def _normalize_environment(self, env: Any) -> Dict[str, str]:
    """Coerce a service's ``environment`` value into a str-to-str dict.

    A mapping is copied with every key and value passed through ``str``.
    A list is read as ``KEY=VALUE`` strings split on the first ``=``;
    items that are not strings or contain no ``=`` are skipped, and a
    repeated key keeps its last value.  Any other input yields ``{}``.
    """
    if isinstance(env, list):
        pairs = (
            entry.split("=", 1)
            for entry in env
            if isinstance(entry, str) and "=" in entry
        )
        return {name: value for name, value in pairs}

    if isinstance(env, dict):
        return {str(name): str(value) for name, value in env.items()}

    return {}
def _parse_volume(
    self,
    service_name: str,
    image: str,
    volume: Any,
    env: Dict[str, str],
    compose_project: str,
) -> MountCandidate | None:
    """Route one ``volumes`` entry to the parser for its syntax.

    String entries are handed to ``_parse_short_syntax`` and mapping
    entries to ``_parse_long_syntax``; the remaining arguments are passed
    through unchanged.  Any other entry type returns ``None``.
    """
    if isinstance(volume, str):
        parser = self._parse_short_syntax
    elif isinstance(volume, dict):
        parser = self._parse_long_syntax
    else:
        return None

    return parser(
        service_name=service_name,
        image=image,
        volume=volume,
        env=env,
        compose_project=compose_project,
    )
def _parse_short_syntax(
    self,
    service_name: str,
    image: str,
    volume: str,
    env: Dict[str, str],
    compose_project: str,
) -> MountCandidate | None:
    """Parse a short-syntax volume string into a ``MountCandidate``.

    Two shapes are recognised after splitting on ``:``:

    * ``TARGET`` - recorded as a ``volume`` mount with an empty source.
    * ``SOURCE:TARGET[:OPTIONS]`` - the mount type comes from
      ``_guess_mount_type(SOURCE)``.  ``OPTIONS`` may be a comma-separated
      list such as ``ro,z``; the mount is read-only when any individual
      option equals ``ro``.
    """
    parts = volume.split(":")

    if len(parts) == 1:
        # Anonymous volume style: "/data"
        return MountCandidate(
            service_name=service_name,
            image=image,
            source="",
            target=parts[0],
            mount_type="volume",
            read_only=False,
            env=env,
            compose_project=compose_project,
        )

    # str.split always yields at least one element, so at this point there
    # are two or more parts: source, target, then any option fields.
    source, target, *option_fields = parts

    # Options may arrive as one comma-separated field ("ro,z"); split each
    # field so "ro" is found even when combined with other flags.
    options = set()
    for option_field in option_fields:
        options.update(option_field.split(","))

    return MountCandidate(
        service_name=service_name,
        image=image,
        source=source,
        target=target,
        mount_type=self._guess_mount_type(source),
        read_only="ro" in options,
        env=env,
        compose_project=compose_project,
    )
def _parse_long_syntax(
self,
service_name: str,
image: str,
volume: Dict[str, Any],
env: Dict[str, str],
compose_project: str,
) -> MountCandidate | None:
source = volume.get("source", "") or volume.get("src", "")
target = volume.get("target", "") or volume.get("dst", "") or volume.get("destination", "")
mount_type = volume.get("type", self._guess_mount_type(str(source)))
read_only = bool(volume.get("read_only", False))
if not target:
return None
return MountCandidate(
service_name=service_name,
image=image,
source=str(source),
target=str(target),
mount_type=str(mount_type),
read_only=read_only,
env=env,
compose_project=compose_project,
for current_root, dirnames, filenames in os.walk(root, topdown=True):
# Prune excluded directories in place so os.walk does not descend into them.
dirnames[:] = sorted(
d for d in dirnames if d not in exclude_set
)
def _guess_mount_type(self, source: str) -> str:
if not source:
return "volume"
current_path = Path(current_root)
if source.startswith("/") or source.startswith("./") or source.startswith("../"):
return "bind"
for filename in filenames:
if filename in COMPOSE_FILENAMES:
found.add((current_path / filename).resolve())
return "volume"
return sorted(found)