Source code for root_mcp.core.io.validators

"""Path validation and security checks."""

from __future__ import annotations

import re
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse

if TYPE_CHECKING:
    from root_mcp.config import Config, ResourceConfig


class SecurityError(Exception):
    """Signal that a path, URI, or write request broke a security rule.

    Raised by the validators in this module whenever an input fails one of
    the configured security checks.
    """
class PathValidator:
    """Validates file paths against security constraints.

    The validator is built from a server configuration and checks local
    paths, URIs, ``@resource/file`` aliases and export destinations. Every
    rejected input is reported by raising ``SecurityError``.
    """

    def __init__(self, config: Config):
        """
        Initialize validator with configuration.

        Args:
            config: Server configuration
        """
        self.config = config
        self.allowed_roots = [Path(root).resolve() for root in config.security.allowed_roots]

        # Protocols explicitly listed in config (the baseline).
        self._base_protocols = set(config.security.allowed_protocols)

        # Protocols auto-elevated from declared resource URI schemes.
        self._auto_protocols = (
            set(config.security.effective_protocols(config.resources)) - self._base_protocols
        )

        # Effective set used for the protocol allow-list check.
        self.allowed_protocols = self._base_protocols | self._auto_protocols
        self.max_depth = config.security.max_path_depth

    def validate_path(self, path: str, resource: ResourceConfig | None = None) -> Path:
        """
        Validate a file path against security constraints.

        Args:
            path: File path or URI to validate
            resource: Optional resource context

        Returns:
            Resolved, validated Path object

        Raises:
            SecurityError: If path violates security constraints
        """
        # Anything containing "://" is treated as a URI.
        if "://" in path:
            return self._validate_uri(path, resource)

        # Resource aliases (e.g., "@atlas_2024/signal.root")
        if path.startswith("@"):
            return self._validate_dataset_alias(path)

        # Local file path
        return self._validate_local_path(path)

    def _validate_local_path(self, path: str) -> Path:
        """
        Validate a local file path.

        Args:
            path: Local filesystem path (relative paths are resolved first)

        Returns:
            The resolved path

        Raises:
            SecurityError: If the path is malformed, too deep, or outside
                every allowed root
        """
        # An embedded NUL can never name a real file; reject it up front so
        # it surfaces as SecurityError instead of a low-level ValueError.
        if "\x00" in path:
            raise SecurityError(f"Invalid path (embedded NUL byte): {path!r}")

        # Resolve to absolute path
        try:
            resolved = Path(path).resolve(strict=False)
        except (OSError, RuntimeError) as e:
            raise SecurityError(f"Invalid path: {path}") from e

        # Defensive check; resolve() is expected to return an absolute path.
        if not resolved.is_absolute():
            raise SecurityError(f"Path must be absolute: {path}")

        # Check path depth (prevent excessively deep paths)
        depth = len(resolved.parts)
        if depth > self.max_depth:
            raise SecurityError(f"Path depth {depth} exceeds maximum {self.max_depth}: {path}")

        # Empty allowed_roots = zero-config permissive mode: allow any
        # OS-readable path.
        if not self.allowed_roots:
            return resolved

        # Otherwise the path must sit under at least one allowed root.
        for allowed_root in self.allowed_roots:
            try:
                resolved.relative_to(allowed_root)
            except ValueError:
                continue  # Try next root
            return resolved  # Path is valid

        raise SecurityError(
            f"Path '{path}' is not under any allowed root. "
            f"Allowed roots: {[str(r) for r in self.allowed_roots]}"
        )

    def _validate_uri(self, uri: str, resource: ResourceConfig | None) -> Path:
        """
        Validate a URI (file://, root://, http://, etc.).

        Args:
            uri: URI to validate
            resource: Optional resource the URI must belong to

        Returns:
            For ``file://`` URIs, the validated local path. For remote
            URIs, ``Path(uri)`` as a placeholder.

        Raises:
            SecurityError: If the URI is malformed, uses a disallowed
                protocol, is remote while remote access is disabled, or
                falls outside the given resource
        """
        # Local import keeps this method self-contained; the module only
        # imports urlparse at file level.
        from urllib.parse import unquote

        # urlparse raises ValueError on some malformed inputs (for example
        # an unterminated IPv6 literal); report those as SecurityError.
        try:
            parsed = urlparse(uri)
        except ValueError as e:
            raise SecurityError(f"Invalid URI: {uri}") from e
        protocol = parsed.scheme.lower()

        # Check if protocol is allowed
        if protocol not in self.allowed_protocols:
            raise SecurityError(
                f"Protocol '{protocol}' not allowed. "
                f"Allowed protocols: {list(self.allowed_protocols)}"
            )

        # For file:// URIs, validate as local path. The URI path is
        # percent-encoded, so decode it before touching the filesystem.
        if protocol == "file":
            return self._validate_local_path(unquote(parsed.path))

        # For remote protocols (root://, http://, etc.)
        # Skip the allow_remote gate when the protocol was auto-elevated from a
        # resource URI declaration - the resource entry is already the opt-in.
        if not self.config.security.allow_remote and protocol not in self._auto_protocols:
            raise SecurityError(
                f"Remote file access is disabled for protocol '{protocol}'. "
                "Set 'security.allow_remote: true' in config, or declare a resource "
                "whose URI uses this protocol to enable it automatically."
            )

        # If resource is provided, the URI must live inside it.
        if resource:
            # Compare on a path-segment boundary so that a resource at
            # ".../data" does not also admit ".../database".
            base = resource.uri.rstrip("/")
            if not (uri == base or uri.startswith(base + "/")):
                raise SecurityError(f"URI '{uri}' does not match resource URI '{resource.uri}'")

            # A ".." segment (literal or percent-encoded) would climb back
            # out of the resource even though the prefix matches.
            if ".." in unquote(parsed.path).split("/"):
                raise SecurityError(
                    f"URI '{uri}' contains '..' and may escape resource URI '{resource.uri}'"
                )

        # Return a Path-like object (will be handled specially by file manager)
        # For remote URIs, we don't return a local Path.
        # NOTE(review): Path normalisation can alter the URI text (for
        # example by collapsing "//"); confirm FileManager copes with that.
        return Path(uri)  # This is a placeholder; actual handling is in FileManager

    def _validate_dataset_alias(self, alias: str) -> Path:
        """
        Validate and resolve a dataset alias.

        Dataset aliases are configured shortcuts like "@atlas_2024/signal"
        that map to actual file paths.

        Args:
            alias: Dataset alias starting with @

        Returns:
            Resolved path

        Raises:
            SecurityError: If alias is not configured, is malformed, or
                tries to climb out of the resource with ".."
        """
        # Format: @resource_name/file_path
        resource_name, separator, file_path = alias[1:].partition("/")

        # Find resource
        resource = self.config.get_resource(resource_name)
        if not resource:
            available = [r.name for r in self.config.resources]
            raise SecurityError(f"Unknown resource '{resource_name}'. Available: {available}")

        # A bare "@resource" carries no file part.
        if not separator:
            raise SecurityError(f"Invalid alias format: {alias}. Use @resource/file")

        # The alias must stay inside the resource it names.
        if ".." in Path(file_path).parts:
            raise SecurityError(f"Invalid alias: {alias}. '..' is not allowed in alias paths")

        # Combine resource URI with file path
        full_path = f"{resource.uri}/{file_path}"
        return self.validate_path(full_path, resource)

    def check_file_pattern(self, path: Path, resource: ResourceConfig) -> bool:
        """
        Check if a file matches resource patterns.

        Args:
            path: File path
            resource: Resource configuration

        Returns:
            True if file matches allowed patterns and not excluded
        """
        filename = path.name

        # Exclusions win over inclusions, so test them first.
        if any(self._matches_pattern(filename, p) for p in resource.excluded_patterns):
            return False

        return any(self._matches_pattern(filename, p) for p in resource.allowed_patterns)

    @staticmethod
    def _matches_pattern(filename: str, pattern: str) -> bool:
        """
        Check if filename matches a glob pattern.

        Only ``*`` (any run of characters) and ``?`` (any single character)
        are treated as wildcards; every other character matches literally.
        """
        # Convert glob pattern to regex
        regex = re.escape(pattern).replace(r"\*", ".*").replace(r"\?", ".")
        return re.fullmatch(regex, filename) is not None

    def validate_output_path(self, path: str) -> Path:
        """
        Validate an output path for exports.

        Args:
            path: Destination path for export

        Returns:
            Validated path

        Raises:
            SecurityError: If path is not allowed for output
        """
        # Mirror the local-path validator: malformed input is a
        # SecurityError, not a raw OS-level exception.
        if "\x00" in path:
            raise SecurityError(f"Invalid output path (embedded NUL byte): {path!r}")
        try:
            resolved = Path(path).resolve()
        except (OSError, RuntimeError) as e:
            raise SecurityError(f"Invalid output path: {path}") from e

        # Check if under export base path
        export_base = Path(self.config.output.export_base_path).resolve()
        try:
            resolved.relative_to(export_base)
        except ValueError as e:
            raise SecurityError(f"Output path must be under {export_base}: {path}") from e

        return resolved

    def validate_write_operation(
        self,
        input_path: str,
        output_path: str,
        allow_overwrite: bool = False,
    ) -> tuple[Path, Path]:
        """
        Validate a write operation to ensure security constraints.

        This method enforces critical security rules:
        1. Input and output paths must be different
        2. Output path must be in allowed output directory
        3. Cannot overwrite existing files unless explicitly allowed

        Args:
            input_path: Source file path
            output_path: Destination file path
            allow_overwrite: Whether to allow overwriting existing files

        Returns:
            Tuple of (validated_input_path, validated_output_path)

        Raises:
            SecurityError: If security constraints are violated
        """
        import logging

        # Validate input path
        validated_input = self.validate_path(input_path)

        # Validate output path
        validated_output = self.validate_output_path(output_path)

        # CRITICAL: Ensure input and output are different
        if validated_input.resolve() == validated_output.resolve():
            raise SecurityError(
                "Cannot write to input file. Input and output paths must be different. "
                f"Input: {validated_input}, Output: {validated_output}"
            )

        # Check if output exists
        if validated_output.exists() and not allow_overwrite:
            raise SecurityError(
                f"Output file already exists: {validated_output}. "
                "Use allow_overwrite=True to overwrite."
            )

        # Log the operation for audit trail (lazy %-formatting: the message
        # is only built when INFO logging is enabled).
        logger = logging.getLogger(__name__)
        logger.info(
            "Write operation validated: %s -> %s (overwrite=%s)",
            validated_input,
            validated_output,
            allow_overwrite,
        )

        return validated_input, validated_output