Source code for root_mcp.extended.analysis.histograms

"""Advanced histogram operations for extended mode."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

import awkward as ak
import numpy as np

if TYPE_CHECKING:
    from root_mcp.config import Config
    from root_mcp.core.io.file_manager import FileManager

logger = logging.getLogger(__name__)


[docs] class HistogramOperations: """ Advanced histogram operations with scipy-based fitting. Extends core histogram capabilities with: - 2D and 3D histograms - Profile histograms - Weighted histograms with proper error propagation - Histogram arithmetic """
[docs] def __init__(self, config: Config, file_manager: FileManager): """ Initialize histogram operations. Args: config: Server configuration file_manager: File manager instance """ self.config = config self.file_manager = file_manager
[docs] def compute_histogram_1d( self, path: str, tree_name: str, branch: str, bins: int, range: tuple[float, float] | None = None, selection: str | None = None, weights: str | None = None, ) -> dict[str, Any]: """ Compute a 1D histogram with advanced features. Args: path: File path tree_name: Tree name branch: Branch to histogram bins: Number of bins range: (min, max) for histogram selection: Optional cut expression weights: Optional weight branch Returns: Histogram data with metadata """ tree = self.file_manager.get_tree(path, tree_name) # Validate bins max_bins = self.config.extended.histogram.max_bins_1d if bins > max_bins: raise ValueError(f"Number of bins ({bins}) exceeds maximum ({max_bins})") # Read data branches_to_read = [branch] if weights: branches_to_read.append(weights) arrays = tree.arrays( filter_name=branches_to_read, cut=selection, library="ak", ) # Get data data = arrays[branch] if self._is_jagged(data): data = ak.flatten(data, axis=None) data_np = np.asarray(data) # Get weights if specified weights_np = None if weights: weights_data = arrays[weights] if self._is_jagged(weights_data): weights_data = ak.flatten(weights_data, axis=None) weights_np = np.asarray(weights_data) # Determine range if range is None: data_finite = data_np[np.isfinite(data_np)] if len(data_finite) == 0: raise ValueError(f"No finite values in branch {branch}") range = (float(np.min(data_finite)), float(np.max(data_finite))) # Compute histogram counts, edges = np.histogram(data_np, bins=bins, range=range, weights=weights_np) # Compute bin centers centers = (edges[:-1] + edges[1:]) / 2 # Compute errors if weights_np is None: errors = np.sqrt(counts) else: errors_sq, _ = np.histogram(data_np, bins=bins, range=range, weights=weights_np**2) errors = np.sqrt(errors_sq) # Count overflow/underflow underflow = np.sum(data_np < range[0]) overflow = np.sum(data_np > range[1]) return { "data": { "bin_edges": edges.tolist(), "bin_centers": centers.tolist(), "bin_counts": counts.tolist(), "bin_errors": errors.tolist(), "entries": int(np.sum(counts)), "underflow": int(underflow), "overflow": int(overflow), "range": range, }, "metadata": { "operation": "compute_histogram_1d", "branch": branch, "bins": bins, "weighted": weights is not None, "selection": selection, }, }
[docs] def compute_histogram_2d( self, path: str, tree_name: str, branch_x: str, branch_y: str, bins_x: int, bins_y: int, range_x: tuple[float, float] | None = None, range_y: tuple[float, float] | None = None, selection: str | None = None, weights: str | None = None, ) -> dict[str, Any]: """ Compute a 2D histogram. Args: path: File path tree_name: Tree name branch_x: X-axis branch branch_y: Y-axis branch bins_x: Number of bins in X bins_y: Number of bins in Y range_x: (min, max) for X axis range_y: (min, max) for Y axis selection: Optional cut expression weights: Optional weight branch Returns: 2D histogram data """ tree = self.file_manager.get_tree(path, tree_name) # Validate bins max_bins = self.config.extended.histogram.max_bins_2d if bins_x > max_bins or bins_y > max_bins: raise ValueError( f"Number of bins ({bins_x}x{bins_y}) exceeds maximum ({max_bins}x{max_bins})" ) # Read data branches_to_read = [branch_x, branch_y] if weights: branches_to_read.append(weights) arrays = tree.arrays( filter_name=branches_to_read, cut=selection, library="ak", ) # Get data data_x = arrays[branch_x] data_y = arrays[branch_y] if self._is_jagged(data_x): data_x = ak.flatten(data_x, axis=None) if self._is_jagged(data_y): data_y = ak.flatten(data_y, axis=None) data_x_np = np.asarray(data_x) data_y_np = np.asarray(data_y) # Get weights if specified weights_np = None if weights: weights_data = arrays[weights] if self._is_jagged(weights_data): weights_data = ak.flatten(weights_data, axis=None) weights_np = np.asarray(weights_data) # Determine ranges if range_x is None: data_finite = data_x_np[np.isfinite(data_x_np)] if len(data_finite) == 0: raise ValueError(f"No finite values in branch {branch_x}") range_x = (float(np.min(data_finite)), float(np.max(data_finite))) if range_y is None: data_finite = data_y_np[np.isfinite(data_y_np)] if len(data_finite) == 0: raise ValueError(f"No finite values in branch {branch_y}") range_y = (float(np.min(data_finite)), float(np.max(data_finite))) # Compute 2D histogram counts, edges_x, edges_y = np.histogram2d( data_x_np, data_y_np, bins=[bins_x, bins_y], range=[range_x, range_y], weights=weights_np, ) # Compute bin centers centers_x = (edges_x[:-1] + edges_x[1:]) / 2 centers_y = (edges_y[:-1] + edges_y[1:]) / 2 # Compute errors if weights_np is None: errors = np.sqrt(counts) else: errors_sq, _, _ = np.histogram2d( data_x_np, data_y_np, bins=[bins_x, bins_y], range=[range_x, range_y], weights=weights_np**2, ) errors = np.sqrt(errors_sq) return { "data": { "bin_edges_x": edges_x.tolist(), "bin_edges_y": edges_y.tolist(), "bin_centers_x": centers_x.tolist(), "bin_centers_y": centers_y.tolist(), "bin_counts": counts.tolist(), "bin_errors": errors.tolist(), "entries": int(np.sum(counts)), "range_x": range_x, "range_y": range_y, }, "metadata": { "operation": "compute_histogram_2d", "branch_x": branch_x, "branch_y": branch_y, "bins_x": bins_x, "bins_y": bins_y, "weighted": weights is not None, "selection": selection, }, }
[docs] def compute_profile( self, path: str, tree_name: str, branch_x: str, branch_y: str, bins: int, range_x: tuple[float, float] | None = None, selection: str | None = None, ) -> dict[str, Any]: """ Compute a profile histogram (mean of Y vs X). Args: path: File path tree_name: Tree name branch_x: X-axis branch branch_y: Y-axis branch (to be averaged) bins: Number of bins in X range_x: (min, max) for X axis selection: Optional cut expression Returns: Profile histogram data """ tree = self.file_manager.get_tree(path, tree_name) # Read data arrays = tree.arrays( filter_name=[branch_x, branch_y], cut=selection, library="ak", ) # Get data data_x = arrays[branch_x] data_y = arrays[branch_y] if self._is_jagged(data_x): data_x = ak.flatten(data_x) if self._is_jagged(data_y): data_y = ak.flatten(data_y) data_x_np = ak.to_numpy(data_x) data_y_np = ak.to_numpy(data_y) # Determine range if range_x is None: data_finite = data_x_np[np.isfinite(data_x_np)] if len(data_finite) == 0: raise ValueError(f"No finite values in branch {branch_x}") range_x = (float(np.min(data_finite)), float(np.max(data_finite))) # Compute bin edges edges = np.linspace(range_x[0], range_x[1], bins + 1) centers = (edges[:-1] + edges[1:]) / 2 # Compute mean and error in each bin means = np.zeros(bins) errors = np.zeros(bins) entries = np.zeros(bins, dtype=int) for i in range(bins): mask = (data_x_np >= edges[i]) & (data_x_np < edges[i + 1]) if i == bins - 1: # Include right edge in last bin mask = (data_x_np >= edges[i]) & (data_x_np <= edges[i + 1]) y_in_bin = data_y_np[mask] if len(y_in_bin) > 0: means[i] = np.mean(y_in_bin) errors[i] = np.std(y_in_bin) / np.sqrt(len(y_in_bin)) # Standard error entries[i] = len(y_in_bin) return { "data": { "bin_edges": edges.tolist(), "bin_centers": centers.tolist(), "bin_means": means.tolist(), "bin_errors": errors.tolist(), "bin_entries": entries.tolist(), "range": range_x, }, "metadata": { "operation": "compute_profile", "branch_x": branch_x, "branch_y": branch_y, "bins": bins, "selection": selection, }, }
@staticmethod def _is_jagged(array: ak.Array) -> bool: """Check if array is jagged (variable-length).""" try: layout = ak.to_layout(array) # Check the top-level layout type name = type(layout).__name__ # ListOffsetArray and ListArray indicate jagged/variable-length arrays return "ListArray" in name or "ListOffset" in name except Exception: return False