"""File manager for opening, caching, and managing ROOT files."""
from __future__ import annotations
import logging
from collections import OrderedDict
from pathlib import Path
from typing import TYPE_CHECKING, Any
import uproot
if TYPE_CHECKING:
from root_mcp.config import Config
logger = logging.getLogger(__name__)
class FileCache:
    """LRU cache for open ROOT files.

    Holds at most ``max_size`` open file handles; the least recently used
    entry is evicted when the limit is exceeded.
    """

    def __init__(self, max_size: int):
        """
        Initialize file cache.

        Args:
            max_size: Maximum number of files to keep open
        """
        self.max_size = max_size
        # OrderedDict insertion order doubles as recency order:
        # the first entry is always the least recently used.
        self._cache: OrderedDict[str, Any] = OrderedDict()

    def get(self, path: str) -> Any | None:
        """
        Get file from cache.

        Args:
            path: File path

        Returns:
            Open file object or None if not cached
        """
        if path in self._cache:
            # Move to end (most recently used)
            self._cache.move_to_end(path)
            logger.debug(f"Cache hit: {path}")
            return self._cache[path]
        logger.debug(f"Cache miss: {path}")
        return None

    def put(self, path: str, file_obj: Any) -> None:
        """
        Add file to cache.

        Args:
            path: File path
            file_obj: Open file object
        """
        # If already exists, update and move to end
        if path in self._cache:
            self._cache.move_to_end(path)
            self._cache[path] = file_obj
            return
        # Add new entry
        self._cache[path] = file_obj
        # Evict oldest if over limit
        if len(self._cache) > self.max_size:
            oldest_path, _ = self._cache.popitem(last=False)
            logger.debug(f"Evicting from cache: {oldest_path}")
            # Note: uproot files don't need explicit closing;
            # they close automatically when garbage collected.

    def clear(self) -> None:
        """Clear the cache."""
        self._cache.clear()
        logger.info("File cache cleared")

    def size(self) -> int:
        """Get current cache size."""
        return len(self._cache)
[docs]
class FileManager:
    """
    Manages opening and caching of ROOT files.

    Provides safe, efficient access to local and remote ROOT files with
    automatic caching and connection pooling.
    """

    def __init__(self, config: Config):
        """
        Initialize file manager.

        Args:
            config: Server configuration
        """
        self.config = config
        # Caching is optional: no cache object at all when disabled.
        self._cache = FileCache(config.cache.file_cache_size) if config.cache.enabled else None
        # Every path ever opened through this manager (never pruned).
        self._open_files: set[str] = set()
        logger.info(
            f"FileManager initialized (cache: {config.cache.enabled}, "
            f"max_files: {config.cache.file_cache_size})"
        )

    def open(self, path: str | Path, **kwargs: Any) -> Any:
        """
        Open a ROOT file with caching.

        Args:
            path: File path or URI
            **kwargs: Additional arguments to pass to uproot.open()

        Returns:
            Open uproot file object

        Raises:
            FileNotFoundError: If file doesn't exist
            OSError: If file cannot be opened
        """
        path_str = str(path)
        # Check cache first
        if self._cache:
            cached = self._cache.get(path_str)
            if cached is not None:
                return cached
        # Open file
        logger.info(f"Opening ROOT file: {path_str}")
        try:
            file_obj = uproot.open(path_str, **kwargs)
        except FileNotFoundError as e:
            logger.error(f"File not found: {path_str}")
            raise FileNotFoundError(f"ROOT file not found: {path_str}") from e
        except Exception as e:
            # Normalize every other uproot failure into OSError for callers.
            logger.error(f"Failed to open {path_str}: {e}")
            raise OSError(f"Failed to open ROOT file {path_str}: {e}") from e
        # Add to cache
        if self._cache:
            self._cache.put(path_str, file_obj)
        self._open_files.add(path_str)
        return file_obj

    def get_file_info(self, path: str | Path) -> dict[str, Any]:
        """
        Get basic information about a ROOT file.

        Args:
            path: File path

        Returns:
            Dictionary with file metadata (path, compression, keys,
            classnames, trees, and size_bytes when the file is local)
        """
        file_obj = self.open(path)
        # Get file-level metadata
        info = {
            "path": str(path),
            "compression": str(file_obj.compression) if hasattr(file_obj, "compression") else None,
            "keys": list(file_obj.keys()),
            "classnames": file_obj.classnames() if hasattr(file_obj, "classnames") else {},
            "trees": self.list_trees(path),
        }
        # Get file size if local (remote URIs simply fail the exists() check)
        path_obj = Path(path)
        if path_obj.exists():
            info["size_bytes"] = path_obj.stat().st_size
        return info

    def list_trees(self, path: str | Path) -> list[dict[str, Any]]:
        """
        List all TTrees and RNTuples in a ROOT file.

        Args:
            path: File path

        Returns:
            List of tree metadata dictionaries
        """
        file_obj = self.open(path)
        trees: list[dict[str, Any]] = []

        # Recursively find all TTrees
        def find_trees(directory: Any, current_path: str = "") -> None:
            for key in directory.keys():
                full_path = f"{current_path}/{key}" if current_path else key
                # Check the classname BEFORE reading the object so unrelated
                # objects (e.g. large histograms) are never fetched.
                classname = directory.classname_of(key)
                if "TTree" in classname or "RNTuple" in classname:
                    obj = directory[key]
                    trees.append(
                        {
                            "name": key,
                            "path": full_path,
                            "classname": classname,
                            "entries": obj.num_entries,
                            "branches": len(obj.keys()),
                        }
                    )
                # Recurse into directories
                elif "TDirectory" in classname or classname == "TDirectoryFile":
                    find_trees(directory[key], full_path)

        find_trees(file_obj)
        return trees

    def list_histograms(self, path: str | Path) -> list[dict[str, Any]]:
        """
        List all histograms in a ROOT file.

        Args:
            path: File path

        Returns:
            List of histogram metadata dictionaries
        """
        file_obj = self.open(path)
        histograms: list[dict[str, Any]] = []

        def find_histograms(directory: Any, current_path: str = "") -> None:
            for key in directory.keys():
                classname = directory.classname_of(key)
                full_path = f"{current_path}/{key}" if current_path else key
                # Check if it's a histogram (TH1/TH2/TH3/... or TProfile)
                if classname.startswith("TH") or classname.startswith("TProfile"):
                    obj = directory[key]
                    hist_info = {
                        "name": key,
                        "path": full_path,
                        "type": classname,
                    }
                    # Add dimension-specific info
                    if hasattr(obj, "axes"):
                        hist_info["bins"] = [len(axis) for axis in obj.axes]
                    if hasattr(obj, "values"):
                        hist_info["entries"] = int(obj.values().sum())
                    histograms.append(hist_info)
                # Recurse into directories
                elif "TDirectory" in classname or classname == "TDirectoryFile":
                    obj = directory[key]
                    find_histograms(obj, full_path)

        find_histograms(file_obj)
        return histograms

    def list_objects(self, path: str | Path) -> list[dict[str, Any]]:
        """
        List all objects in a ROOT file.

        Args:
            path: File path

        Returns:
            List of object metadata dictionaries
        """
        file_obj = self.open(path)
        objects: list[dict[str, Any]] = []

        def find_objects(directory: Any, current_path: str = "") -> None:
            for key in directory.keys():
                classname = directory.classname_of(key)
                full_path = f"{current_path}/{key}" if current_path else key
                objects.append(
                    {
                        "name": key,
                        "path": full_path,
                        "type": classname,
                    }
                )
                # Recurse into directories
                if "TDirectory" in classname or classname == "TDirectoryFile":
                    obj = directory[key]
                    find_objects(obj, full_path)

        find_objects(file_obj)
        return objects

    def get_tree(self, path: str | Path, tree_name: str) -> Any:
        """
        Get a specific TTree from a file.

        Args:
            path: File path
            tree_name: Name or path to tree

        Returns:
            uproot TTree object

        Raises:
            KeyError: If tree doesn't exist
        """
        file_obj = self.open(path)
        try:
            tree = file_obj[tree_name]
        except KeyError as e:
            # Try to provide helpful error message
            available_trees = [t["name"] for t in self.list_trees(path)]
            raise KeyError(
                f"Tree '{tree_name}' not found in {path}. Available trees: {available_trees}"
            ) from e
        return tree

    def clear_cache(self) -> None:
        """Clear the file cache."""
        if self._cache:
            self._cache.clear()

    def get_cache_stats(self) -> dict[str, int]:
        """
        Get cache statistics.

        Returns:
            Dictionary with cache stats (size, max_size, open_files)
        """
        return {
            "size": self._cache.size() if self._cache else 0,
            "max_size": self.config.cache.file_cache_size,
            "open_files": len(self._open_files),
        }

    def validate_file(self, path: str | Path) -> dict[str, Any]:
        """
        Validate ROOT file integrity and readability.

        Args:
            path: File path

        Returns:
            Dictionary with validation results ("valid", "readable",
            "errors", "warnings", "metadata")
        """
        path_obj = Path(path)
        validation: dict[str, Any] = {
            "path": str(path),
            "valid": False,
            "readable": False,
            "errors": [],
            "warnings": [],
            "metadata": {},
        }
        # Check if file exists
        if not path_obj.exists():
            validation["errors"].append("File does not exist")
            return validation
        # Check file size
        try:
            size = path_obj.stat().st_size
            validation["metadata"]["size_bytes"] = size
            if size == 0:
                validation["errors"].append("File is empty")
                return validation
        except OSError as e:
            validation["errors"].append(f"Cannot access file: {e}")
            return validation
        # Try to open file
        try:
            file_obj = self.open(path)
            validation["readable"] = True
        except Exception as e:
            validation["errors"].append(f"Cannot open file: {e}")
            return validation
        # Check for readable objects
        try:
            keys = list(file_obj.keys())
            validation["metadata"]["num_objects"] = len(keys)
            if len(keys) == 0:
                validation["warnings"].append("File contains no objects")
        except Exception as e:
            validation["errors"].append(f"Cannot read file keys: {e}")
            return validation
        # Try to read trees (tree problems are warnings, not hard errors)
        try:
            trees = self.list_trees(path)
            validation["metadata"]["num_trees"] = len(trees)
            validation["metadata"]["trees"] = [t["name"] for t in trees]
            # Check if trees are readable
            for tree_info in trees:
                try:
                    tree = file_obj[tree_info["path"]]
                    # Try to read first entry
                    if tree.num_entries > 0:
                        _ = tree.arrays(entry_stop=1, library="ak")
                except Exception as e:
                    validation["warnings"].append(
                        f"Tree '{tree_info['name']}' may be corrupted: {e}"
                    )
        except Exception as e:
            validation["warnings"].append(f"Cannot validate trees: {e}")
        # Check compression
        try:
            if hasattr(file_obj, "compression"):
                validation["metadata"]["compression"] = str(file_obj.compression)
        except Exception:
            pass
        # File is valid if readable and no critical errors
        validation["valid"] = validation["readable"] and len(validation["errors"]) == 0
        return validation

    def get_tree_info(self, path: str | Path, tree_name: str) -> dict[str, Any]:
        """
        Get comprehensive metadata about a TTree or RNTuple.

        Args:
            path: File path
            tree_name: Tree name

        Returns:
            Dictionary with comprehensive tree metadata
        """
        tree = self.get_tree(path, tree_name)
        info = {
            "name": tree_name,
            "entries": tree.num_entries,
            "branches": len(tree.keys()),
            "branch_names": list(tree.keys()),
        }
        # Get total size and compression info
        try:
            if hasattr(tree, "compression"):
                info["compression"] = str(tree.compression)
        except Exception:
            pass
        # Get basket information if available
        try:
            total_compressed = 0
            total_uncompressed = 0
            for branch_name in tree.keys():
                branch = tree[branch_name]
                if hasattr(branch, "compressed_bytes"):
                    total_compressed += branch.compressed_bytes
                if hasattr(branch, "uncompressed_bytes"):
                    total_uncompressed += branch.uncompressed_bytes
            if total_compressed > 0 and total_uncompressed > 0:
                info["total_compressed_bytes"] = total_compressed
                info["total_uncompressed_bytes"] = total_uncompressed
                info["compression_ratio"] = total_uncompressed / total_compressed
        except Exception:
            pass
        return info

    def get_branch_schema(
        self, path: str | Path, tree_name: str, branch_name: str | None = None
    ) -> dict[str, Any]:
        """
        Get detailed schema information for branches.

        Args:
            path: File path
            tree_name: Tree name
            branch_name: Optional specific branch (None = all branches)

        Returns:
            Dictionary with branch schema information
        """
        tree = self.get_tree(path, tree_name)
        if branch_name:
            branches = [branch_name]
        else:
            branches = list(tree.keys())
        schema: dict[str, Any] = {}
        for name in branches:
            try:
                branch = tree[name]
                # Get type information
                typename = str(branch.typename) if hasattr(branch, "typename") else "unknown"
                # Determine if jagged (variable-length per entry)
                is_jagged = "[]" in typename or "vector" in typename.lower()
                # Get interpretation (awkward type)
                interpretation = None
                if hasattr(branch, "interpretation"):
                    try:
                        interpretation = str(branch.interpretation)
                    except Exception:
                        pass
                schema[name] = {
                    "type": typename,
                    "is_jagged": is_jagged,
                    "interpretation": interpretation,
                    "title": str(branch.title) if hasattr(branch, "title") else "",
                }
                # Get size information
                if hasattr(branch, "compressed_bytes"):
                    schema[name]["compressed_bytes"] = branch.compressed_bytes
                if hasattr(branch, "uncompressed_bytes"):
                    schema[name]["uncompressed_bytes"] = branch.uncompressed_bytes
            except Exception as e:
                # A single unreadable branch must not abort the whole schema.
                logger.warning(f"Failed to get schema for branch {name}: {e}")
                schema[name] = {"error": str(e)}
        return schema