Source code for root_mcp.server

"""ROOT-MCP Server - Mode-aware implementation with lazy loading."""

from __future__ import annotations

import argparse
import asyncio
import logging
from pathlib import Path
import sys
from typing import Any, cast
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from mcp.server.stdio import stdio_server

from root_mcp.config import Config, load_config, _CONFIG_TEMPLATE
from root_mcp.common.root_availability import is_root_available, get_root_version, get_root_features
from root_mcp.core.io import FileManager, PathValidator, TreeReader, HistogramReader, DataExporter
from root_mcp.core.operations import BasicStatistics
from root_mcp.core.tools import DiscoveryTools, DataAccessTools

# Setup logging - must use stderr to avoid interfering with stdio MCP protocol
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger(__name__)


[docs] class ROOTMCPServer: """Mode-aware ROOT-MCP server with lazy loading."""
[docs] def __init__(self, config: Config): """ Initialize ROOT-MCP server in specified mode. Args: config: Server configuration """ self.config = config self.server = Server(config.server.name) self.current_mode = config.server.mode # Initialize core components (always available) logger.info(f"Initializing ROOT-MCP server in {self.current_mode} mode...") self._initialize_core_components() # Initialize extended components if in extended mode self._extended_components_loaded = False self._root_native_available = False if self.current_mode == "extended": self._initialize_extended_components() # Register handlers self._register_resources() self._register_tools() logger.info(f"ROOT-MCP server initialized successfully in {self.current_mode} mode")
def _initialize_core_components(self) -> None: """Initialize core components (always available).""" self.file_manager = FileManager(self.config) self.path_validator = PathValidator(self.config) self.tree_reader = TreeReader(self.config, self.file_manager) self.histogram_reader = HistogramReader(self.config, self.file_manager) self.data_exporter = DataExporter(self.config) self.basic_stats = BasicStatistics(self.config, self.file_manager) # Core tool handlers self.discovery_tools = DiscoveryTools(self.config, self.file_manager, self.path_validator) self.data_access_tools = DataAccessTools( config=self.config, file_manager=self.file_manager, path_validator=self.path_validator, tree_reader=self.tree_reader, ) logger.info("Core components initialized") def _initialize_extended_components(self) -> None: """Initialize extended analysis components (lazy loaded).""" if self._extended_components_loaded: return try: # Import extended modules from root_mcp.extended.analysis import ( AnalysisOperations, HistogramOperations, KinematicsOperations, CorrelationAnalysis, ) from root_mcp.extended.tools import AnalysisTools, PlottingTools # Initialize extended components self.analysis_ops = AnalysisOperations(self.config, self.file_manager) self.histogram_ops = HistogramOperations(self.config, self.file_manager) self.kinematics_ops = KinematicsOperations(self.config, self.file_manager) self.correlation_analysis = CorrelationAnalysis(self.config, self.file_manager) # Extended tool handlers self.analysis_tools = AnalysisTools( config=self.config, file_manager=self.file_manager, path_validator=self.path_validator, analysis_ops=self.analysis_ops, tree_reader=self.tree_reader, ) self.plotting_tools = PlottingTools( config=self.config, file_manager=self.file_manager, path_validator=self.path_validator, histogram_ops=self.histogram_ops, ) self._extended_components_loaded = True logger.info("Extended components initialized") # Initialize native ROOT tools if enabled and available self._initialize_root_native() except ImportError as e: logger.error(f"Failed to load extended components: {e}") logger.warning( "Extended mode requires scipy and matplotlib. Falling back to core mode." ) self.current_mode = "core" self._extended_components_loaded = False def _initialize_root_native(self) -> None: """Initialize native ROOT tools if enabled and available.""" if not self.config.features.enable_root: logger.info("Native ROOT support disabled (enable_root=false)") self._root_native_available = False return if not is_root_available(): logger.info("Native ROOT support enabled but ROOT not found in environment") self._root_native_available = False return try: from root_mcp.extended.tools.root_native import RootNativeTools self.root_native_tools = RootNativeTools(config=self.config) self._root_native_available = True logger.info( "Native ROOT tools initialized (ROOT %s)", get_root_version() or "unknown version", ) except Exception as e: logger.warning("Failed to initialize native ROOT tools: %s", e) self._root_native_available = False def _unload_extended_components(self) -> None: """Unload extended components to free memory.""" if not self._extended_components_loaded: return # Remove references to extended components if hasattr(self, "analysis_ops"): del self.analysis_ops if hasattr(self, "histogram_ops"): del self.histogram_ops if hasattr(self, "kinematics_ops"): del self.kinematics_ops if hasattr(self, "correlation_analysis"): del self.correlation_analysis if hasattr(self, "analysis_tools"): del self.analysis_tools if hasattr(self, "root_native_tools"): del self.root_native_tools self._extended_components_loaded = False self._root_native_available = False logger.info("Extended components unloaded")
[docs] def switch_mode(self, new_mode: str) -> dict[str, Any]: """ Switch between core and extended modes at runtime. Args: new_mode: Target mode ('core' or 'extended') Returns: Status dictionary """ if new_mode not in ["core", "extended"]: raise ValueError(f"Invalid mode: {new_mode}. Must be 'core' or 'extended'") if new_mode == self.current_mode: return { "status": "no_change", "current_mode": self.current_mode, "message": f"Already in {new_mode} mode", } old_mode = self.current_mode if new_mode == "extended": # Switch to extended mode try: self._initialize_extended_components() self.current_mode = "extended" self.config.server.mode = "extended" return { "status": "success", "previous_mode": old_mode, "current_mode": self.current_mode, "message": f"Switched from {old_mode} to {new_mode} mode", "extended_features_available": True, } except Exception as e: return { "status": "error", "current_mode": self.current_mode, "message": f"Failed to switch to extended mode: {e}", } else: # new_mode == "core" # Switch to core mode self._unload_extended_components() self.current_mode = "core" self.config.server.mode = "core" return { "status": "success", "previous_mode": old_mode, "current_mode": self.current_mode, "message": f"Switched from {old_mode} to {new_mode} mode", "extended_features_available": False, }
def _register_resources(self) -> None: """Register MCP resources (file roots).""" @self.server.list_resources() async def list_resources() -> list[Resource]: """List available ROOT file resources.""" resources = [] for resource_config in self.config.resources: resources.append( Resource( uri=cast(Any, f"root-mcp://{resource_config.name}"), name=resource_config.name, description=resource_config.description, mimeType="application/x-root", ) ) return resources def _get_core_tools(self) -> list[Tool]: """Get core mode tools.""" return [ # Discovery tools Tool( name="list_files", description="List ROOT files in a resource", inputSchema={ "type": "object", "properties": { "resource": {"type": "string", "description": "Resource name"}, "pattern": {"type": "string", "description": "Optional glob pattern"}, }, "required": ["resource"], }, ), Tool( name="inspect_file", description="Inspect ROOT file structure and contents", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, }, "required": ["path"], }, ), Tool( name="list_branches", description="List branches in a TTree or RNTuple", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "tree_name": {"type": "string", "description": "Tree name"}, "pattern": {"type": "string", "description": "Optional glob pattern"}, }, "required": ["path", "tree_name"], }, ), Tool( name="validate_file", description="Validate ROOT file integrity", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, }, "required": ["path"], }, ), # Data access tools Tool( name="read_branches", description="Read branch data from a TTree or RNTuple", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "tree_name": {"type": "string", "description": "Tree name"}, "branches": {"type": "array", "items": {"type": "string"}}, "entry_start": {"type": "integer", "description": "Start entry"}, "entry_stop": {"type": "integer", "description": "Stop entry"}, "selection": {"type": "string", "description": "Optional cut expression"}, }, "required": ["path", "tree_name", "branches"], }, ), Tool( name="get_branch_stats", description="Get statistics for branches (supports derived variables via defines)", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "tree_name": {"type": "string", "description": "Tree name"}, "branches": {"type": "array", "items": {"type": "string"}}, "selection": {"type": "string", "description": "Optional cut expression"}, "defines": { "type": "object", "description": "Derived variable definitions (dict of name: expression)", }, }, "required": ["path", "tree_name", "branches"], }, ), Tool( name="export_data", description="Export branch data to JSON, CSV, or Parquet", inputSchema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "tree_name": {"type": "string", "description": "Tree name"}, "branches": {"type": "array", "items": {"type": "string"}}, "output_path": {"type": "string", "description": "Output file path"}, "format": {"type": "string", "enum": ["json", "csv", "parquet"]}, "selection": {"type": "string", "description": "Optional cut expression"}, "compress": {"type": "boolean", "description": "Compress output"}, }, "required": ["path", "tree_name", "branches", "output_path", "format"], }, ), # Mode switching Tool( name="switch_mode", description="Switch between core and extended modes", inputSchema={ "type": "object", "properties": { "mode": {"type": "string", "enum": ["core", "extended"]}, }, "required": ["mode"], }, ), Tool( name="get_server_info", description="Get server mode and capabilities", inputSchema={"type": "object", "properties": {}}, ), ] def _get_extended_tools(self) -> list[Tool]: """Get extended mode tools (in addition to core tools).""" return [ Tool( name="compute_histogram", description="Compute 1D histogram with fitting support", inputSchema={ "type": "object", "properties": { "path": { "anyOf": [ {"type": "string", "description": "File path"}, { "type": "array", "items": {"type": "string"}, "description": "List of file paths", }, ] }, "tree_name": {"type": "string"}, "branch": {"type": "string"}, "bins": {"type": "integer"}, "range": {"type": "array", "items": {"type": "number"}}, "selection": {"type": "string"}, "weights": {"type": "string"}, }, "required": ["path", "tree_name", "branch", "bins"], }, ), Tool( name="compute_histogram_2d", description="Compute 2D histogram (supports derived variables via defines)", inputSchema={ "type": "object", "properties": { "path": { "anyOf": [ {"type": "string", "description": "File path"}, { "type": "array", "items": {"type": "string"}, "description": "List of file paths", }, ] }, "tree_name": {"type": "string", "description": "Tree name"}, "x_branch": {"type": "string", "description": "X-axis branch"}, "y_branch": {"type": "string", "description": "Y-axis branch"}, "x_bins": {"type": "integer", "description": "Number of bins in X"}, "y_bins": {"type": "integer", "description": "Number of bins in Y"}, "x_range": { "type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2, "description": "X-axis range [min, max]", }, "y_range": { "type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2, "description": "Y-axis range [min, max]", }, "selection": {"type": "string", "description": "Optional cut expression"}, "defines": { "type": "object", "description": "Derived variable definitions (dict of name: expression)", }, }, "required": ["path", "tree_name", "x_branch", "y_branch", "x_bins", "y_bins"], }, ), Tool( name="fit_histogram", description="Fit histogram with model function", inputSchema={ "type": "object", "properties": { "path": {"type": "string"}, "tree_name": {"type": "string"}, "branch": {"type": "string"}, "bins": {"type": "integer"}, "model": { "anyOf": [ {"type": "string"}, {"type": "array", "items": {"type": "string"}}, {"type": "array", "items": {"type": "object"}}, {"type": "object"}, ] }, "range": { "type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2, }, "selection": {"type": "string"}, "weights": {"type": "string"}, "defines": { "type": "object", "description": "Derived variable definitions", }, "initial_guess": {"type": "array", "items": {"type": "number"}}, "bounds": { "type": "array", "items": { "type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2, }, }, "fixed_parameters": { "type": "object", "additionalProperties": {"type": "number"}, }, }, "required": ["path", "tree_name", "branch", "bins", "model"], }, ), Tool( name="compute_invariant_mass", description="Compute invariant mass from 4-vectors", inputSchema={ "type": "object", "properties": { "path": {"type": "string"}, "tree_name": {"type": "string"}, "pt_branches": {"type": "array", "items": {"type": "string"}}, "eta_branches": {"type": "array", "items": {"type": "string"}}, "phi_branches": {"type": "array", "items": {"type": "string"}}, "mass_branches": {"type": "array", "items": {"type": "string"}}, }, "required": [ "path", "tree_name", "pt_branches", "eta_branches", "phi_branches", ], }, ), Tool( name="compute_correlation", description="Compute correlation between branches", inputSchema={ "type": "object", "properties": { "path": {"type": "string"}, "tree_name": {"type": "string"}, "branch_x": {"type": "string"}, "branch_y": {"type": "string"}, "method": {"type": "string", "enum": ["pearson", "spearman"]}, }, "required": ["path", "tree_name", "branch_x", "branch_y"], }, ), Tool( name="plot_histogram_1d", description="Create and save a 1D histogram plot. Provide EITHER 'data' (pre-calculated) OR 'path', 'tree_name', 'branch', 'bins' (compute from file).", inputSchema={ "type": "object", "properties": { "data": { "type": "object", "description": "Pre-calculated histogram data (bin_counts, bin_edges, etc.)", }, "path": { "type": "string", "description": "File path (required if data not provided)", }, "tree_name": { "type": "string", "description": "Tree name (required if data not provided)", }, "branch": { "type": "string", "description": "Branch to histogram (required if data not provided)", }, "bins": { "type": "integer", "description": "Number of bins (required if data not provided)", }, "range": { "type": "array", "items": {"type": "number"}, "description": "Histogram range [min, max]", }, "selection": {"type": "string", "description": "Optional cut expression"}, "weights": {"type": "string", "description": "Optional weight branch"}, "defines": { "type": "object", "description": "Derived variable definitions (dict of name: expression)", }, "output_path": { "type": "string", "description": "Output file path (e.g., /tmp/plot.png)", }, "title": {"type": "string", "description": "Plot title"}, "xlabel": {"type": "string", "description": "X-axis label"}, "ylabel": {"type": "string", "description": "Y-axis label"}, "log_y": {"type": "boolean", "description": "Use log scale for y-axis"}, "style": { "type": "string", "enum": ["default", "publication", "presentation"], "description": "Plot style", }, }, "required": ["output_path"], }, ), Tool( name="plot_histogram_2d", description="Create and save a 2D histogram plot. Provide EITHER 'data' (pre-calculated) OR 'path', 'tree_name', 'branch_x'...' (compute from file).", inputSchema={ "type": "object", "properties": { "data": { "type": "object", "description": "Pre-calculated histogram data (bin_counts, x_edges, y_edges, etc.)", }, "path": { "type": "string", "description": "File path (required if data not provided)", }, "tree_name": { "type": "string", "description": "Tree name (required if data not provided)", }, "branch_x": { "type": "string", "description": "X-axis branch (required if data not provided)", }, "branch_y": { "type": "string", "description": "Y-axis branch (required if data not provided)", }, "bins_x": { "type": "integer", "description": "Number of bins in X (required if data not provided)", }, "bins_y": { "type": "integer", "description": "Number of bins in Y (required if data not provided)", }, "range_x": { "type": "array", "items": {"type": "number"}, "description": "X-axis range [min, max]", }, "range_y": { "type": "array", "items": {"type": "number"}, "description": "Y-axis range [min, max]", }, "selection": {"type": "string", "description": "Optional cut expression"}, "weights": {"type": "string", "description": "Optional weight branch"}, "defines": { "type": "object", "description": "Derived variable definitions (dict of name: expression)", }, "output_path": {"type": "string", "description": "Output file path"}, "title": {"type": "string", "description": "Plot title"}, "xlabel": {"type": "string", "description": "X-axis label"}, "ylabel": {"type": "string", "description": "Y-axis label"}, "colormap": {"type": "string", "description": "Matplotlib colormap name"}, "log_z": {"type": "boolean", "description": "Use log scale for color"}, "style": { "type": "string", "enum": ["default", "publication", "presentation"], }, }, "required": ["output_path"], }, ), Tool( name="histogram_arithmetic", description="Perform bin-by-bin arithmetic on two histograms (e.g. asymmetry, difference, ratio)", inputSchema={ "type": "object", "properties": { "operation": { "type": "string", "enum": ["add", "subtract", "multiply", "divide", "asymmetry"], "description": "Operation to perform: data1 [op] data2. Asymmetry is (d1-d2)/(d1+d2).", }, "data1": { "type": "object", "description": "First histogram data (result from compute_histogram)", }, "data2": { "type": "object", "description": "Second histogram data", }, }, "required": ["operation", "data1", "data2"], }, ), ] def _get_root_native_tools(self) -> list[Tool]: """Get native ROOT tools (only when ROOT is enabled and available).""" return [ Tool( name="run_root_code", description=( "Execute PyROOT/Python code with native ROOT. " "Use for operations not possible with uproot: RDataFrame, RooFit, " "custom classes, TCanvas plots, C++ interop, etc. " "IMPORTANT: Always start code with 'import ROOT' and " "'ROOT.gROOT.SetBatch(True)' (prevents GUI). " "The variable '_output_dir' is available in code as a writable " "directory for saving files (plots, ROOT files). " "The variable '_input_files' contains the list of input file paths. " "To return structured data, call '_set_result(value)' where value " "is a JSON-serializable object, or print JSON to stdout. " "Prefer run_rdataframe for simple histograms." ), inputSchema={ "type": "object", "properties": { "code": { "type": "string", "description": ( "Python code to execute. Must import ROOT explicitly. " "Use ROOT.gROOT.SetBatch(True) to prevent GUI. " "Use _output_dir for file output, _set_result() for structured results." ), }, "output_dir": { "type": "string", "description": "Directory for output files (optional, defaults to temp dir)", }, "timeout": { "type": "integer", "description": ( "Execution timeout in seconds (default: 60). " "Increase for large files or complex fits." ), }, "input_files": { "type": "array", "items": {"type": "string"}, "description": ( "Paths to ROOT files the code needs. " "Available inside code as _input_files list." ), }, }, "required": ["code"], }, ), Tool( name="run_rdataframe", description=( "Compute a 1D histogram using ROOT RDataFrame. " "Only supports 1D histograms of a single branch. " "For 2D histograms, profiles, Define() columns, or other RDataFrame " "operations, use run_root_code instead. " "Preferred over run_root_code for simple 1D histograms — no boilerplate needed. " "Returns JSON with entries, mean, std_dev, bin_contents, bin_errors, bin_edges. " "Use inspect_file first to discover tree and branch names. " "Selection uses C++ syntax (e.g. 'pt > 20 && abs(eta) < 2.5'). " "Requires native ROOT." ), inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the ROOT file", }, "tree_name": { "type": "string", "description": "Name of the TTree or RNTuple", }, "branch": { "type": "string", "description": "Branch to histogram", }, "bins": { "type": "integer", "description": "Number of bins", }, "range_min": { "type": "number", "description": "Histogram range minimum", }, "range_max": { "type": "number", "description": "Histogram range maximum", }, "selection": { "type": "string", "description": "Optional cut expression (C++ syntax for RDF Filter)", }, "weight": { "type": "string", "description": "Optional weight column name", }, "output_path": { "type": "string", "description": "Save histogram plot to this path (png, pdf, svg)", }, "timeout": { "type": "integer", "description": "Execution timeout in seconds", }, }, "required": [ "file_path", "tree_name", "branch", "bins", "range_min", "range_max", ], }, ), Tool( name="run_root_macro", description=( "Execute a ROOT C++ macro via gROOT.ProcessLine. " 'Use for short C++ snippets (e.g. \'TH1F h("h","h",100,-5,5); ' 'h.FillRandom("gaus",10000);\'). Multi-line code is supported. ' "For complex analysis, prefer run_root_code with Python. " "Requires native ROOT." ), inputSchema={ "type": "object", "properties": { "macro_code": { "type": "string", "description": "C++ code to execute", }, "output_path": { "type": "string", "description": "Save any canvas output to this path (optional)", }, "timeout": { "type": "integer", "description": "Execution timeout in seconds", }, }, "required": ["macro_code"], }, ), ] def _register_tools(self) -> None: """Register all MCP tools based on current mode.""" @self.server.list_tools() async def list_tools() -> list[Tool]: """List available tools based on current mode.""" tools = self._get_core_tools() if self.current_mode == "extended" and self._extended_components_loaded: tools.extend(self._get_extended_tools()) if self._root_native_available: tools.extend(self._get_root_native_tools()) return tools @self.server.call_tool() async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: """Handle tool calls with mode awareness.""" import json try: # Mode management tools if name == "switch_mode": result = self.switch_mode(arguments["mode"]) elif name == "get_server_info": result = { "server_name": self.config.server.name, "version": self.config.server.version, "current_mode": self.current_mode, "extended_components_loaded": self._extended_components_loaded, "available_modes": ["core", "extended"], "root_native_available": is_root_available(), "root_native_enabled": ( self.config.features.enable_root and is_root_available() ), "root_version": get_root_version(), "root_features": get_root_features(), } # Core tools (always available) elif name == "list_files": result = self.discovery_tools.list_files(**arguments) elif name == "inspect_file": result = self.discovery_tools.inspect_file(**arguments) elif name == "list_branches": result = self.discovery_tools.list_branches(**arguments) elif name == "validate_file": result = self.file_manager.validate_file(arguments["path"]) elif name == "read_branches": result = self.data_access_tools.read_branches(**arguments) elif name == "get_branch_stats": # Handle defines parameter if passed as JSON string defines = arguments.get("defines") if defines is not None and isinstance(defines, str): import json try: defines = json.loads(defines) except json.JSONDecodeError: result = { "error": "invalid_parameter", "message": "Invalid JSON in defines parameter", } return [TextContent(type="text", text=json.dumps(result, indent=2))] result = self.basic_stats.compute_stats( arguments["path"], arguments["tree_name"], arguments["branches"], arguments.get("selection"), defines, ) elif name == "export_data": # Read data directly for export tree = self.file_manager.get_tree(arguments["path"], arguments["tree_name"]) arrays = tree.arrays( filter_name=arguments["branches"], cut=arguments.get("selection"), library="ak", ) # Export result = self.data_exporter.export( arrays, arguments["output_path"], arguments["format"], compress=arguments.get("compress", False), ) # Extended tools (only in extended mode) elif name in [ "compute_histogram", "compute_histogram_2d", "fit_histogram", "compute_invariant_mass", "compute_correlation", "plot_histogram_1d", "plot_histogram_2d", "histogram_arithmetic", ]: if self.current_mode != "extended" or not self._extended_components_loaded: result = { "error": "mode_error", "message": f"Tool '{name}' requires extended mode. Current mode: {self.current_mode}", "hint": "Use switch_mode tool to enable extended mode", } else: # Delegate to appropriate handler if name == "compute_histogram": result = self.analysis_tools.compute_histogram(**arguments) elif name == "compute_histogram_2d": result = self.analysis_tools.compute_histogram_2d(**arguments) elif name == "fit_histogram": result = self.analysis_tools.fit_histogram(**arguments) elif name == "compute_invariant_mass": result = self.kinematics_ops.compute_invariant_mass(**arguments) elif name == "compute_correlation": result = self.correlation_analysis.compute_correlation(**arguments) elif name == "plot_histogram_1d": result = self.plotting_tools.plot_histogram_1d(**arguments) elif name == "plot_histogram_2d": result = self.plotting_tools.plot_histogram_2d(**arguments) elif name == "histogram_arithmetic": result = self.analysis_tools.compute_histogram_arithmetic(**arguments) # Native ROOT tools elif name in ["run_root_code", "run_rdataframe", "run_root_macro"]: if not self._root_native_available: result = { "error": "root_not_available", "message": ( "Native ROOT tools are not available. " "Ensure ROOT is installed and enable_root is set to true in config." ), "hint": "Use get_server_info to check ROOT availability", } else: if name == "run_root_code": result = self.root_native_tools.run_root_code(**arguments) elif name == "run_rdataframe": result = self.root_native_tools.run_rdataframe(**arguments) elif name == "run_root_macro": result = self.root_native_tools.run_root_macro(**arguments) else: result = { "error": "unknown_tool", "message": f"Unknown tool: {name}", } except Exception as e: logger.error(f"Tool {name} failed: {e}", exc_info=True) result = { "error": "internal_error", "message": f"Internal error: {e}", } return [TextContent(type="text", text=json.dumps(result, indent=2))]
[docs] async def run(self) -> None: """Run the MCP server.""" logger.info(f"Starting {self.config.server.name} v{self.config.server.version}") logger.info(f"Mode: {self.current_mode}") logger.info(f"Resources configured: {len(self.config.resources)}") async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, self.server.create_initialization_options(), )
def _run_init(argv: list[str]) -> None: """Handle the ``root-mcp init`` sub-command.""" parser = argparse.ArgumentParser( prog="root-mcp init", description="Generate a minimal config.yaml for ROOT-MCP.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "Examples:\n" " root-mcp init # placeholder URI, edit before use\n" " root-mcp init --permissive # URI set to current directory\n" " root-mcp init --permissive --output ~/my-config.yaml\n" ), ) parser.add_argument( "--permissive", action="store_true", help=( "Set the resource URI to the current working directory so the " "generated config works immediately without further editing." ), ) parser.add_argument( "--output", type=str, default="config.yaml", metavar="PATH", help="Where to write the config file (default: ./config.yaml).", ) args = parser.parse_args(argv) output_path = Path(args.output).resolve() if output_path.exists(): print(f"Warning: {output_path} already exists — overwriting.", file=sys.stderr) if args.permissive: uri = f"file://{Path.cwd()}" else: uri = "file:///REPLACE_WITH_YOUR_DATA_PATH" # Detect whether ROOT/PyROOT is available so the flag is pre-set correctly. root_detected = is_root_available() enable_root = "true" if root_detected else "false" if root_detected: print("ROOT/PyROOT detected — setting enable_root: true in generated config.") content = _CONFIG_TEMPLATE.format(uri=uri, enable_root=enable_root) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(content) print(f"Created: {output_path}") if not args.permissive: print( f" → Edit the 'uri' field to point at your ROOT files, then run:\n" f" root-mcp --config {output_path}" ) else: print(f" → Config is ready. Run:\n" f" root-mcp --config {output_path}")
[docs] def main() -> None: """Main entry point.""" # Dispatch 'root-mcp init …' before the main parser so the init sub-command # gets its own clean argument namespace and the existing server flags are # unaffected. if len(sys.argv) > 1 and sys.argv[1] == "init": _run_init(sys.argv[2:]) return parser = argparse.ArgumentParser( description="ROOT-MCP Server", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "Zero-config quick start:\n" " root-mcp --data-path /path/to/root/files\n\n" "With native ROOT support (no config file needed):\n" " root-mcp --data-path /path/to/root/files --enable-root\n\n" "Multiple directories:\n" " root-mcp --data-path /data/run3 --data-path /data/mc\n\n" "Via environment variable:\n" " ROOT_MCP_DATA_PATH=/data/run3 root-mcp\n\n" "Generate a config file:\n" " root-mcp init --permissive" ), ) parser.add_argument( "--config", type=str, help="Path to configuration file (overrides ROOT_MCP_CONFIG env var)", ) parser.add_argument( "--data-path", action="append", metavar="DIR", dest="data_paths", help=( "Local directory containing ROOT files. " "Can be specified multiple times. " "Adds a resource and permits access to that directory. " "No config.yaml required." ), ) parser.add_argument( "--enable-root", action="store_true", default=False, dest="enable_root", help=( "Enable native ROOT/PyROOT tools (run_root_code, run_rdataframe, " "run_root_macro). Requires a ROOT installation on PATH. " "No config.yaml required." ), ) parser.add_argument( "--mode", choices=["core", "extended"], default=None, dest="mode", metavar="MODE", help=( "Server mode: 'core' (file I/O + basic stats only) or " "'extended' (full analysis suite, default). " "Overrides config.yaml and ROOT_MCP_MODE." ), ) parser.add_argument( "--server-name", default=None, dest="server_name", metavar="NAME", help=( "Override the MCP server name reported to clients. " "Overrides config.yaml and ROOT_MCP_SERVER_NAME." ), ) # Security parser.add_argument( "--allowed-root", action="append", dest="allowed_root", metavar="DIR", help=( "Restrict file access to this directory (absolute path). " "Repeat to allow multiple directories. " "Replaces any allowed_roots set in config.yaml. " "Overrides ROOT_MCP_ALLOWED_ROOTS." ), ) _allow_remote_group = parser.add_mutually_exclusive_group() _allow_remote_group.add_argument( "--allow-remote", dest="allow_remote", action="store_true", help="Allow access to remote (non-file://) URIs.", ) _allow_remote_group.add_argument( "--no-allow-remote", dest="allow_remote", action="store_false", help="Deny access to remote URIs (default behaviour).", ) parser.set_defaults(allow_remote=None) parser.add_argument( "--allowed-protocols", default=None, dest="allowed_protocols", metavar="PROTOCOLS", help=( "Comma-separated list of permitted URI protocols " "(e.g. 'file,root,http'). " "Replaces config.yaml allowed_protocols. " "Overrides ROOT_MCP_ALLOWED_PROTOCOLS." ), ) parser.add_argument( "--max-path-depth", type=int, default=None, dest="max_path_depth", metavar="N", help=( "Maximum directory depth for path validation (default: 10). " "Overrides ROOT_MCP_MAX_PATH_DEPTH." ), ) # Output / Export parser.add_argument( "--export-path", default=None, dest="export_path", metavar="DIR", help=( "Directory for exported files (default: /tmp/root_mcp_output). " "Overrides ROOT_MCP_EXPORT_PATH." ), ) parser.add_argument( "--export-formats", default=None, dest="export_formats", metavar="FORMATS", help=( "Comma-separated list of permitted export formats " "(e.g. 'json,csv,parquet'). " "Overrides ROOT_MCP_EXPORT_FORMATS." ), ) parser.add_argument( "--no-export", dest="enable_export", action="store_false", help="Disable the file export feature entirely.", ) parser.set_defaults(enable_export=None) # Core Limits & Cache parser.add_argument( "--max-rows", type=int, default=None, dest="max_rows", metavar="N", help=( "Maximum rows returned per read call (default: 1_000_000). " "Overrides ROOT_MCP_MAX_ROWS." ), ) parser.add_argument( "--max-export-rows", type=int, default=None, dest="max_export_rows", metavar="N", help=( "Maximum rows written per export (default: 10_000_000). " "Overrides ROOT_MCP_MAX_EXPORT_ROWS." ), ) parser.add_argument( "--no-cache", dest="cache_enabled", action="store_false", help="Disable the in-memory file metadata cache.", ) parser.set_defaults(cache_enabled=None) parser.add_argument( "--cache-size", type=int, default=None, dest="cache_size", metavar="N", help=( "Number of file entries held in the metadata cache (default: 50). " "Overrides ROOT_MCP_CACHE_SIZE." ), ) # Extended Analysis parser.add_argument( "--max-bins-1d", type=int, default=None, dest="max_bins_1d", metavar="N", help="Maximum bins for 1D histograms (default: 10000). Overrides ROOT_MCP_MAX_BINS_1D.", ) parser.add_argument( "--max-bins-2d", type=int, default=None, dest="max_bins_2d", metavar="N", help="Maximum bins for 2D histograms (default: 1000). Overrides ROOT_MCP_MAX_BINS_2D.", ) parser.add_argument( "--fitting-iterations", type=int, default=None, dest="fitting_iterations", metavar="N", help="Maximum fitting iterations (default: 10000). Overrides ROOT_MCP_FITTING_ITERATIONS.", ) parser.add_argument( "--plot-dpi", type=int, default=None, dest="plot_dpi", metavar="N", help="Plot resolution in DPI (default: 100). Overrides ROOT_MCP_PLOT_DPI.", ) parser.add_argument( "--plot-format", choices=["png", "pdf", "svg"], default=None, dest="plot_format", metavar="FMT", help="Default plot output format: png, pdf, or svg (default: png). Overrides ROOT_MCP_PLOT_FORMAT.", ) parser.add_argument( "--plot-width", type=float, default=None, dest="plot_width", metavar="N", help="Plot figure width in inches (default: 10.0). Overrides ROOT_MCP_PLOT_WIDTH.", ) parser.add_argument( "--plot-height", type=float, default=None, dest="plot_height", metavar="N", help="Plot figure height in inches (default: 6.0). Overrides ROOT_MCP_PLOT_HEIGHT.", ) # Native ROOT Execution parser.add_argument( "--root-timeout", type=int, default=None, dest="root_timeout", metavar="N", help="ROOT execution timeout in seconds (default: 60). Overrides ROOT_MCP_ROOT_TIMEOUT.", ) parser.add_argument( "--root-workdir", type=str, default=None, dest="root_workdir", metavar="DIR", help="Working directory for ROOT execution (default: /tmp/root_mcp_native). Overrides ROOT_MCP_ROOT_WORKDIR.", ) parser.add_argument( "--root-max-output", type=int, default=None, dest="root_max_output", metavar="N", help="Maximum output size from ROOT in bytes (default: 10_000_000). Overrides ROOT_MCP_ROOT_MAX_OUTPUT.", ) parser.add_argument( "--root-max-code", type=int, default=None, dest="root_max_code", metavar="N", help="Maximum ROOT script length in characters (default: 100_000). Overrides ROOT_MCP_ROOT_MAX_CODE.", ) # Remote Resources parser.add_argument( "--resource", action="append", default=None, dest="resource", metavar="NAME=URI[|DESCRIPTION]", help=( "Declare a named resource. Format: NAME=URI or NAME=URI|DESCRIPTION " "(use | to separate the optional description from the URI, since URIs " "contain colons). Can be repeated. Overrides ROOT_MCP_RESOURCES." ), ) # Log Level parser.add_argument( "--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], default=None, dest="log_level", metavar="LEVEL", help=( "Set logging verbosity: DEBUG, INFO, WARNING, or ERROR " "(default: INFO). Overrides ROOT_MCP_LOG_LEVEL." ), ) args = parser.parse_args() # Apply log level as early as possible — before load_config so that # config-loading log messages are also at the right verbosity. import os as _os from root_mcp.config import apply_log_level as _apply_log_level _env_log_level = _os.environ.get("ROOT_MCP_LOG_LEVEL", "").strip().upper() _cli_log_level = getattr(args, "log_level", None) # CLI wins over env _final_log_level = _cli_log_level or _env_log_level or None if _final_log_level: try: _apply_log_level(_final_log_level) except ValueError as e: # Use print because logger level may not be set correctly yet. print(f"root-mcp: invalid log level: {e}", file=sys.stderr) sys.exit(1) try: config = load_config(args.config) except Exception as e: logger.error(f"Failed to load configuration: {e}") sys.exit(1) # Merge data paths from CLI --data-path flags. from root_mcp.config import apply_data_paths cli_paths: list[str] = args.data_paths or [] if cli_paths: apply_data_paths(config, cli_paths) logger.info(f"Added {len(cli_paths)} data path(s) from --data-path: {cli_paths}") # Apply environment variable overrides (priority 3: above YAML, below CLI). from root_mcp.config import apply_env_overrides, apply_cli_overrides try: apply_env_overrides(config) except ValueError as e: logger.error(f"Invalid environment variable: {e}") sys.exit(1) # Apply CLI flag overrides (priority 4: highest). try: apply_cli_overrides(config, args) except ValueError as e: logger.error(f"Invalid CLI argument: {e}") sys.exit(1) # Enable native ROOT support via --enable-root flag or ROOT_MCP_ENABLE_ROOT env var. # (Shipped before apply_env/cli_overrides; kept inline for backward compat.) import os as _os if args.enable_root or _os.environ.get("ROOT_MCP_ENABLE_ROOT", "").lower() in ( "1", "true", "yes", ): config.features.enable_root = True logger.info("Native ROOT support enabled via --enable-root / ROOT_MCP_ENABLE_ROOT") server = ROOTMCPServer(config) try: asyncio.run(server.run()) except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}", exc_info=True) sys.exit(1)
if __name__ == "__main__": main()