# dockervault/discovery.py
import os
from pathlib import Path
from typing import Iterable, List, Optional, Union

# Directory names that are never descended into during a scan.
DEFAULT_SCAN_EXCLUDES = {
    ".git",
    ".hg",
    ".svn",
    ".venv",
    "venv",
    "env",
    "node_modules",
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".tox",
    ".cache",
    ".idea",
    ".vscode",
}

# File names that are reported as Compose files.
COMPOSE_FILENAMES = {
    "docker-compose.yml",
    "docker-compose.yaml",
    "compose.yml",
    "compose.yaml",
}


def find_compose_files(
    root: Union[Path, str],
    excludes: Optional[Iterable[str]] = None,
) -> List[Path]:
    """Return every Compose file found under *root*.

    The tree is walked top-down and any directory whose *name* (not path)
    is in ``DEFAULT_SCAN_EXCLUDES`` or in *excludes* is pruned, so nothing
    beneath it is visited.

    Args:
        root: Directory to scan.
        excludes: Extra directory names to skip. Surrounding whitespace is
            stripped and blank entries are ignored. A single string is
            treated as one name rather than as an iterable of characters.

    Returns:
        Resolved paths of all files named in ``COMPOSE_FILENAMES``, sorted
        and de-duplicated.

    Raises:
        FileNotFoundError: *root* does not exist.
        NotADirectoryError: *root* exists but is not a directory.
    """
    root_path = Path(root).resolve()

    # os.walk ignores directory-listing errors by default, so a bad root
    # would otherwise look exactly like "no compose files found".
    if not root_path.exists():
        raise FileNotFoundError(f"Scan root not found: {root_path}")
    if not root_path.is_dir():
        raise NotADirectoryError(f"Scan root is not a directory: {root_path}")

    # A bare string is iterable; without this it would be split into
    # one-character directory names.
    if isinstance(excludes, str):
        excludes = [excludes]

    exclude_set = set(DEFAULT_SCAN_EXCLUDES)
    if excludes:
        exclude_set.update(
            name.strip() for name in excludes if name and name.strip()
        )

    found = set()

    for current_root, dirnames, filenames in os.walk(root_path, topdown=True):
        # Assigning through the slice edits the list os.walk itself holds,
        # which is what stops it from descending into the removed entries.
        dirnames[:] = sorted(d for d in dirnames if d not in exclude_set)

        current_path = Path(current_root)
        found.update(
            (current_path / filename).resolve()
            for filename in filenames
            if filename in COMPOSE_FILENAMES
        )

    return sorted(found)
# dockervault/scanner.py
#
# NOTE(review): the patch this module was reconstructed from deletes the
# DockerComposeScanner class entirely -- confirm nothing still imports it.
import os
from pathlib import Path
from typing import Iterable, List, Optional, Union

# Directory names that are never descended into during a scan.
DEFAULT_SCAN_EXCLUDES = {
    ".git",
    ".venv",
    "venv",
    "node_modules",
    "__pycache__",
    ".pytest_cache",
}

# File names that are reported as Compose files.
COMPOSE_FILENAMES = (
    "docker-compose.yml",
    "docker-compose.yaml",
    "compose.yml",
    "compose.yaml",
)


# NOTE(review): this mirrors find_compose_files in dockervault/discovery.py
# with a smaller default exclude set -- consider delegating to it.
def discover_compose_files(
    root: Union[Path, str],
    excludes: Optional[Iterable[str]] = None,
) -> List[Path]:
    """Return every Compose file found under *root*.

    The tree is walked top-down and any directory whose *name* (not path)
    is in ``DEFAULT_SCAN_EXCLUDES`` or in *excludes* is pruned, so nothing
    beneath it is visited. A root that does not exist or is not a
    directory produces an empty list, because ``os.walk`` ignores
    directory-listing errors by default.

    The annotations use ``typing`` generics instead of ``X | Y`` so the
    module imports on interpreters older than 3.10 without needing
    ``from __future__ import annotations``.

    Args:
        root: Directory to scan.
        excludes: Extra directory names to skip. Surrounding whitespace is
            stripped and blank entries are ignored. A single string is
            treated as one name rather than as an iterable of characters.

    Returns:
        Resolved paths of all files named in ``COMPOSE_FILENAMES``, sorted
        and de-duplicated.
    """
    root_path = Path(root).resolve()

    # A bare string is iterable; without this it would be split into
    # one-character directory names.
    if isinstance(excludes, str):
        excludes = [excludes]

    exclude_set = set(DEFAULT_SCAN_EXCLUDES)
    if excludes:
        exclude_set.update(
            name.strip() for name in excludes if name and name.strip()
        )

    found = set()

    for current_root, dirnames, filenames in os.walk(root_path, topdown=True):
        # Assigning through the slice edits the list os.walk itself holds,
        # which is what stops it from descending into the removed entries.
        dirnames[:] = sorted(d for d in dirnames if d not in exclude_set)

        current_path = Path(current_root)
        found.update(
            (current_path / filename).resolve()
            for filename in filenames
            if filename in COMPOSE_FILENAMES
        )

    return sorted(found)