"""Subprocess-based PyROOT code execution engine.
Executes user-provided Python/PyROOT code in an isolated subprocess,
capturing stdout, stderr, output files, and a structured result.
"""
from __future__ import annotations
import json
import logging
import os
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from .sandbox import CodeValidator, ValidationResult
from root_mcp.common.root_availability import _build_root_env
logger = logging.getLogger(__name__)
# Template that wraps user code for subprocess execution
_WRAPPER_TEMPLATE = """\
import json
import os
import sys
import traceback
# Redirect working directory
os.chdir({working_dir!r})
# Make input files accessible
_input_files = {input_files!r}
_output_dir = {output_dir!r}
_result = {{
"status": "success",
"return_value": None,
"output_files": [],
"error": None,
"traceback": None,
}}
def _set_result(value):
\"\"\"Set a structured return value (must be JSON-serializable).\"\"\"
_result["return_value"] = value
try:
# --- Begin user code ---
{indented_code}
# --- End user code ---
except Exception as _exc:
_result["status"] = "error"
_result["error"] = str(_exc)
_result["traceback"] = traceback.format_exc()
# Collect output files
if os.path.isdir(_output_dir):
for _f in os.listdir(_output_dir):
_fpath = os.path.join(_output_dir, _f)
if os.path.isfile(_fpath):
_result["output_files"].append(_fpath)
# Write structured result to a known file
_result_path = os.path.join({working_dir!r}, "_result.json")
with open(_result_path, "w") as _rf:
json.dump(_result, _rf)
"""
@dataclass
class ExecutionResult:
    """Result of a PyROOT code execution.

    Built by :class:`RootCodeExecutor` from the wrapper's ``_result.json``
    plus the captured subprocess streams.
    """

    # One of: "success", "error", "timeout", "validation_failed".
    status: str
    # Captured (possibly truncated) subprocess streams.
    stdout: str = ""
    stderr: str = ""
    # Value the user code passed to _set_result(), JSON round-tripped.
    return_value: Any = None
    # Absolute paths of files collected from the output directory.
    output_files: list[str] = field(default_factory=list)
    execution_time_seconds: float = 0.0
    # Error message and formatted traceback when status != "success".
    error: str | None = None
    traceback: str | None = None
    # Validation report, or None when validation was skipped.
    validation: ValidationResult | None = None
class RootCodeExecutor:
    """Execute PyROOT code in an isolated subprocess.

    Each call to :meth:`execute` validates the code, writes a wrapper
    script into a fresh temporary working directory, runs it with the
    current interpreter, and builds an :class:`ExecutionResult` from the
    wrapper's ``_result.json`` plus captured stdout/stderr.

    Parameters
    ----------
    execution_timeout : int
        Maximum execution time in seconds.
    max_output_size : int
        Maximum size of captured stdout/stderr in bytes.
    allowed_output_formats : list[str]
        File extensions allowed in the output directory. Output files with
        other extensions are dropped from the result (and cleaned up).
    working_directory : str
        Base directory for execution working dirs.
    validator : CodeValidator | None
        Code validator instance. If None, a default one is created.
    """

    def __init__(
        self,
        *,
        execution_timeout: int = 60,
        max_output_size: int = 10_000_000,
        allowed_output_formats: list[str] | None = None,
        working_directory: str = "/tmp/root_mcp_native",
        validator: CodeValidator | None = None,
    ) -> None:
        self.execution_timeout = execution_timeout
        self.max_output_size = max_output_size
        self.allowed_output_formats = allowed_output_formats or [
            "png",
            "pdf",
            "svg",
            "root",
            "json",
            "csv",
        ]
        self.working_directory = working_directory
        self.validator = validator or CodeValidator()

    def execute(
        self,
        code: str,
        *,
        input_files: list[str] | None = None,
        output_dir: str | None = None,
        timeout: int | None = None,
        skip_validation: bool = False,
    ) -> ExecutionResult:
        """Execute PyROOT code in a subprocess.

        Parameters
        ----------
        code : str
            Python code to execute (may import ROOT).
        input_files : list[str] | None
            Paths to ROOT files the code needs access to.
        output_dir : str | None
            Directory for output files. Created inside working_directory if None.
        timeout : int | None
            Override execution timeout in seconds.
        skip_validation : bool
            Skip AST validation (for trusted/internal code).

        Returns
        -------
        ExecutionResult
            Structured execution result.
        """
        effective_timeout = timeout or self.execution_timeout
        input_files = input_files or []

        # Step 1: Validate code unless the caller explicitly trusts it.
        validation: ValidationResult | None = None
        if not skip_validation:
            validation = self.validator.validate(code)
            if not validation.is_valid:
                return ExecutionResult(
                    status="validation_failed",
                    error="Code validation failed: " + "; ".join(validation.errors),
                    validation=validation,
                )

        # Step 2: Prepare an isolated working directory for this run.
        base_dir = Path(self.working_directory)
        base_dir.mkdir(parents=True, exist_ok=True)
        work_dir = tempfile.mkdtemp(dir=base_dir, prefix="exec_")
        if output_dir is None:
            output_dir = os.path.join(work_dir, "output")
        # Always ensure the output dir exists — caller-supplied dirs too,
        # otherwise user code writing there fails immediately.
        os.makedirs(output_dir, exist_ok=True)

        # Step 3: Build the wrapper script. User code is indented four
        # spaces to sit inside the wrapper's try-block; blank lines are
        # left empty so they don't produce trailing whitespace.
        indented_code = "\n".join(
            "    " + line if line.strip() else "" for line in code.splitlines()
        )
        wrapper = _WRAPPER_TEMPLATE.format(
            working_dir=work_dir,
            output_dir=output_dir,
            input_files=input_files,
            indented_code=indented_code,
        )
        script_path = os.path.join(work_dir, "_runner.py")
        with open(script_path, "w") as f:
            f.write(wrapper)

        # Step 4: Execute in a subprocess with a hard timeout.
        start_time = time.monotonic()
        try:
            proc = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=effective_timeout,
                cwd=work_dir,
                env=self._build_env(),
            )
        except subprocess.TimeoutExpired as e:
            elapsed = time.monotonic() - start_time
            logger.warning(
                "ROOT code execution timed out after %.1fs (limit: %ds)",
                elapsed,
                effective_timeout,
            )
            # Don't leak the temp dir: nothing is reported back on timeout.
            self._cleanup_work_dir(work_dir, [])
            return ExecutionResult(
                status="timeout",
                # TimeoutExpired may carry partial output (sometimes bytes
                # even in text mode) — surface what we got for debugging.
                stdout=self._truncate(self._coerce_text(e.stdout)),
                stderr=self._truncate(self._coerce_text(e.stderr)),
                execution_time_seconds=round(elapsed, 3),
                error=f"Execution timed out after {effective_timeout} seconds",
                validation=validation,
            )
        except Exception as e:
            elapsed = time.monotonic() - start_time
            logger.error("ROOT code execution failed: %s", e)
            # Best-effort cleanup so failed runs don't accumulate temp dirs.
            self._cleanup_work_dir(work_dir, [])
            return ExecutionResult(
                status="error",
                execution_time_seconds=round(elapsed, 3),
                error=str(e),
                validation=validation,
            )

        elapsed = time.monotonic() - start_time
        # Truncate output if too large.
        stdout = self._truncate(proc.stdout)
        stderr = self._truncate(proc.stderr)

        # Step 5: Read the structured result written by the wrapper.
        result_path = os.path.join(work_dir, "_result.json")
        structured = self._read_result_file(result_path)

        # Enforce the configured output formats (previously this setting
        # was accepted but never applied anywhere).
        output_files = self._filter_outputs(structured.get("output_files", []))

        exec_result = ExecutionResult(
            status=structured.get("status", "error"),
            stdout=stdout,
            stderr=stderr,
            return_value=structured.get("return_value"),
            output_files=output_files,
            execution_time_seconds=round(elapsed, 3),
            error=structured.get("error"),
            traceback=structured.get("traceback"),
            validation=validation,
        )
        # Clean up working directory (preserve reported output files).
        self._cleanup_work_dir(work_dir, exec_result.output_files)
        return exec_result

    def _filter_outputs(self, files: list[str]) -> list[str]:
        """Keep only files whose extension is in allowed_output_formats.

        Comparison is case-insensitive; configured extensions may be given
        with or without a leading dot.
        """
        allowed = {
            "." + ext.lstrip(".").lower() for ext in self.allowed_output_formats
        }
        return [f for f in files if os.path.splitext(f)[1].lower() in allowed]

    @staticmethod
    def _coerce_text(data: str | bytes | None) -> str:
        """Normalize subprocess output that may be None, str, or bytes.

        subprocess.TimeoutExpired can expose captured output as bytes even
        when the run used text mode, so decode defensively.
        """
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode("utf-8", errors="replace")
        return data

    def _build_env(self) -> dict[str, str]:
        """Build environment variables for the subprocess."""
        env = _build_root_env()  # includes PYTHONPATH with ROOT lib dir if needed
        # Ensure ROOT batch mode (no GUI)
        env["ROOT_BATCH"] = "1"
        return env

    def _truncate(self, text: str) -> str:
        """Truncate text to max_output_size, appending a marker when cut."""
        if len(text) > self.max_output_size:
            return text[: self.max_output_size] + f"\n... [truncated, {len(text)} total bytes]"
        return text

    @staticmethod
    def _read_result_file(path: str) -> dict[str, Any]:
        """Read the structured result JSON file written by the wrapper.

        A missing or unparsable file (e.g. the wrapper crashed before
        writing it) degrades to an error-shaped dict rather than raising.
        """
        try:
            with open(path) as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logger.debug("Could not read result file %s: %s", path, e)
            return {
                "status": "error",
                "error": f"Failed to read execution result: {e}",
            }

    @staticmethod
    def _cleanup_work_dir(work_dir: str, output_files: list[str]) -> None:
        """Remove the temporary working directory after execution.

        Output files that live inside the work_dir are left in place so
        callers can still read them; only internal bookkeeping files
        (_runner.py, _result.json) and the output sub-directory are removed
        once the output file list has been captured.
        """
        try:
            # Identify files we must keep (output files the caller may need)
            keep = {os.path.abspath(f) for f in output_files}
            work_abs = os.path.abspath(work_dir)
            # Walk bottom-up so directories are empty by the time we rmdir them.
            for root, dirs, files in os.walk(work_abs, topdown=False):
                for fname in files:
                    fpath = os.path.join(root, fname)
                    if os.path.abspath(fpath) not in keep:
                        os.remove(fpath)
                for dname in dirs:
                    dpath = os.path.join(root, dname)
                    # Remove dir only if empty (output files may keep it alive)
                    try:
                        os.rmdir(dpath)
                    except OSError:
                        pass  # Not empty — output files still inside
            # Remove the work_dir itself if empty
            try:
                os.rmdir(work_abs)
            except OSError:
                pass  # Output files still inside
        except Exception as e:
            logger.debug("Cleanup of %s failed: %s", work_dir, e)