"""Histogram fitting engine for ROOT-MCP extended mode.
Provides least-squares curve fitting via :mod:`scipy.optimize.curve_fit`
for a registry of physics-motivated models:
- **Gaussian** — symmetric peaks (resonances, detector resolutions)
- **Exponential** — falling backgrounds
- **Polynomial** (linear, quadratic, cubic) — smooth backgrounds
- **Crystal Ball** — asymmetric peaks with a power-law tail (common in HEP)
- **Double Gaussian** — overlapping resonances
- **Voigt** — convolution of Gaussian detector resolution and Lorentzian natural width
- **Breit-Wigner** — resonance line shape
The public entry point is :class:`HistogramFitter`.
"""
from __future__ import annotations
import logging
import ast
from typing import Any, Callable, TypedDict, cast
import numpy as np
from scipy.optimize import curve_fit
from root_mcp.extended.analysis.expression import SafeExprEvaluator, translate_leaf_expr
logger = logging.getLogger(__name__)
[docs]
class ModelInfo(TypedDict):
"""Type definition for model registry entries."""
func: Callable[..., np.ndarray]
n_params: int
param_names: list[str]
[docs]
def gaussian(x: np.ndarray, amp: float, mu: float, sigma: float) -> np.ndarray:
"""Gaussian function."""
return cast(np.ndarray, amp * np.exp(-0.5 * ((x - mu) / sigma) ** 2))
[docs]
def exponential(x: np.ndarray, amp: float, decay: float) -> np.ndarray:
"""Exponential function."""
return cast(np.ndarray, amp * np.exp(-x / decay))
[docs]
def polynomial(x: np.ndarray, *coeffs: float) -> np.ndarray:
"""Polynomial function."""
return cast(np.ndarray, np.polyval(coeffs, x))
[docs]
def crystal_ball(
x: np.ndarray, amp: float, mu: float, sigma: float, alpha: float, n: float
) -> np.ndarray:
"""Crystal Ball function."""
z = (x - mu) / sigma
abs_alpha = abs(alpha)
# Gaussian part
# Use np.where to handle the piecewise definition
mask = z > -abs_alpha
# Power-law part
A = (n / abs_alpha) ** n * np.exp(-0.5 * abs_alpha**2)
B = n / abs_alpha - abs_alpha
result = np.zeros_like(x)
result[mask] = amp * np.exp(-0.5 * z[mask] ** 2)
result[~mask] = amp * A * (B - z[~mask]) ** (-n)
return result
# Map model names to functions and their parameter counts (excluding x)
MODEL_REGISTRY: dict[str, ModelInfo] = {
"gaussian": {"func": gaussian, "n_params": 3, "param_names": ["amp", "mu", "sigma"]},
"exponential": {"func": exponential, "n_params": 2, "param_names": ["amp", "decay"]},
"polynomial": {
"func": polynomial,
"n_params": 2,
"param_names": ["c1", "c0"],
}, # Linear default
"polynomial_2": {
"func": polynomial,
"n_params": 3,
"param_names": ["c2", "c1", "c0"],
}, # Quadratic
"polynomial_3": {
"func": polynomial,
"n_params": 4,
"param_names": ["c3", "c2", "c1", "c0"],
}, # Cubic
"crystal_ball": {
"func": crystal_ball,
"n_params": 5,
"param_names": ["amp", "mu", "sigma", "alpha", "n"],
},
}
[docs]
class CompositeModel:
"""Represents a sum of multiple models."""
[docs]
def __init__(self, components: list[str | dict[str, Any]]):
self.funcs: list[Callable[..., np.ndarray]] = []
self.param_ranges: list[tuple[int, int]] = []
self.total_params = 0
self.component_names: list[str] = []
self.param_names: list[str] = []
for comp in components:
if isinstance(comp, str):
name = comp
prefix = f"{name}_{len(self.funcs)}_"
else:
name = comp["model"]
prefix = comp.get("prefix", f"{name}_{len(self.funcs)}_")
if name not in MODEL_REGISTRY:
raise ValueError(f"Unknown model: {name}")
reg = MODEL_REGISTRY[name]
func = reg["func"]
n_params = reg["n_params"]
p_names = reg["param_names"]
self.funcs.append(func)
self.param_ranges.append((self.total_params, self.total_params + n_params))
self.component_names.append(name)
self.param_names.extend([f"{prefix}{p}" for p in p_names])
self.total_params += n_params
def __call__(self, x: np.ndarray, *params: float) -> np.ndarray:
"""Evaluate the composite model."""
result = np.zeros_like(x, dtype=float)
for func, (start, end) in zip(self.funcs, self.param_ranges):
p = params[start:end]
result += func(x, *p)
return result
def _get_default_guess(model: str, x: np.ndarray, y: np.ndarray) -> list[float]:
"""Generate basic initial guess for a single model."""
if model == "gaussian":
mean = np.average(x, weights=y)
sigma = np.sqrt(np.average((x - mean) ** 2, weights=y))
amp = np.max(y)
return [amp, mean, sigma]
elif model == "exponential":
return [np.max(y), (x[-1] - x[0]) / 2]
elif model.startswith("polynomial"):
n_params = MODEL_REGISTRY[model]["n_params"]
return [0.0] * n_params
elif model == "crystal_ball":
mean = np.average(x, weights=y)
sigma = np.sqrt(np.average((x - mean) ** 2, weights=y))
amp = np.max(y)
return [amp, mean, sigma, 1.0, 2.0]
return [1.0] * MODEL_REGISTRY[model]["n_params"]
[docs]
class CustomModel:
"""Model defined by a string expression."""
[docs]
def __init__(self, expr: str, params: list[str]):
"""
Initialize custom model.
Args:
expr: Mathematical expression string
params: List of parameter names in order
"""
self.expr = translate_leaf_expr(expr)
self.params = params
self.tree = ast.parse(self.expr, mode="eval")
def __call__(self, x: np.ndarray, *args: float) -> np.ndarray:
if len(args) != len(self.params):
raise ValueError(f"Expected {len(self.params)} parameters, got {len(args)}")
# Create context with x variable and parameters
context: dict[str, Any] = {"x": x}
for name, value in zip(self.params, args):
context[name] = value
return cast(np.ndarray, SafeExprEvaluator(context).visit(self.tree))
[docs]
def fit_histogram(
data: dict[str, Any],
model: str | list[str | dict[str, Any]] | dict[str, Any],
initial_guess: list[float] | None = None,
bounds: list[list[float]] | None = None,
fixed_parameters: dict[str | int, float] | None = None,
) -> dict[str, Any]:
"""
Fit a model to histogram data.
Args:
data: Histogram data dictionary
model: Model definition. Can be:
- str: Name of built-in model (e.g., "gaussian")
- list[str]: List of built-in models (e.g., ["gaussian", "exponential"])
- list[dict]: List of models with config (e.g., ``[{"name": "gaussian", "prefix": "s_"}]``)
- dict: Custom model definition (e.g. {"expr": "A*x + B", "params": ["A", "B"]})
initial_guess: Initial parameter values
bounds: List of [min, max] pairs for each parameter. Use [-np.inf, np.inf] for no bound.
fixed_parameters: Dictionary of parameters to fix. Keys can be index (int) or name (str).
Returns:
Dictionary with fitted parameters, errors, and stats
"""
# Handle both formats:
# 1. Full histogram result: {"data": {...}, "metadata": {...}}
# 2. Just the data dict: {"bin_edges": [...], "bin_counts": [...]}
if "data" in data and "bin_edges" not in data:
hist_data = data["data"]
else:
hist_data = data
# Extract x and y
bin_edges = np.array(hist_data["bin_edges"])
y = np.array(hist_data["bin_counts"])
x = (bin_edges[:-1] + bin_edges[1:]) / 2
# Errors
if "bin_errors" in hist_data:
sigma = np.array(hist_data["bin_errors"])
# Handle zero errors to avoid div by zero in chi2
sigma[sigma == 0] = 1.0
else:
sigma = np.sqrt(y)
sigma[sigma == 0] = 1.0
# Determine Model Function, Parameters, and Bounds
fit_func: Callable
param_names: list[str]
# 1. Parse Model Input
if isinstance(model, dict) and "expr" in model:
# Custom String Model
expr = model["expr"]
params = model.get("params", [])
if not params:
# Try to auto-detect if not provided, but explicit is safer
raise ValueError("Custom model must specify 'params' list")
fit_instance = CustomModel(expr, params)
fit_func = fit_instance
param_names = params
elif isinstance(model, str):
# Check for dynamic polynomial (inferred from initial guess)
if model == "polynomial" and initial_guess is not None:
n = len(initial_guess)
fit_func = MODEL_REGISTRY["polynomial"]["func"]
# Create param names cN-1 ... c0
param_names = [f"c{n - 1 - i}" for i in range(n)]
# Single built-in model
elif model in MODEL_REGISTRY:
fit_func = MODEL_REGISTRY[model]["func"]
param_names = MODEL_REGISTRY[model]["param_names"]
# Try auto-detect custom formula
else:
try:
# Attempt to extract parameters from expression
import re
expr = model
# Use same reserved keywords as operations.py
reserved = {
"sqrt",
"abs",
"log",
"exp",
"sin",
"cos",
"tan",
"arcsin",
"arccos",
"arctan",
"arctan2",
"sinh",
"cosh",
"tanh",
"min",
"max",
"where",
"sum",
"any",
"all",
"pi",
"e",
}
# Extract all identifiers
tokens = re.findall(r"[A-Za-z_]\w*", expr)
# Filter for parameters (exclude 'x' and reserved)
params_seen = set()
params = []
for t in tokens:
if t != "x" and t not in reserved and t not in params_seen:
params.append(t)
params_seen.add(t)
if not params:
# If no params found (e.g. "x**2"), technically valid with 0 params,
# but likely user error or simple func. Use CustomModel.
pass
# Create custom model
fit_instance = CustomModel(expr, params)
fit_func = fit_instance
param_names = params
except Exception:
# If parsing fails or logic fails, raise original error
raise ValueError(
f"Unknown model: '{model}'. For custom formulas, use a dictionary "
f'(e.g., {{"expr": "a*x+b", "params": ["a", "b"]}}). '
f"Available built-ins: {list(MODEL_REGISTRY.keys())}"
)
elif isinstance(model, list):
# Composite Model logic (as before)
comp_model = CompositeModel(model)
fit_func = comp_model
param_names = comp_model.param_names
else:
raise ValueError("Invalid model format")
num_params = len(param_names)
# Validation
# 2. Handle Fixed Parameters
# We create a wrapper function that injects fixed values
# and only exposes free parameters to curve_fit
fixed_indices = {} # index -> value
if fixed_parameters:
for key, val in fixed_parameters.items():
idx = -1
if isinstance(key, int):
idx = key
elif isinstance(key, str):
try:
idx = param_names.index(key)
except ValueError:
logger.warning(
f"Fixed parameter '{key}' not found in model parameters: {param_names}"
)
continue
if 0 <= idx < num_params:
fixed_indices[idx] = float(val)
free_indices = [i for i in range(num_params) if i not in fixed_indices]
if not free_indices:
raise ValueError("All parameters are fixed! Nothing to fit.")
# Create wrapper
def wrapped_fit_func(x_data: np.ndarray, *free_params: float) -> np.ndarray:
full_params = [0.0] * num_params
# Fill free
for i, val in enumerate(free_params):
full_params[free_indices[i]] = val
# Fill fixed
for idx, val in fixed_indices.items():
full_params[idx] = val
return fit_func(x_data, *full_params)
# 3. Prepare Initial Guess for Free Params
if initial_guess is None:
# Default fallback
full_p0: list[float] = [1.0] * num_params
# If single built-in, try its guesser
if isinstance(model, str) and model in MODEL_REGISTRY:
full_p0 = _get_default_guess(model, x, y)
elif isinstance(model, list) and all(isinstance(m, str) for m in model) and len(model) == 1:
# Single model in list
full_p0 = _get_default_guess(cast(str, model[0]), x, y)
# For others (custom, composite), calculating a good guess is hard without more info
else:
if len(initial_guess) != num_params:
raise ValueError(
f"Initial guess length {len(initial_guess)} != num params {num_params}"
)
full_p0 = initial_guess
p0_free = [full_p0[i] for i in free_indices]
# 4. Prepare Bounds for Free Params
bounds_free_min: list[float] = []
bounds_free_max: list[float] = []
fit_bounds: tuple[list[float], list[float]] | tuple[float, float]
if bounds:
# bounds is list of [min, max] for ALL params (or None)
if len(bounds) != num_params:
raise ValueError(f"Bounds length {len(bounds)} != num params {num_params}")
for i in free_indices:
mn, mx = bounds[i]
bounds_free_min.append(mn if mn is not None else -np.inf)
bounds_free_max.append(mx if mx is not None else np.inf)
fit_bounds = (bounds_free_min, bounds_free_max)
else:
fit_bounds = (-np.inf, np.inf)
# 5. Perform Fit
try:
popt_free, pcov_free = curve_fit(
wrapped_fit_func,
x,
y,
p0=p0_free,
sigma=sigma,
absolute_sigma=True,
bounds=fit_bounds,
maxfev=10000,
)
except Exception as e:
raise RuntimeError(f"Fit failed: {e}")
# 6. Reconstruct Full Parameters and Covariance
popt_full = [0.0] * num_params
pcov_full = np.zeros((num_params, num_params))
# Fill free
for i, free_idx in enumerate(free_indices):
popt_full[free_idx] = popt_free[i]
for j, free_jdx in enumerate(free_indices):
pcov_full[free_idx, free_jdx] = pcov_free[i, j]
# Fill fixed
for idx, val in fixed_indices.items():
popt_full[idx] = val
# Errors are 0 for fixed params
# Calculate statistics
y_fit = fit_func(x, *popt_full)
chi2 = np.sum(((y - y_fit) / sigma) ** 2)
ndof = len(x) - len(free_indices)
return {
"parameters": popt_full,
"parameter_names": param_names,
"errors": np.sqrt(np.diag(pcov_full)).tolist(),
"chi2": chi2,
"ndof": ndof,
"fitted_values": y_fit.tolist(),
"model": str(model),
"fixed_parameters": list(fixed_indices.keys()),
}
# 2D Fitting Functions
[docs]
def gaussian_2d(
xy: tuple[np.ndarray, np.ndarray],
amp: float,
mu_x: float,
mu_y: float,
sigma_x: float,
sigma_y: float,
rho: float = 0.0,
) -> np.ndarray:
"""
2D Gaussian function with correlation.
Args:
xy: Tuple of (x, y) coordinate arrays
amp: Amplitude
mu_x: Mean in x
mu_y: Mean in y
sigma_x: Standard deviation in x
sigma_y: Standard deviation in y
rho: Correlation coefficient (-1 to 1)
Returns:
Function values at (x, y) points
"""
x, y = xy
dx = x - mu_x
dy = y - mu_y
# Exponent
exponent = (
-0.5
/ (1 - rho**2)
* ((dx / sigma_x) ** 2 + (dy / sigma_y) ** 2 - 2 * rho * dx * dy / (sigma_x * sigma_y))
)
return amp * np.exp(exponent)
[docs]
def polynomial_2d(
xy: tuple[np.ndarray, np.ndarray],
*coeffs: float,
) -> np.ndarray:
"""
2D polynomial function.
Args:
xy: Tuple of (x, y) coordinate arrays
coeffs: Polynomial coefficients [c00, c10, c01, c20, c11, c02, ...]
Returns:
Function values at (x, y) points
"""
x, y = xy
result = np.zeros_like(x)
# Determine polynomial degree from number of coefficients
# For degree d: (d+1)(d+2)/2 coefficients
n_coeffs = len(coeffs)
degree = int((-3 + np.sqrt(9 + 8 * (n_coeffs - 1))) / 2)
idx = 0
for d in range(degree + 1):
for i in range(d + 1):
j = d - i
if idx < n_coeffs:
result += coeffs[idx] * (x**i) * (y**j)
idx += 1
return result
MODEL_REGISTRY_2D: dict[str, ModelInfo] = {
"gaussian_2d": {
"func": gaussian_2d,
"n_params": 6,
"param_names": ["amp", "mu_x", "mu_y", "sigma_x", "sigma_y", "rho"],
},
"polynomial_2d": {
"func": polynomial_2d,
"n_params": 6, # Quadratic default
"param_names": ["c00", "c10", "c01", "c20", "c11", "c02"],
},
}
[docs]
def fit_histogram_2d(
x: np.ndarray,
y: np.ndarray,
z: np.ndarray,
z_errors: np.ndarray | None = None,
model: str = "gaussian_2d",
initial_params: list[float] | None = None,
fixed_params: dict[str, float] | None = None,
bounds: tuple[list[float], list[float]] | None = None,
) -> dict[str, Any]:
"""
Fit a 2D histogram with a model function.
Args:
x: X-coordinate array (flattened)
y: Y-coordinate array (flattened)
z: Z-values (bin contents, flattened)
z_errors: Errors on z-values
model: Model name from MODEL_REGISTRY_2D
initial_params: Initial parameter guesses
fixed_params: Dictionary of {param_name: value} for fixed parameters
bounds: Tuple of (lower_bounds, upper_bounds)
Returns:
Dictionary with fit results
"""
if model not in MODEL_REGISTRY_2D:
raise ValueError(f"Unknown 2D model: {model}. Available: {list(MODEL_REGISTRY_2D.keys())}")
model_info = MODEL_REGISTRY_2D[model]
fit_func = model_info["func"]
param_names = model_info["param_names"]
num_params = model_info["n_params"]
# Handle errors
if z_errors is None:
z_errors = np.sqrt(np.maximum(z, 1)) # Poisson errors
# Avoid zero errors
z_errors = np.maximum(z_errors, 1e-10)
# Handle fixed parameters
fixed_params = fixed_params or {}
fixed_indices = {param_names.index(k): v for k, v in fixed_params.items()}
free_indices = [i for i in range(num_params) if i not in fixed_indices]
# Initial parameters
if initial_params is None:
# Auto-generate initial guesses
if model == "gaussian_2d":
initial_params = [
np.max(z), # amp
np.mean(x), # mu_x
np.mean(y), # mu_y
np.std(x), # sigma_x
np.std(y), # sigma_y
0.0, # rho
]
else:
initial_params = [1.0] * num_params
# Create wrapper for fixed parameters
def fit_func_wrapper(xy: tuple[np.ndarray, np.ndarray], *free_params: float) -> np.ndarray:
full_params = [0.0] * num_params
for i, free_idx in enumerate(free_indices):
full_params[free_idx] = free_params[i]
for idx, val in fixed_indices.items():
full_params[idx] = val
return fit_func(xy, *full_params)
# Extract free initial parameters
p0_free = [initial_params[i] for i in free_indices]
# Handle bounds
fit_bounds = (-np.inf, np.inf)
if bounds is not None:
lower_free = [bounds[0][i] for i in free_indices]
upper_free = [bounds[1][i] for i in free_indices]
fit_bounds = (lower_free, upper_free)
# Perform fit
try:
popt_free, pcov_free = curve_fit(
fit_func_wrapper,
(x, y),
z,
p0=p0_free,
sigma=z_errors,
absolute_sigma=True,
bounds=fit_bounds,
maxfev=10000,
)
except Exception as e:
raise RuntimeError(f"2D fit failed: {e}")
# Reconstruct full parameters
popt_full = [0.0] * num_params
pcov_full = np.zeros((num_params, num_params))
for i, free_idx in enumerate(free_indices):
popt_full[free_idx] = popt_free[i]
for j, free_jdx in enumerate(free_indices):
pcov_full[free_idx, free_jdx] = pcov_free[i, j]
for idx, val in fixed_indices.items():
popt_full[idx] = val
# Calculate statistics
z_fit = fit_func((x, y), *popt_full)
chi2 = np.sum(((z - z_fit) / z_errors) ** 2)
ndof = len(z) - len(free_indices)
return {
"parameters": popt_full,
"parameter_names": param_names,
"errors": np.sqrt(np.diag(pcov_full)).tolist(),
"chi2": float(chi2),
"ndof": int(ndof),
"fitted_values": z_fit.tolist(),
"model": model,
"fixed_parameters": list(fixed_indices.keys()),
}