Source code for root_mcp.config

"""Configuration management for ROOT-MCP server."""

from __future__ import annotations

import argparse
from importlib.metadata import PackageNotFoundError, version as _dist_version
import logging
import os
from pathlib import Path
import re

import yaml
from pydantic import BaseModel, Field, field_validator


def _package_version() -> str:
    try:
        return _dist_version("root-mcp")
    except PackageNotFoundError:
        return "0.0.0"


class ServerConfig(BaseModel):
    """Identity and operating mode of the server."""

    # Server name.
    name: str = "root-mcp"
    # Filled in from the installed distribution metadata at construction time.
    version: str = Field(default_factory=_package_version)
    # Operating mode; the pattern only admits "core" or "extended".
    mode: str = Field(default="extended", pattern="^(core|extended)$")
class LimitsConfig(BaseModel):
    """Upper bounds that keep individual operations within safe sizes."""

    # Both limits must be strictly positive.
    max_rows_per_call: int = Field(default=1_000_000, gt=0)
    max_export_rows: int = Field(default=10_000_000, gt=0)
class CacheConfig(BaseModel):
    """Settings for the file-handle cache."""

    # Whether the cache is used at all.
    enabled: bool = True
    # Cache capacity; must be strictly positive.
    file_cache_size: int = Field(default=50, gt=0)
class ResourceConfig(BaseModel):
    """A single data resource (MCP root) declared in the configuration."""

    name: str
    uri: str
    description: str = ""
    # Glob patterns; by default only ``*.root`` files are allowed.
    allowed_patterns: list[str] = Field(default_factory=lambda: ["*.root"])
    excluded_patterns: list[str] = Field(default_factory=list)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Accept only names made of alphanumerics plus ``_`` and ``-``.

        A name consisting solely of ``_``/``-`` characters (or an empty
        name) is rejected as well, because nothing alphanumeric remains
        once the separators are removed.

        Raises:
            ValueError: When the name contains any other character.
        """
        without_separators = v.replace("_", "").replace("-", "")
        if without_separators.isalnum():
            return v
        raise ValueError("Resource name must be alphanumeric (with _ or -)")
class SecurityConfig(BaseModel):
    """Security and access control settings."""

    # Absolute directories access is restricted to; empty means permissive mode.
    allowed_roots: list[str] = Field(default_factory=list)
    allow_remote: bool = False
    allowed_protocols: list[str] = Field(default_factory=lambda: ["file"])
    max_path_depth: int = Field(10, gt=0)

    def effective_protocols(self, resources: list) -> list[str]:
        """Return the explicitly allowed protocols plus those implied by resources.

        The URI scheme of every declared resource is treated as permitted, so
        a resource with ``uri: "root://…"`` automatically allows the ``root``
        protocol without requiring ``allow_remote: true`` or a manual entry in
        ``allowed_protocols``.

        URIs without a scheme (for example a bare ``/data/path``) contribute
        nothing; previously they leaked an empty-string "protocol" into the
        result.

        Args:
            resources: List of :class:`ResourceConfig` objects
                (pass ``config.resources``).

        Returns:
            Deduplicated list of permitted protocol strings: the explicit
            entries first (in declaration order), followed by inferred
            schemes in resource order.
        """
        from urllib.parse import urlparse

        # dict.fromkeys deduplicates while keeping the declared order.
        protocols = list(dict.fromkeys(self.allowed_protocols))
        seen = set(protocols)
        for resource in resources:
            if not resource.uri:
                continue
            scheme = urlparse(resource.uri).scheme.lower()
            # An empty scheme means the URI had none; it is not a protocol.
            if scheme and scheme not in seen:
                seen.add(scheme)
                protocols.append(scheme)
        return protocols

    @field_validator("allowed_roots")
    @classmethod
    def validate_roots(cls, v: list[str]) -> list[str]:
        """Normalise every allowed root to a resolved, absolute path string.

        An empty list is valid and means permissive / zero-config mode: the
        server will allow access to any path the OS permits.

        Relative entries are resolved against the current working directory
        rather than rejected (``Path.resolve()`` always yields an absolute
        path, so a separate absoluteness check could never fail).
        """
        return [str(Path(root).resolve()) for root in v]
class OutputConfig(BaseModel):
    """Settings controlling where and in which formats data is exported."""

    export_base_path: str = "/tmp/root_mcp_output"
    allowed_formats: list[str] = Field(default_factory=lambda: ["json", "csv", "parquet"])

    @field_validator("export_base_path")
    @classmethod
    def validate_export_path(cls, v: str) -> str:
        """Return the export path resolved to an absolute path string."""
        resolved = Path(v).resolve()
        return str(resolved)
class HistogramConfig(BaseModel):
    """Limits that apply to histogram operations."""

    # Bin-count ceilings; both must be strictly positive.
    max_bins_1d: int = Field(default=10_000, gt=0)
    max_bins_2d: int = Field(default=1_000, gt=0)
class PlottingConfig(BaseModel):
    """Appearance and output options for generated plots."""

    # --- Figure ---
    figure_width: float = Field(default=10.0, gt=0)
    figure_height: float = Field(default=6.0, gt=0)
    dpi: int = Field(default=100, gt=0)

    # --- Data points ---
    marker_size: float = Field(default=4.0, gt=0)
    marker_style: str = "o"
    error_bar_cap_size: float = Field(default=2.0, ge=0)  # zero is allowed

    # --- Lines ---
    line_width: float = Field(default=2.0, gt=0)
    fit_line_color: str = "red"
    fit_line_style: str = "-"

    # --- Histograms ---
    hist_fill_alpha: float = Field(default=0.2, ge=0, le=1)  # constrained to [0, 1]
    hist_fill_color: str = "blue"
    data_color: str = "black"

    # --- Grid ---
    grid_alpha: float = Field(default=0.3, ge=0, le=1)  # constrained to [0, 1]
    grid_enabled: bool = True

    # --- Output format ---
    default_format: str = "png"
    allowed_formats: list[str] = Field(default_factory=lambda: ["png", "pdf", "svg"])
class CoreConfig(BaseModel):
    """Settings grouped under the ``core`` section: cache and limits."""

    # Each sub-section falls back to its own defaults when omitted.
    cache: CacheConfig = Field(default_factory=CacheConfig)
    limits: LimitsConfig = Field(default_factory=LimitsConfig)
class ExtendedConfig(BaseModel):
    """Settings grouped under the ``extended`` section."""

    # Each sub-section falls back to its own defaults when omitted.
    histogram: HistogramConfig = Field(default_factory=HistogramConfig)
    plotting: PlottingConfig = Field(default_factory=PlottingConfig)
    # Iteration ceiling for fitting; must be strictly positive.
    fitting_max_iterations: int = Field(default=10_000, gt=0)
class AnalysisConfig(BaseModel):
    """Settings grouped under the ``analysis`` section."""

    # Both sizes must be strictly positive.
    default_chunk_size: int = Field(default=10_000, gt=0)
    default_read_limit: int = Field(default=1_000, gt=0)
    # Each sub-section falls back to its own defaults when omitted.
    histogram: HistogramConfig = Field(default_factory=HistogramConfig)
    plotting: PlottingConfig = Field(default_factory=PlottingConfig)
class RootNativeConfig(BaseModel):
    """Settings for native ROOT/PyROOT execution."""

    # All numeric limits below must be strictly positive.
    execution_timeout: int = Field(default=60, gt=0)
    max_output_size: int = Field(default=10_000_000, gt=0)
    allowed_output_formats: list[str] = Field(
        default_factory=lambda: ["png", "pdf", "svg", "root", "json", "csv"]
    )
    working_directory: str = "/tmp/root_mcp_native"
    max_code_length: int = Field(default=100_000, gt=0)
class FeatureFlags(BaseModel):
    """On/off switches for optional features."""

    # Export is on by default.
    enable_export: bool = True
    # Native ROOT support is off by default.
    enable_root: bool = False
class Config(BaseModel):
    """Top-level configuration object for the ROOT-MCP server."""

    server: ServerConfig = Field(default_factory=ServerConfig)
    core: CoreConfig = Field(default_factory=CoreConfig)
    extended: ExtendedConfig = Field(default_factory=ExtendedConfig)
    # NOTE(review): ``limits`` and ``cache`` also exist under ``core`` —
    # presumably kept at the top level for older config files; confirm.
    limits: LimitsConfig = Field(default_factory=LimitsConfig)
    cache: CacheConfig = Field(default_factory=CacheConfig)
    resources: list[ResourceConfig] = Field(default_factory=list)
    security: SecurityConfig = Field(default_factory=SecurityConfig)
    output: OutputConfig = Field(default_factory=OutputConfig)
    analysis: AnalysisConfig = Field(default_factory=AnalysisConfig)
    root_native: RootNativeConfig = Field(default_factory=RootNativeConfig)
    features: FeatureFlags = Field(default_factory=FeatureFlags)

    def get_resource(self, name: str) -> ResourceConfig | None:
        """Return the first resource whose name equals *name*, or ``None``."""
        return next(
            (candidate for candidate in self.resources if candidate.name == name),
            None,
        )

    def get_default_resource(self) -> ResourceConfig | None:
        """Return the first configured resource, or ``None`` when there are none."""
        if not self.resources:
            return None
        return self.resources[0]
def load_config(config_path: str | Path | None = None) -> Config:
    """Load configuration from a YAML file, falling back to built-in defaults.

    After loading (or using built-in defaults when no file is found), the
    ``ROOT_MCP_DATA_PATH`` environment variable is checked. Any directory
    paths it contains are merged into the config as resources via
    :func:`apply_data_paths`, exactly as if those directories had been
    passed via ``--data-path`` on the CLI. Resources already declared in the
    YAML file are never duplicated or replaced by env-var paths.

    **Config merge priority** (ascending — later wins):

    1. Built-in Pydantic defaults
    2. ``ROOT_MCP_DATA_PATH`` environment variable
    3. ``--data-path`` CLI flags (applied in ``main()``, not here)
    4. YAML config file values

    An empty YAML file (or one containing only comments) is treated as an
    empty mapping, i.e. all defaults.

    Args:
        config_path: Path to config file. If None, looks for:

            1. ``ROOT_MCP_CONFIG`` env var (ignored when empty)
            2. ``./config.yaml``
            3. ``~/.config/root-mcp/config.yaml``

    Returns:
        Validated Config object.

    Raises:
        FileNotFoundError: When the selected config file does not exist.
        TypeError: When the YAML document is not a mapping at top level.
    """
    if config_path is None:
        # An empty/blank variable is treated as "not set", matching how the
        # other ROOT_MCP_* variables are handled in this module.
        env_config = os.environ.get("ROOT_MCP_CONFIG", "").strip()
        if env_config:
            config_path = Path(env_config)
        elif Path("config.yaml").exists():
            config_path = Path("config.yaml")
        elif Path.home().joinpath(".config/root-mcp/config.yaml").exists():
            config_path = Path.home() / ".config/root-mcp/config.yaml"
        else:
            # No file anywhere: start from pure defaults.
            config = Config()
            _apply_data_path_env(config)
            return config

    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Config file not found: {config_path}")

    # Explicit UTF-8: the generated default config contains non-ASCII
    # characters, which a locale-dependent default encoding may not decode.
    with open(config_path, encoding="utf-8") as f:
        data = yaml.safe_load(f)

    if data is None:
        # safe_load returns None for an empty document.
        data = {}
    elif not isinstance(data, dict):
        raise TypeError(
            f"Config file must contain a YAML mapping at the top level, "
            f"got {type(data).__name__}: {config_path}"
        )

    config = Config(**data)
    _apply_data_path_env(config)
    return config
def _apply_data_path_env(config: Config) -> None: """Merge ``ROOT_MCP_DATA_PATH`` env var into *config* (in-place). The variable accepts colon-separated absolute directory paths, identical in semantics to passing each directory via ``--data-path``. Silently ignores empty segments and paths that are already declared as resources in the config (YAML values take precedence). """ raw = os.environ.get("ROOT_MCP_DATA_PATH", "").strip() if not raw: return paths = [p.strip() for p in raw.split(":") if p.strip()] if paths: apply_data_paths(config, paths) # Two-tier YAML template used by both create_default_config() and # `root-mcp init`. Placeholders: {uri}, {enable_root}. _CONFIG_TEMPLATE = """\ # ROOT-MCP configuration # ══════════════════════════════════════════════════════ # QUICK START — edit only this section to get started # ══════════════════════════════════════════════════════ server: mode: "extended" resources: - name: "local_data" uri: "{uri}" description: "Local ROOT files" # Leave allowed_roots empty to allow any directory (permissive / zero-config mode). # Add explicit absolute paths here to restrict access. security: allowed_roots: [] features: enable_root: {enable_root} # set true if ROOT/PyROOT is installed # ══════════════════════════════════════════════════════ # ADVANCED — change only if you need fine-tuning # ══════════════════════════════════════════════════════ core: cache: enabled: true file_cache_size: 50 limits: max_rows_per_call: 1_000_000 max_export_rows: 10_000_000 extended: histogram: max_bins_1d: 10_000 max_bins_2d: 1_000 plotting: figure_width: 10.0 figure_height: 6.0 dpi: 100 default_format: "png" allowed_formats: ["png", "pdf", "svg"] fitting_max_iterations: 10_000 output: export_base_path: "/tmp/root_mcp_output" allowed_formats: ["json", "csv", "parquet"] root_native: execution_timeout: 60 working_directory: "/tmp/root_mcp_native" """
def create_default_config(
    output_path: str | Path,
    data_path: Path | None = None,
    permissive: bool = True,
) -> None:
    """Generate a minimal, human-readable ``config.yaml``.

    The output uses the same two-tier format as ``root-mcp init``: a short
    **QUICK START** section with the only field most users need to change,
    followed by an **ADVANCED** section with fine-tuning knobs.

    The file is always written as UTF-8, because the template contains
    non-ASCII characters that a locale-dependent default encoding may be
    unable to encode.

    Args:
        output_path: Destination file path. Missing parent directories are
            created.
        data_path: Local directory to use as the resource URI. When ``None``
            and ``permissive`` is ``True``, defaults to the current working
            directory. When ``None`` and ``permissive`` is ``False``, writes a
            placeholder that the user must edit before use.
        permissive: Only selects the fallback URI when ``data_path`` is
            ``None`` (see above). The template itself always leaves
            ``security.allowed_roots`` empty.
    """
    if data_path is not None:
        uri = f"file://{Path(data_path).resolve()}"
    elif permissive:
        uri = f"file://{Path.cwd()}"
    else:
        uri = "file:///REPLACE_WITH_YOUR_DATA_PATH"

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        _CONFIG_TEMPLATE.format(uri=uri, enable_root="false"),
        encoding="utf-8",
    )
    print(f"Created default config at: {output_path}")
#: Log levels accepted by :func:`apply_log_level`.
_VALID_LOG_LEVELS: tuple[str, ...] = ("DEBUG", "INFO", "WARNING", "ERROR")


def apply_log_level(level_str: str) -> None:
    """Set the level of Python's root logger from a level name.

    Intended to run in ``main()`` **before** :func:`load_config`, so that
    messages logged while loading configuration already honour the requested
    verbosity.

    Args:
        level_str: Level name; surrounding whitespace and letter case are
            ignored. Accepted values: ``DEBUG``, ``INFO``, ``WARNING``,
            ``ERROR``.

    Raises:
        ValueError: When *level_str* is not one of the accepted names.
    """
    level_name = level_str.strip().upper()
    if level_name in _VALID_LOG_LEVELS:
        logging.getLogger().setLevel(level_name)
        return
    raise ValueError(f"Log level must be one of {_VALID_LOG_LEVELS}, got: {level_str!r}")
def _parse_resource_spec(spec: str) -> ResourceConfig: """Parse a resource specification string into a :class:`ResourceConfig`. The specification format is ``NAME=URI`` or ``NAME=URI|DESCRIPTION``. The ``|`` separator is used (rather than ``:``) because URIs routinely contain colons (``root://``, ``file:///``, ``https://``). Args: spec: The raw resource spec string, e.g. ``cms=root://host//path`` or ``cms=root://host//path|CMS open data``. Returns: A new :class:`ResourceConfig` instance. Raises: ValueError: When the spec is malformed or the resource name is invalid. """ if "=" not in spec: raise ValueError(f"Resource spec must be NAME=URI or NAME=URI|DESCRIPTION, got: {spec!r}") name, rest = spec.split("=", 1) name = name.strip() if not name: raise ValueError(f"Resource name is empty in spec: {spec!r}") if "|" in rest: uri, description = rest.split("|", 1) uri = uri.strip() description = description.strip() else: uri = rest.strip() description = "" if not uri: raise ValueError(f"Resource URI is empty in spec: {spec!r}") # ResourceConfig.validate_name will raise if the name contains bad chars. return ResourceConfig(name=name, uri=uri, description=description)
def apply_env_overrides(config: Config) -> Config:
    """Read ``ROOT_MCP_*`` environment variables and merge into *config* in-place.

    Env vars sit between the YAML file (priority 2) and CLI flags (priority 4)
    in the merge chain — i.e. an env var overrides the YAML but is itself
    overridden by an explicit ``--flag`` on the command line.

    Only non-empty env vars are applied; missing or empty vars leave the
    corresponding field untouched.

    **Server & Mode**:

    * ``ROOT_MCP_MODE`` → :attr:`Config.server.mode`
      (``core`` or ``extended``, case-insensitive)
    * ``ROOT_MCP_SERVER_NAME`` → :attr:`Config.server.name`

    **Security**:

    * ``ROOT_MCP_ALLOWED_ROOTS`` → :attr:`Config.security.allowed_roots`
      (paths separated by ``os.pathsep`` — a colon on POSIX; replaces the YAML
      list; each entry is resolved to an absolute path, matching how roots
      from the YAML file are normalised)
    * ``ROOT_MCP_ALLOW_REMOTE`` → :attr:`Config.security.allow_remote`
      (``1``/``true``/``yes`` → ``True``, anything else → ``False``)
    * ``ROOT_MCP_ALLOWED_PROTOCOLS`` → :attr:`Config.security.allowed_protocols`
      (comma-separated; replaces the YAML list)
    * ``ROOT_MCP_MAX_PATH_DEPTH`` → :attr:`Config.security.max_path_depth`
      (positive integer)

    **Output / Export**:

    * ``ROOT_MCP_EXPORT_PATH`` → :attr:`Config.output.export_base_path`
      (directory string; resolved to an absolute path)
    * ``ROOT_MCP_EXPORT_FORMATS`` → :attr:`Config.output.allowed_formats`
      (comma-separated; replaces the YAML list)
    * ``ROOT_MCP_ENABLE_EXPORT`` → :attr:`Config.features.enable_export`
      (``1``/``true``/``yes`` → ``True``, anything else → ``False``)

    **Core Limits & Cache**:

    * ``ROOT_MCP_MAX_ROWS`` → :attr:`Config.core.limits.max_rows_per_call`
      (positive int)
    * ``ROOT_MCP_MAX_EXPORT_ROWS`` → :attr:`Config.core.limits.max_export_rows`
      (positive int)
    * ``ROOT_MCP_CACHE`` → :attr:`Config.core.cache.enabled`
      (``1``/``true``/``yes`` → ``True``, anything else → ``False``)
    * ``ROOT_MCP_CACHE_SIZE`` → :attr:`Config.core.cache.file_cache_size`
      (positive int)

    **Extended Analysis**:

    * ``ROOT_MCP_MAX_BINS_1D`` → :attr:`Config.extended.histogram.max_bins_1d`
      (positive int)
    * ``ROOT_MCP_MAX_BINS_2D`` → :attr:`Config.extended.histogram.max_bins_2d`
      (positive int)
    * ``ROOT_MCP_FITTING_ITERATIONS`` →
      :attr:`Config.extended.fitting_max_iterations` (positive int)
    * ``ROOT_MCP_PLOT_DPI`` → :attr:`Config.extended.plotting.dpi` (positive int)
    * ``ROOT_MCP_PLOT_FORMAT`` → :attr:`Config.extended.plotting.default_format`
      (``png``, ``pdf``, or ``svg``)
    * ``ROOT_MCP_PLOT_WIDTH`` → :attr:`Config.extended.plotting.figure_width`
      (positive, finite float)
    * ``ROOT_MCP_PLOT_HEIGHT`` → :attr:`Config.extended.plotting.figure_height`
      (positive, finite float)

    **Native ROOT Execution**:

    * ``ROOT_MCP_ROOT_TIMEOUT`` → :attr:`Config.root_native.execution_timeout`
      (positive int, seconds)
    * ``ROOT_MCP_ROOT_WORKDIR`` → :attr:`Config.root_native.working_directory`
      (path string)
    * ``ROOT_MCP_ROOT_MAX_OUTPUT`` → :attr:`Config.root_native.max_output_size`
      (positive int, bytes)
    * ``ROOT_MCP_ROOT_MAX_CODE`` → :attr:`Config.root_native.max_code_length`
      (positive int, chars)

    **Remote Resources**:

    * ``ROOT_MCP_RESOURCES`` → :attr:`Config.resources`
      (semicolon-separated list of ``NAME=URI`` or ``NAME=URI|DESCRIPTION``
      specs; resources whose URI is already configured are skipped, and name
      clashes are resolved by appending ``_1``, ``_2``, …)

    Args:
        config: The :class:`Config` to update in-place.

    Returns:
        The same *config* object (mutated) for convenience.

    Raises:
        ValueError: When an env var contains an invalid value
            (e.g. an unrecognised mode string).
    """
    import math

    truthy = ("1", "true", "yes")

    def _env(var_name: str, *, lower: bool = False) -> str:
        """Return the stripped value of *var_name* ('' when unset)."""
        value = os.environ.get(var_name, "").strip()
        return value.lower() if lower else value

    def _split_csv(raw: str) -> list[str]:
        """Split a comma-separated value into lower-cased, non-empty items."""
        return [item.strip().lower() for item in raw.split(",") if item.strip()]

    def _parse_positive_int(val: str, var_name: str) -> int:
        try:
            n = int(val)
        except ValueError as exc:
            raise ValueError(f"{var_name} must be an integer, got: {val!r}") from exc
        if n <= 0:
            raise ValueError(f"{var_name} must be > 0, got: {n}")
        return n

    def _parse_positive_float(val: str, var_name: str) -> float:
        try:
            n = float(val)
        except ValueError as exc:
            raise ValueError(f"{var_name} must be a number, got: {val!r}") from exc
        # float() accepts "nan"/"inf"; NaN would also slip past a plain
        # ``n <= 0`` check because every comparison with NaN is False.
        if not math.isfinite(n):
            raise ValueError(f"{var_name} must be a finite number, got: {val!r}")
        if n <= 0:
            raise ValueError(f"{var_name} must be > 0, got: {n}")
        return n

    # --- Server & Mode ---
    env_mode = _env("ROOT_MCP_MODE", lower=True)
    if env_mode:
        if env_mode not in ("core", "extended"):
            raise ValueError(f"ROOT_MCP_MODE must be 'core' or 'extended', got: {env_mode!r}")
        config.server.mode = env_mode

    env_name = _env("ROOT_MCP_SERVER_NAME")
    if env_name:
        config.server.name = env_name

    # --- Security ---
    env_allowed_roots = _env("ROOT_MCP_ALLOWED_ROOTS")
    if env_allowed_roots:
        # Plain attribute assignment does not run the model's field
        # validator, so normalise here the same way the validator would.
        config.security.allowed_roots = [
            str(Path(root.strip()).resolve())
            for root in env_allowed_roots.split(os.pathsep)
            if root.strip()
        ]

    env_allow_remote = _env("ROOT_MCP_ALLOW_REMOTE", lower=True)
    if env_allow_remote:
        config.security.allow_remote = env_allow_remote in truthy

    env_allowed_protocols = _env("ROOT_MCP_ALLOWED_PROTOCOLS")
    if env_allowed_protocols:
        config.security.allowed_protocols = _split_csv(env_allowed_protocols)

    env_max_depth = _env("ROOT_MCP_MAX_PATH_DEPTH")
    if env_max_depth:
        config.security.max_path_depth = _parse_positive_int(
            env_max_depth, "ROOT_MCP_MAX_PATH_DEPTH"
        )

    # --- Output / Export ---
    env_export_path = _env("ROOT_MCP_EXPORT_PATH")
    if env_export_path:
        config.output.export_base_path = str(Path(env_export_path).resolve())

    env_export_formats = _env("ROOT_MCP_EXPORT_FORMATS")
    if env_export_formats:
        config.output.allowed_formats = _split_csv(env_export_formats)

    env_enable_export = _env("ROOT_MCP_ENABLE_EXPORT", lower=True)
    if env_enable_export:
        config.features.enable_export = env_enable_export in truthy

    # --- Core Limits & Cache ---
    env_max_rows = _env("ROOT_MCP_MAX_ROWS")
    if env_max_rows:
        config.core.limits.max_rows_per_call = _parse_positive_int(
            env_max_rows, "ROOT_MCP_MAX_ROWS"
        )

    env_max_export_rows = _env("ROOT_MCP_MAX_EXPORT_ROWS")
    if env_max_export_rows:
        config.core.limits.max_export_rows = _parse_positive_int(
            env_max_export_rows, "ROOT_MCP_MAX_EXPORT_ROWS"
        )

    env_cache = _env("ROOT_MCP_CACHE", lower=True)
    if env_cache:
        config.core.cache.enabled = env_cache in truthy

    env_cache_size = _env("ROOT_MCP_CACHE_SIZE")
    if env_cache_size:
        config.core.cache.file_cache_size = _parse_positive_int(
            env_cache_size, "ROOT_MCP_CACHE_SIZE"
        )

    # --- Extended Analysis ---
    env_max_bins_1d = _env("ROOT_MCP_MAX_BINS_1D")
    if env_max_bins_1d:
        config.extended.histogram.max_bins_1d = _parse_positive_int(
            env_max_bins_1d, "ROOT_MCP_MAX_BINS_1D"
        )

    env_max_bins_2d = _env("ROOT_MCP_MAX_BINS_2D")
    if env_max_bins_2d:
        config.extended.histogram.max_bins_2d = _parse_positive_int(
            env_max_bins_2d, "ROOT_MCP_MAX_BINS_2D"
        )

    env_fitting_iters = _env("ROOT_MCP_FITTING_ITERATIONS")
    if env_fitting_iters:
        config.extended.fitting_max_iterations = _parse_positive_int(
            env_fitting_iters, "ROOT_MCP_FITTING_ITERATIONS"
        )

    env_plot_dpi = _env("ROOT_MCP_PLOT_DPI")
    if env_plot_dpi:
        config.extended.plotting.dpi = _parse_positive_int(env_plot_dpi, "ROOT_MCP_PLOT_DPI")

    env_plot_format = _env("ROOT_MCP_PLOT_FORMAT", lower=True)
    if env_plot_format:
        if env_plot_format not in ("png", "pdf", "svg"):
            raise ValueError(
                f"ROOT_MCP_PLOT_FORMAT must be 'png', 'pdf', or 'svg', got: {env_plot_format!r}"
            )
        config.extended.plotting.default_format = env_plot_format

    env_plot_width = _env("ROOT_MCP_PLOT_WIDTH")
    if env_plot_width:
        config.extended.plotting.figure_width = _parse_positive_float(
            env_plot_width, "ROOT_MCP_PLOT_WIDTH"
        )

    env_plot_height = _env("ROOT_MCP_PLOT_HEIGHT")
    if env_plot_height:
        config.extended.plotting.figure_height = _parse_positive_float(
            env_plot_height, "ROOT_MCP_PLOT_HEIGHT"
        )

    # --- Native ROOT Execution ---
    env_root_timeout = _env("ROOT_MCP_ROOT_TIMEOUT")
    if env_root_timeout:
        config.root_native.execution_timeout = _parse_positive_int(
            env_root_timeout, "ROOT_MCP_ROOT_TIMEOUT"
        )

    env_root_workdir = _env("ROOT_MCP_ROOT_WORKDIR")
    if env_root_workdir:
        config.root_native.working_directory = env_root_workdir

    env_root_max_output = _env("ROOT_MCP_ROOT_MAX_OUTPUT")
    if env_root_max_output:
        config.root_native.max_output_size = _parse_positive_int(
            env_root_max_output, "ROOT_MCP_ROOT_MAX_OUTPUT"
        )

    env_root_max_code = _env("ROOT_MCP_ROOT_MAX_CODE")
    if env_root_max_code:
        config.root_native.max_code_length = _parse_positive_int(
            env_root_max_code, "ROOT_MCP_ROOT_MAX_CODE"
        )

    # --- Remote Resources ---
    env_resources = _env("ROOT_MCP_RESOURCES")
    if env_resources:
        existing_uris = {r.uri for r in config.resources}
        existing_names = {r.name for r in config.resources}
        for spec in env_resources.split(";"):
            spec = spec.strip()
            if not spec:
                continue
            resource = _parse_resource_spec(spec)
            if resource.uri in existing_uris:
                continue  # An already-configured resource takes precedence
            # Disambiguate a clashing name with a numeric suffix.
            base_name = resource.name
            suffix = 1
            while resource.name in existing_names:
                resource = ResourceConfig(
                    name=f"{base_name}_{suffix}",
                    uri=resource.uri,
                    description=resource.description,
                )
                suffix += 1
            config.resources.append(resource)
            existing_uris.add(resource.uri)
            existing_names.add(resource.name)

    return config
def apply_cli_overrides(config: Config, args: "argparse.Namespace") -> Config:
    """Apply parsed CLI arguments onto *config* in-place.

    CLI flags are the highest-priority source — they override both YAML and
    env vars. A field is only mutated when the corresponding ``args``
    attribute is present and not ``None`` (for the list-valued
    ``allowed_root`` and ``resource`` attributes: present and non-empty).

    **Server & Mode**:

    * ``args.mode`` → :attr:`Config.server.mode`
    * ``args.server_name`` → :attr:`Config.server.name`

    **Security**:

    * ``args.allowed_root`` → :attr:`Config.security.allowed_roots`
      (list; replaces; each entry is resolved to an absolute path, matching
      how roots from the YAML file are normalised)
    * ``args.allow_remote`` → :attr:`Config.security.allow_remote`
      (True/False/None)
    * ``args.allowed_protocols`` → :attr:`Config.security.allowed_protocols`
      (comma-string, split on parse)
    * ``args.max_path_depth`` → :attr:`Config.security.max_path_depth`

    **Output / Export**:

    * ``args.export_path`` → :attr:`Config.output.export_base_path`
    * ``args.export_formats`` → :attr:`Config.output.allowed_formats`
      (comma-string)
    * ``args.enable_export`` → :attr:`Config.features.enable_export`
      (False when ``--no-export`` is given, ``None`` when not specified)

    **Core Limits & Cache**:

    * ``args.max_rows`` → :attr:`Config.core.limits.max_rows_per_call`
    * ``args.max_export_rows`` → :attr:`Config.core.limits.max_export_rows`
    * ``args.cache_enabled`` → :attr:`Config.core.cache.enabled`
      (False when ``--no-cache`` is given, ``None`` when not specified)
    * ``args.cache_size`` → :attr:`Config.core.cache.file_cache_size`

    **Extended Analysis**:

    * ``args.max_bins_1d`` → :attr:`Config.extended.histogram.max_bins_1d`
    * ``args.max_bins_2d`` → :attr:`Config.extended.histogram.max_bins_2d`
    * ``args.fitting_iterations`` →
      :attr:`Config.extended.fitting_max_iterations`
    * ``args.plot_dpi`` → :attr:`Config.extended.plotting.dpi`
    * ``args.plot_format`` → :attr:`Config.extended.plotting.default_format`
    * ``args.plot_width`` → :attr:`Config.extended.plotting.figure_width`
      (positive, finite)
    * ``args.plot_height`` → :attr:`Config.extended.plotting.figure_height`
      (positive, finite)

    **Native ROOT Execution**:

    * ``args.root_timeout`` → :attr:`Config.root_native.execution_timeout`
    * ``args.root_workdir`` → :attr:`Config.root_native.working_directory`
    * ``args.root_max_output`` → :attr:`Config.root_native.max_output_size`
    * ``args.root_max_code`` → :attr:`Config.root_native.max_code_length`

    **Remote Resources**:

    * ``args.resource`` → :attr:`Config.resources`
      (repeated ``NAME=URI[|DESCRIPTION]`` specs; resources whose URI is
      already configured are skipped, and name clashes are resolved by
      appending ``_1``, ``_2``, …)

    Args:
        config: The :class:`Config` to update in-place.
        args: Parsed :class:`argparse.Namespace` from the server's argument
            parser. Missing attributes are treated like ``None``.

    Returns:
        The same *config* object (mutated) for convenience.

    Raises:
        ValueError: When a CLI flag contains an invalid value.
    """
    import math

    def _arg(attr: str):
        """Return ``args.<attr>``, or ``None`` when the attribute is absent."""
        return getattr(args, attr, None)

    def _split_csv(raw: str) -> list[str]:
        """Split a comma-separated value into lower-cased, non-empty items."""
        return [item.strip().lower() for item in raw.split(",") if item.strip()]

    def _cli_positive_int(value: int, flag_name: str) -> int:
        if value <= 0:
            raise ValueError(f"{flag_name} must be > 0, got: {value}")
        return value

    def _cli_positive_float(value: float, flag_name: str) -> float:
        # argparse's ``type=float`` accepts "nan"/"inf"; NaN would also slip
        # past ``value <= 0`` because every comparison with NaN is False.
        if not math.isfinite(value):
            raise ValueError(f"{flag_name} must be a finite number, got: {value}")
        if value <= 0:
            raise ValueError(f"{flag_name} must be > 0, got: {value}")
        return value

    # --- Server & Mode ---
    cli_mode = _arg("mode")
    if cli_mode is not None:
        if cli_mode not in ("core", "extended"):
            raise ValueError(f"--mode must be 'core' or 'extended', got: {cli_mode!r}")
        config.server.mode = cli_mode

    cli_name = _arg("server_name")
    if cli_name is not None:
        config.server.name = cli_name

    # --- Security ---
    cli_allowed_roots = _arg("allowed_root")  # list from action="append"
    if cli_allowed_roots:
        # Plain attribute assignment does not run the model's field
        # validator, so normalise here the same way the validator would.
        config.security.allowed_roots = [
            str(Path(root).resolve()) for root in cli_allowed_roots
        ]

    cli_allow_remote = _arg("allow_remote")  # True / False / None
    if cli_allow_remote is not None:
        config.security.allow_remote = cli_allow_remote

    cli_protocols = _arg("allowed_protocols")  # comma-string
    if cli_protocols is not None:
        config.security.allowed_protocols = _split_csv(cli_protocols)

    cli_depth = _arg("max_path_depth")  # int from type=int
    if cli_depth is not None:
        config.security.max_path_depth = _cli_positive_int(cli_depth, "--max-path-depth")

    # --- Output / Export ---
    cli_export_path = _arg("export_path")
    if cli_export_path is not None:
        config.output.export_base_path = str(Path(cli_export_path).resolve())

    cli_export_formats = _arg("export_formats")  # comma-string
    if cli_export_formats is not None:
        config.output.allowed_formats = _split_csv(cli_export_formats)

    cli_enable_export = _arg("enable_export")  # False or None
    if cli_enable_export is not None:
        config.features.enable_export = cli_enable_export

    # --- Core Limits & Cache ---
    cli_max_rows = _arg("max_rows")
    if cli_max_rows is not None:
        config.core.limits.max_rows_per_call = _cli_positive_int(cli_max_rows, "--max-rows")

    cli_max_export_rows = _arg("max_export_rows")
    if cli_max_export_rows is not None:
        config.core.limits.max_export_rows = _cli_positive_int(
            cli_max_export_rows, "--max-export-rows"
        )

    cli_cache_enabled = _arg("cache_enabled")  # False or None
    if cli_cache_enabled is not None:
        config.core.cache.enabled = cli_cache_enabled

    cli_cache_size = _arg("cache_size")
    if cli_cache_size is not None:
        config.core.cache.file_cache_size = _cli_positive_int(cli_cache_size, "--cache-size")

    # --- Extended Analysis ---
    cli_max_bins_1d = _arg("max_bins_1d")
    if cli_max_bins_1d is not None:
        config.extended.histogram.max_bins_1d = _cli_positive_int(
            cli_max_bins_1d, "--max-bins-1d"
        )

    cli_max_bins_2d = _arg("max_bins_2d")
    if cli_max_bins_2d is not None:
        config.extended.histogram.max_bins_2d = _cli_positive_int(
            cli_max_bins_2d, "--max-bins-2d"
        )

    cli_fitting_iters = _arg("fitting_iterations")
    if cli_fitting_iters is not None:
        config.extended.fitting_max_iterations = _cli_positive_int(
            cli_fitting_iters, "--fitting-iterations"
        )

    cli_plot_dpi = _arg("plot_dpi")
    if cli_plot_dpi is not None:
        config.extended.plotting.dpi = _cli_positive_int(cli_plot_dpi, "--plot-dpi")

    cli_plot_format = _arg("plot_format")
    if cli_plot_format is not None:
        if cli_plot_format not in ("png", "pdf", "svg"):
            raise ValueError(
                f"--plot-format must be 'png', 'pdf', or 'svg', got: {cli_plot_format!r}"
            )
        config.extended.plotting.default_format = cli_plot_format

    cli_plot_width = _arg("plot_width")
    if cli_plot_width is not None:
        config.extended.plotting.figure_width = _cli_positive_float(
            cli_plot_width, "--plot-width"
        )

    cli_plot_height = _arg("plot_height")
    if cli_plot_height is not None:
        config.extended.plotting.figure_height = _cli_positive_float(
            cli_plot_height, "--plot-height"
        )

    # --- Native ROOT Execution ---
    cli_root_timeout = _arg("root_timeout")
    if cli_root_timeout is not None:
        config.root_native.execution_timeout = _cli_positive_int(
            cli_root_timeout, "--root-timeout"
        )

    cli_root_workdir = _arg("root_workdir")
    if cli_root_workdir is not None:
        config.root_native.working_directory = cli_root_workdir

    cli_root_max_output = _arg("root_max_output")
    if cli_root_max_output is not None:
        config.root_native.max_output_size = _cli_positive_int(
            cli_root_max_output, "--root-max-output"
        )

    cli_root_max_code = _arg("root_max_code")
    if cli_root_max_code is not None:
        config.root_native.max_code_length = _cli_positive_int(
            cli_root_max_code, "--root-max-code"
        )

    # --- Remote Resources ---
    cli_resource_specs = _arg("resource")  # list from action="append"
    if cli_resource_specs:
        existing_uris = {r.uri for r in config.resources}
        existing_names = {r.name for r in config.resources}
        for spec in cli_resource_specs:
            resource = _parse_resource_spec(spec)
            if resource.uri in existing_uris:
                continue  # An already-configured resource wins
            # Disambiguate a clashing name with a numeric suffix.
            base_name = resource.name
            suffix = 1
            while resource.name in existing_names:
                resource = ResourceConfig(
                    name=f"{base_name}_{suffix}",
                    uri=resource.uri,
                    description=resource.description,
                )
                suffix += 1
            config.resources.append(resource)
            existing_uris.add(resource.uri)
            existing_names.add(resource.name)

    return config
def apply_data_paths(config: Config, paths: list[str]) -> Config:
    """Merge a list of local directory paths into *config* as resources.

    Each path is resolved to an absolute path and added as a new
    :class:`ResourceConfig` only when no existing resource already points to
    the same ``file://`` URI (already-declared resources take precedence).
    When ``security.allowed_roots`` is non-empty (restrictive mode), the path
    is also appended there so the validator permits access.

    The resource name is derived from the directory basename: characters
    outside ``[a-zA-Z0-9_-]`` become ``_``, leading/trailing ``_`` are
    removed, a leading digit gets a ``data_`` prefix, and a basename with no
    alphanumeric character at all (for example ``-``) falls back to
    ``data`` so that the name always passes resource-name validation. Name
    clashes are resolved by appending ``_1``, ``_2``, ….

    This is the shared building block used by both the ``--data-path`` CLI
    flag and the ``ROOT_MCP_DATA_PATH`` environment variable.

    Args:
        config: The :class:`Config` to update in-place.
        paths: List of local directory paths (resolved to absolute).

    Returns:
        The same *config* object (mutated) for convenience.
    """
    existing_uris = {r.uri for r in config.resources}
    existing_names = {r.name for r in config.resources}

    for raw_path in paths:
        path = Path(raw_path).resolve()
        uri = f"file://{path}"

        if uri in existing_uris:
            # Already declared — don't duplicate; the existing entry wins.
            continue

        # Derive a unique, valid resource name from the directory basename.
        base = re.sub(r"[^a-zA-Z0-9_-]", "_", path.name).strip("_") or "data"
        if not any(ch.isalnum() for ch in base):
            # Only "-"/"_" left (e.g. a directory literally named "-"):
            # such a name would be rejected by ResourceConfig.validate_name.
            base = "data"
        if base[0].isdigit():
            base = f"data_{base}"

        name = base
        counter = 1
        while name in existing_names:
            name = f"{base}_{counter}"
            counter += 1

        resource = ResourceConfig(
            name=name,
            uri=uri,
            description=f"Data directory: {path}",
        )
        config.resources.append(resource)
        existing_uris.add(uri)
        existing_names.add(name)

        # In restrictive mode (allowed_roots explicitly set), also whitelist
        # the path so the path validator allows access to it.
        if config.security.allowed_roots:
            str_path = str(path)
            if str_path not in config.security.allowed_roots:
                config.security.allowed_roots.append(str_path)

    return config