Source code for root_mcp.core.tools.discovery

"""Discovery tools for ROOT files (list, inspect, etc.)."""

from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from root_mcp.config import Config
    from root_mcp.core.io.file_manager import FileManager
    from root_mcp.core.io.validators import PathValidator

logger = logging.getLogger(__name__)


class DiscoveryTools:
    """Tools for discovering and inspecting ROOT files.

    Public methods return plain dictionaries. On success the dictionary has
    ``data``, ``metadata`` and ``suggestions`` keys; on failure it has an
    ``error`` code and a ``message`` (plus optional ``details`` and
    ``suggestion``).
    """

    def __init__(
        self,
        config: Config,
        file_manager: FileManager,
        path_validator: PathValidator,
    ):
        """
        Initialize discovery tools.

        Args:
            config: Server configuration
            file_manager: File manager instance
            path_validator: Path validator instance
        """
        self.config = config
        self.file_manager = file_manager
        self.path_validator = path_validator

    def list_files(
        self,
        resource: str | None = None,
        pattern: str | None = None,
        limit: int = 100,
    ) -> dict[str, Any]:
        """
        List ROOT files in a resource.

        Recursively scans the resource's base directory for ``*.root``
        entries, keeps regular files that match ``pattern`` (if given) and
        the resource's own file patterns, and stops once ``limit`` files
        have been collected.

        Args:
            resource: Resource ID (None = default)
            pattern: Glob pattern to filter files (matched against the file name)
            limit: Maximum files to return; zero or a negative value returns no files

        Returns:
            List of files with metadata, or an error dictionary when the
            resource is unknown, not a ``file://`` resource, or its path
            does not exist
        """
        # Get resource config
        if resource:
            resource_config = self.config.get_resource(resource)
            if not resource_config:
                available = [r.name for r in self.config.resources]
                return {
                    "error": "resource_not_found",
                    "message": f"Resource '{resource}' not found",
                    "details": {"available_resources": available},
                    "suggestion": f"Use one of: {available}",
                }
        else:
            resource_config = self.config.get_default_resource()
            if not resource_config:
                return {
                    "error": "no_resources",
                    "message": "No resources configured",
                    "suggestion": "Configure at least one resource in config.yaml",
                }

        # Parse resource URI to get base path
        uri = resource_config.uri
        scheme = "file://"
        if not uri.startswith(scheme):
            # For remote resources, we'd need different handling
            return {
                "error": "not_implemented",
                "message": f"Remote resources not yet implemented: {uri}",
            }
        base_path = Path(uri[len(scheme):])

        # List files
        if not base_path.exists():
            return {
                "error": "path_not_found",
                "message": f"Resource path does not exist: {base_path}",
            }

        files = []
        total_scanned = 0

        # Scan directory
        for file_path in base_path.rglob("*.root"):
            # Apply the limit *before* taking another entry so that a limit
            # of zero (or less) yields an empty result instead of one file.
            if len(files) >= limit:
                break

            total_scanned += 1

            # Check pattern
            if pattern and not self._matches_pattern(file_path.name, pattern):
                continue

            # Check resource patterns
            if not self.path_validator.check_file_pattern(file_path, resource_config):
                continue

            # Get file info
            try:
                file_stat = file_path.stat()
            except OSError as e:
                logger.warning("Failed to stat %s: %s", file_path, e)
                continue

            # rglob() also yields directories whose name ends in ".root";
            # only regular files can be ROOT files.
            if not file_path.is_file():
                continue

            files.append(
                {
                    "path": str(file_path),
                    "size_bytes": file_stat.st_size,
                    "modified": file_stat.st_mtime,
                    "resource": resource_config.name,
                }
            )

        # Generate suggestions
        suggestions = []
        if files:
            suggestions.append(f"Inspect {files[0]['path']} with inspect_file()")
        if len(files) >= limit:
            suggestions.append(f"Showing first {limit} files, use pattern to filter")

        return {
            "data": {
                "files": files,
                "total_matched": len(files),
                "total_scanned": total_scanned,
            },
            "metadata": {
                "operation": "list_files",
                "resource": resource_config.name,
            },
            "suggestions": suggestions,
        }

    def inspect_file(
        self,
        path: str,
        include_histograms: bool = True,
        include_trees: bool = True,
    ) -> dict[str, Any]:
        """
        Inspect a ROOT file's structure.

        Tree, histogram and object listings are best-effort: if one of them
        fails, a warning is logged and that part of the result is left empty.

        Args:
            path: File path
            include_histograms: Include histogram metadata
            include_trees: Include tree metadata

        Returns:
            File structure and metadata, or an error dictionary when the
            path is rejected by the validator or the file cannot be opened
        """
        try:
            # Validate path
            validated_path = self.path_validator.validate_path(path)
        except Exception as e:
            return {
                "error": "invalid_path",
                "message": str(e),
                "suggestion": "Check path and ensure it's under an allowed root",
            }

        # Get file info
        try:
            file_info = self.file_manager.get_file_info(validated_path)
        except FileNotFoundError:
            return {
                "error": "file_not_found",
                "message": f"File not found: {path}",
                "suggestion": "Use list_files() to see available files",
            }
        except Exception as e:
            return {
                "error": "file_read_error",
                "message": f"Failed to open file: {e}",
            }

        # Get trees
        trees = []
        if include_trees:
            try:
                trees = self.file_manager.list_trees(validated_path)
            except Exception as e:
                logger.warning("Failed to list trees: %s", e)

        # Get histograms
        histograms = []
        if include_histograms:
            try:
                histograms = self.file_manager.list_histograms(validated_path)
            except Exception as e:
                logger.warning("Failed to list histograms: %s", e)

        # Get all objects to find directories. Guarded like the tree and
        # histogram listings so one failing listing does not discard the
        # information already collected.
        try:
            all_objects = self.file_manager.list_objects(validated_path)
        except Exception as e:
            logger.warning("Failed to list objects: %s", e)
            all_objects = []

        directories = [obj["path"] for obj in all_objects if "TDirectory" in obj["type"]]

        # Other objects (not trees or histograms)
        known_paths = {t["path"] for t in trees} | {h["path"] for h in histograms}
        other_objects = [
            obj
            for obj in all_objects
            if obj["path"] not in known_paths and "TDirectory" not in obj["type"]
        ]

        # Generate suggestions
        suggestions = []
        if trees:
            main_tree = trees[0]
            suggestions.append(
                f"Explore '{main_tree['name']}' tree with "
                f"{main_tree['entries']:,} entries using list_branches()"
            )
        if histograms:
            suggestions.append(f"Read histogram '{histograms[0]['name']}' with read_histogram()")

        return {
            "data": {
                "path": str(validated_path),
                "size_bytes": file_info.get("size_bytes"),
                "compression": file_info.get("compression"),
                "trees": trees,
                "histograms": histograms,
                "directories": directories,
                "other_objects": other_objects[:10],  # Limit to first 10
            },
            "metadata": {
                "operation": "inspect_file",
            },
            "suggestions": suggestions,
        }

    def list_branches(
        self,
        path: str,
        tree_name: str,
        pattern: str | None = None,
        limit: int = 100,
        include_stats: bool = False,
    ) -> dict[str, Any]:
        """
        List branches in a TTree or RNTuple.

        Args:
            path: File path
            tree_name: Tree name
            pattern: Glob pattern to filter branches
            limit: Maximum branches to return; negative values are treated as zero
            include_stats: Compute statistics (slower)

        Returns:
            Branch information, or an error dictionary when the path is
            rejected, the file cannot be opened, the tree is missing, or the
            branches cannot be read
        """
        try:
            validated_path = self.path_validator.validate_path(path)
        except Exception as e:
            return {
                "error": "invalid_path",
                "message": str(e),
            }

        try:
            tree_obj = self.file_manager.get_tree(validated_path, tree_name)
        except KeyError as e:
            # Listing the alternatives is only a hint; never let it mask the
            # actual "tree not found" error.
            try:
                available_trees = [
                    t["name"] for t in self.file_manager.list_trees(validated_path)
                ]
            except Exception as list_error:
                logger.warning("Failed to list trees: %s", list_error)
                available_trees = []
            # str(KeyError("msg")) is the repr "'msg'"; use the raw argument
            # so the message is not wrapped in an extra pair of quotes.
            message = str(e.args[0]) if len(e.args) == 1 else str(e)
            return {
                "error": "tree_not_found",
                "message": message,
                "details": {"available_trees": available_trees},
                "suggestion": f"Use one of: {available_trees}",
            }
        except FileNotFoundError:
            return {
                "error": "file_not_found",
                "message": f"File not found: {path}",
                "suggestion": "Use list_files() to see available files",
            }
        except Exception as e:
            return {
                "error": "file_read_error",
                "message": f"Failed to open file: {e}",
            }

        # Get branch info
        from root_mcp.core.io.readers import TreeReader

        reader = TreeReader(self.config, self.file_manager)

        try:
            branch_info = reader.get_branch_info(str(validated_path), tree_name, pattern)
        except Exception as e:
            return {
                "error": "read_error",
                "message": f"Failed to read branches: {e}",
            }

        # Limit results (a negative limit would otherwise slice from the end)
        total_branches = len(branch_info)
        branch_info = branch_info[: max(limit, 0)]

        # Optionally compute stats
        if include_stats:
            try:
                branch_names = [b["name"] for b in branch_info]
                stats = reader.compute_branch_stats(
                    str(validated_path),
                    tree_name,
                    branch_names,
                )
                # Add stats to branch info
                for branch in branch_info:
                    if branch["name"] in stats:
                        branch["stats"] = stats[branch["name"]]
            except Exception as e:
                logger.warning("Failed to compute stats: %s", e)

        # Suggestions
        suggestions = []
        if total_branches > limit:
            suggestions.append(f"{total_branches} total branches - use pattern to filter")

        if branch_info:
            first_branches = [b["name"] for b in branch_info[:3]]
            suggestions.append(f"Sample data with read_branches(branches={first_branches})")

        return {
            "data": {
                "tree": tree_name,
                "total_entries": tree_obj.num_entries,
                "total_branches": total_branches,
                "branches": branch_info,
                "matched": len(branch_info),
            },
            "metadata": {
                "operation": "list_branches",
            },
            "suggestions": suggestions,
        }

    @staticmethod
    def _matches_pattern(filename: str, pattern: str) -> bool:
        """Check if filename matches glob pattern."""
        import fnmatch

        return fnmatch.fnmatch(filename, pattern)