Source code for gdt.core.background.binned

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import warnings
import numpy as np
from scipy.interpolate import CubicSpline
from scipy.stats import norm
from scipy.optimize import curve_fit
from statsmodels.nonparametric.smoothers_lowess import lowess
from gdt.core.data_primitives import TimeChannelBins

__all__ = ['Polynomial', 'RoboLowess']


[docs]class Polynomial(): """Class for performing a polynomial fit on Time-Energy data. The fit is performed over the time axis, treating each energy channel separately, although the fits are performed simultaneously. Parameters: counts (np.array): The array of counts in each bin, shape (``num_times``, ``num_chans``) tstart (np.array): The low-value edges of the time bins, shape (``num_times``,) tstop (np.array): The high-value edges of the time bins, shape (``num_times``,) exposure (np.array): The exposure of each bin, shape (``num_times``,) """ def __init__(self, counts, tstart, tstop, exposure): self._tstart = tstart self._tstop = tstop self._rate = counts / exposure[:, np.newaxis] self._livetime = exposure self._numtimes, self._numchans = self._rate.shape self._chisq = None self._dof = None self._order = None self._coeff = None self._covar = None @property def dof(self): """(np.array): The degrees-of-freedom for each channel""" return self._dof @property def statistic(self): """(np.array): The fit chi-squared statistic for each channel""" return self._chisq @property def statistic_name(self): """(str): 'chisq'""" return 'chisq'
[docs] def fit(self, order=0): """Fit the data with a polynomial. Model variances are used for chi-squared via two fitting passes. Adapted from the RMfit polynomial fitter. Args: order (int): The order of the polynomial Returns: (np.array, np.array): The fitted model value and model uncertainty \ at each input bin """ assert order >= 0, 'Polynomial order must be non-negative' self._order = order self._coeff = np.zeros((order + 1, self._numchans)) self._covar = np.zeros((order + 1, order + 1, self._numchans)) # get basis functions and set up weights array tstart, tstop = self._tstart, self._tstop basis_func = self._eval_basis(tstart, tstop) weights = np.zeros((self._order + 1, self._numtimes, self._numchans)) # Two-pass fitting # First pass uses the data variances calculated from data rates # Second pass uses the data variances calculated from model rates # 1) rate * livetime = counts # 2) variance of counts = counts # 3) variance of rate = variance of counts/livetime^2 = rate/livetime for iPass in range(2): if np.max(self._rate) <= 0.0: continue if iPass == 0: variance = self._rate / self._livetime[:, np.newaxis] idx = variance > 0.0 for iCoeff in range(self._order + 1): weights[iCoeff, idx] = basis_func[iCoeff, idx] / variance[ idx] else: variance = model / self._livetime[:, np.newaxis] idx = variance > 0.0 if np.sum(idx) > 0: for iCoeff in range(self._order + 1): weights[iCoeff, idx] = basis_func[iCoeff, idx] / \ variance[idx] else: raise RuntimeError( 'SEVERE ERROR: Background model negative') # covariance matrix basis_func_list = basis_func.swapaxes(0,2).swapaxes(1,2) weights_list = weights.swapaxes(0,2) rates_list = self._rate.T covar = np.array( list(map(np.dot, basis_func_list, weights_list))).T coeff = np.array(list(map(np.dot, rates_list, weights_list))).T if self._order >= 1: self._covar = np.linalg.inv(covar.T).T else: self._covar = 1.0 / covar # coefficients coeff_list = coeff.T covar_list = self._covar.T coeff = np.array(list(map(np.dot, coeff_list, covar_list))).T # evaluate model self._coeff = coeff model = self._eval_model(tstart, tstop) # evaluate model uncertainty model_uncert = self._eval_uncertainty(tstart, tstop) # evaluate goodness-of-fit self._chisq, self._dof = self._calc_chisq(model) return model, model_uncert
[docs] def interpolate(self, tstart, tstop): """Interpolation of the fitted polynomial Args: tstart (np.array): The starting edge of each bin tstop (np.array): The ending edge of each bin Returns: (np.array, np.array): The interpolated model value and model uncertainty in each bin """ interp = self._eval_model(tstart, tstop) interp_uncert = self._eval_uncertainty(tstart, tstop) return interp, interp_uncert
def _calc_chisq(self, model): """Calculate the chi-squared goodness-of-fit for the fitted model. Args: model (np.array): The fitted model, shape (num_times, num_chans) Returns: (np.array, np.array) : The chi-squared goodness-of-fit and degrees-of-freedom for each fitted channel """ variance = model / self._livetime[:, np.newaxis] # do not calculate using bins with value <= 0.0 idx = self._rate > 0.0 chisq = [np.sum( (self._rate[idx[:, i], i] - model[idx[:, i], i]) ** 2 / variance[ idx[:, i], i]) \ for i in range(self._numchans)] chisq = np.array(chisq) dof = np.sum(idx, axis=0) - (self._order + 1.0) return chisq, dof def _eval_basis(self, tstart, tstop): """Evaluates basis functions, which are the various polynomials averaged over the time bins. Args: tstart (np.array): The starting edge of each bin tstop (np.array): The ending edge of each bin Returns: (np.array): The basis functions for each bin, shape \ (order + 1, num_times, num_chans) """ numtimes = tstart.size dt = tstop - tstart zerowidth = (dt == 0.0) tstop[zerowidth] += 2e-6 dt[zerowidth] = 2e-6 basis_func = np.array( [(tstop ** (i + 1.0) - tstart ** (i + 1.0)) / ((i + 1.0) * dt) \ for i in range(self._order + 1)]) return np.tile(basis_func[:, :, np.newaxis], self._numchans) def _eval_model(self, tstart, tstop): """Evaluates the fitted model over the data Args: tstart (np.array): The starting edge of each bin tstop (np.array): The ending edge of each bin Returns: (np.array): The model value for each bin, shape \ (num_times, num_chans) """ numtimes = tstart.size dt = tstop - tstart zerowidth = (dt == 0.0) tstop[zerowidth] += 2e-6 dt[zerowidth] = 2e-6 model = np.zeros((numtimes, self._numchans)) for i in range(self._order + 1): model += (self._coeff[i, :] * (tstop[:, np.newaxis] ** (i + 1.0) - \ tstart[:, np.newaxis] ** (i + 1.0)) / \ ((i + 1.0) * dt[:, np.newaxis])).astype(float) if model.ndim == 1: model = model.reshape(-1, 1) return model def _eval_uncertainty(self, tstart, tstop): """Evaluates the uncertainty in the model-predicted values for the data intervals based on the uncertainty in the model coefficients. Args: tstart (np.array): The starting edge of each bin tstop (np.array): The ending edge of each bin Returns: (np.array): The model uncertainty for each bin, shape \ (num_times, num_chans) """ numtimes = tstart.size uncertainty = np.zeros((numtimes, self._numchans)) basis_func = self._eval_basis(tstart, tstop) # formal propagation of uncertainty of fit coefficients to uncertainty of model covar_list = self._covar.T basis_func_list = basis_func.swapaxes(0,2).swapaxes(1,2) for i in range(numtimes): dot1 = np.array(list(map(np.dot, covar_list, basis_func_list[:, :, i]))) uncertainty[i, :] = np.array(list(map(np.dot, basis_func_list[:, :, i], dot1))) uncertainty = np.sqrt(uncertainty) return uncertainty
[docs]class RoboLowess: """Background fitting using LOWESS with iterative sigma-clipping. Args: counts (np.ndarray): Count array, shape (num_times, num_channels) tstart (np.ndarray): Time bin start edges, shape (num_times,) tstop (np.ndarray): Time bin stop edges, shape (num_times,) exposure (np.ndarray): Exposure of each bin, shape (num_times,) first_pass_chan_range (tuple, optional): (chan_min, chan_max) for Pass 1 """ def __init__(self, counts, tstart, tstop, exposure, *, first_pass_chan_range=None): num_chans = counts.shape[1] self._data = TimeChannelBins(counts, tstart, tstop, exposure, np.arange(num_chans)) if first_pass_chan_range is None: self._first_pass_chan_min = 0 self._first_pass_chan_max = num_chans-1 else: self._first_pass_chan_min = first_pass_chan_range[0] self._first_pass_chan_max = first_pass_chan_range[1] self._backgrounds = None self._pre_mask = None self._suppress_auto_refine = False self._refined_mask = None self._signal_mask = None self._refit_applied = False self._chisq = None self._dof = None self._statistic_name = 'chisq' @property def dof(self): """(np.array): The degrees-of-freedom for each channel""" return self._dof @property def statistic(self): """(np.array): The fit chi-squared statistic for each channel""" return self._chisq @property def statistic_name(self): """(str): 'chisq'""" return self._statistic_name
[docs] def fit(self, win_size=None, min_frac=0.4, max_frac=0.95, spline_bc_type='clamped', lowess_iter=5, refit_after_clipping=True): """Fit background model using two-pass LOWESS with sigma-clipping. Args: win_size (float, optional): Absolute smoothing window in seconds. Converted internally to a LOWESS fraction via ``frac = win_size / data_range``. If not provided, the fraction is auto-computed from the data. min_frac (float): Minimum window fraction (default 0.4) max_frac (float): Maximum window fraction (default 0.95) spline_bc_type (str): Spline boundary condition (default 'clamped') lowess_iter (int): LOWESS robustness iterations (default 5) refit_after_clipping (bool): If True, runs a final LOWESS fit on the retained background bins after iterative clipping. For high-time-resolution TTE data, such as 64 ms TTE binning, set to False to skip this final refit. Returns: (np.ndarray, np.ndarray, list): Removed bin times, interpolated background at removed times, iteration diagnostics """ warnings.warn( "The uncertainty output is untested and always zero. " "Do not use it for scientific interpretation.", UserWarning, stacklevel=2) data_range = float(self._data.tstop[-1] - self._data.tstart[0]) if win_size is not None: win_size = float(win_size) if win_size > data_range: raise ValueError( "win_size must not exceed the available time range for " "the background fit " f"(win_size={win_size}, available_range={data_range}).") frac = win_size / data_range if frac < min_frac: warnings.warn( f"win_size={win_size} s yields a LOWESS fraction of " f"{frac:.3f}, which is below the recommended minimum of " f"{min_frac}. The fit may be unstable.", stacklevel=2) elif frac > max_frac: warnings.warn( f"win_size={win_size} s yields a LOWESS fraction of " f"{frac:.3f}, which is above the recommended maximum of " f"{max_frac}. The data may be over-smoothed.", stacklevel=2) else: temporal_resolution = float(np.median(self._data.tstop - self._data.tstart)) if (not np.isfinite(temporal_resolution)) or (temporal_resolution <= 0.0): temporal_resolution = 1.024 raw = 1.1 * (data_range ** -0.12) * (temporal_resolution ** -0.15) + 0.15 frac = min(max(min_frac, raw), max_frac) num_channels = self._data.rates.shape[1] backgrounds = np.zeros_like(self._data.rates) # Pass 1: Summed lightcurve to build background mask summed_data = self._data.integrate_channels(self._first_pass_chan_min, self._first_pass_chan_max) time_summed_all = np.asarray(summed_data.centroids).ravel() rate_summed_all = np.asarray(summed_data.rates).ravel() exp_summed = np.asarray(summed_data.exposure).ravel() if time_summed_all.size > 10: (times, bg_rates, bg_mask ) = self._residual_fit( time_summed_all, rate_summed_all, exp_summed, frac=frac, lowess_iter=lowess_iter, sigma_thresh=3.0, return_mask=True, use_pre_mask=True, refit_after_clipping=refit_after_clipping) # Background model on all summed times spline = CubicSpline(times, bg_rates, bc_type=spline_bc_type, extrapolate=True) bg_rates_full = spline(time_summed_all) else: # Fallback: everything is background with constant rate bg_mask = np.ones_like(time_summed_all, dtype=bool) bg_rates_full = np.full_like(rate_summed_all, np.nanmean(rate_summed_all), dtype=float) # Track what we removed and interpolated background removed_bins_times = time_summed_all[~bg_mask] interp_back = (bg_rates_full[~bg_mask] * exp_summed[~bg_mask]) # Store for external access self._last_pass1_mask = bg_mask self._last_iteration_data = [] # Background times from Pass 1 bg_times_chan = time_summed_all[bg_mask] bg_exp_chan = exp_summed[bg_mask] # Pass 2: Per-channel refit using background times only for channel_index in range(num_channels): ch_rates = self._data.rates[bg_mask, channel_index] if bg_times_chan.size > 10: bg_times_ch, bg_rates_ch = self._residual_fit( bg_times_chan, ch_rates, bg_exp_chan, frac=frac, lowess_iter=lowess_iter, sigma_thresh=3.0, return_mask=False, use_pre_mask=False, refit_after_clipping=refit_after_clipping) # Spline to full time grid spline_channel = CubicSpline( bg_times_ch, bg_rates_ch, bc_type=spline_bc_type, extrapolate=True) interp_bg_ch = spline_channel( self._data.time_centroids) else: # Not enough bins: use mean if ch_rates.size: mean_channel_rate = np.nanmean(ch_rates) else: mean_channel_rate = 0.0 interp_bg_ch = np.full_like( self._data.time_centroids, mean_channel_rate) backgrounds[:, channel_index] = interp_bg_ch # Store the full 2D background model (time × channel) self._backgrounds = backgrounds # Optional: Gaussian+Exponential refinement if not self._suppress_auto_refine: obs_counts = rate_summed_all * exp_summed bg_counts = bg_rates_full * exp_summed res_summed = (obs_counts - bg_counts) / np.sqrt( np.maximum(bg_counts, 1.0)) # Check if refinement changes background bg_before = self._backgrounds.copy() refined_mask, signal_mask = self._refit_with_residual_model( residuals=res_summed, threshold=0.5, win_size=win_size, spline_bc_type=spline_bc_type, lowess_iter=lowess_iter, first_pass_mask=bg_mask, refit_after_clipping=refit_after_clipping) if refined_mask is None: self._refit_applied = False else: self._refit_applied = not np.allclose( self._backgrounds, bg_before) # Compute fit statistics on background bins (per channel) bg_mask = getattr(self, '_last_pass1_mask', None) if bg_mask is None: bg_mask = np.ones(self._data.time_centroids.size, dtype=bool) self._compute_statistics(bg_mask) return (removed_bins_times, interp_back, self._last_iteration_data)
[docs] def interpolate(self, tstart, tstop, exposure=None, channel_range=None): """Interpolate the background model at given bin edges. Args: tstart (np.ndarray): Bin start edges. tstop (np.ndarray): Bin stop edges. channel_range (tuple, optional): (chan_min, chan_max) inclusive. Returns: (np.ndarray, np.ndarray): The interpolated model value and model uncertainty in each bin with shape (num_bins, num_channels). """ tstart = np.asarray(tstart, dtype=float) tstop = np.asarray(tstop, dtype=float) centroids = 0.5 * (tstart + tstop) base_times = np.asarray(self._data.time_centroids, dtype=float) num_channels = self._backgrounds.shape[1] if channel_range is None: chan_slice = slice(0, num_channels) else: a, b = channel_range i_min = int(max(0, min(a, b))) i_max = int(min(num_channels - 1, max(a, b))) chan_slice = slice(i_min, i_max + 1) selected = self._backgrounds[:, chan_slice] model_rates = np.zeros((centroids.size, selected.shape[1]), dtype=float) for idx in range(selected.shape[1]): y = np.asarray(selected[:, idx], dtype=float) if base_times.size < 2 or np.all(~np.isfinite(y)): const = float(np.nanmean(y)) if np.isfinite(np.nanmean(y)) else 0.0 model_rates[:, idx] = np.full_like(centroids, const, dtype=float) else: spline = CubicSpline(base_times, y, bc_type='clamped', extrapolate=True) model_rates[:, idx] = spline(centroids) # Uncertainty is always zero return model_rates, np.zeros_like(model_rates)
def _compute_statistics(self, bg_mask): num_times, num_channels = self._data.rates.shape counts = self._data.rates * self._data.exposure[:, None] expected = self._backgrounds * self._data.exposure[:, None] expected = np.maximum(expected, 1.0) counts_bg = counts[bg_mask] expected_bg = expected[bg_mask] resid = counts_bg - expected_bg self._chisq = np.sum((resid ** 2) / expected_bg, axis=0) dof_val = max(counts_bg.shape[0] - 1, 1) self._dof = np.full(num_channels, float(dof_val)) def _residual_fit(self, time_all, rate_all, exposure_all, frac=0.5, lowess_iter=1, sigma_thresh=3.0, core_snr_max=2.0, min_core_points=5, return_mask=False, use_pre_mask=False, max_iter=10, refit_after_clipping=True): """LOWESS with iterative sigma-clipping. Args: time_all (np.ndarray): Time values rate_all (np.ndarray): Count rates exposure_all (np.ndarray): Exposure values frac (float): LOWESS window fraction (default 0.5) lowess_iter (int): LOWESS iterations (default 1) sigma_thresh (float): Threshold for outlier removal (default 3.0) core_snr_max (float): Max SNR for core distribution (default 2.0) min_core_points (int): Minimum points for core fit (default 5) return_mask (bool): Return background mask (default False) use_pre_mask (bool): Use pre-computed mask (default False) max_iter (int): Max iterations (default 10) refit_after_clipping (bool): If True, runs a final LOWESS fit on the retained background bins after iterative clipping. If False, skips this final refit. Returns: If return_mask=False: (times, rates) If return_mask=True: (times, rates, mask) """ time_all = np.asarray(time_all, dtype=float) rate_all = np.asarray(rate_all, dtype=float) exposure_all = np.asarray(exposure_all, dtype=float) num_bins = time_all.size # Not enough bins: flat background if num_bins < 10: mean_rate = np.nanmean(rate_all) flat = np.full_like(rate_all, mean_rate, dtype=float) if return_mask: return (time_all.copy(), flat, np.ones_like(rate_all, dtype=bool)) else: return time_all.copy(), flat if use_pre_mask and self._pre_mask is not None: background_mask = np.asarray(self._pre_mask, dtype=bool) else: background_mask = np.ones(num_bins, dtype=bool) # This is the model returned when refit_after_clipping=False. initial_times = None initial_bg_rates = None for _ in range(max_iter): masked_times = time_all[background_mask] masked_rates = rate_all[background_mask] masked_exposure = exposure_all[background_mask] if masked_times.size < 10: break masked_times = np.asarray(masked_times).ravel() masked_rates = np.asarray(masked_rates).ravel() # LOWESS on rates lowess_result = lowess(masked_rates, masked_times, frac=frac, it=lowess_iter) bg_rates_local = lowess_result[:, 1] if initial_times is None: initial_times = masked_times.copy() initial_bg_rates = bg_rates_local.copy() # Residuals in counts space numerator = (masked_rates - bg_rates_local) * masked_exposure denominator = np.sqrt( np.maximum(bg_rates_local * masked_exposure, 1.0)) snr = numerator / denominator # Core distribution core = snr[snr < core_snr_max] if core.size >= min_core_points: mu, sigma = norm.fit(core) else: mu = float(snr.mean()) sigma_tmp = snr.std() sigma = float(sigma_tmp) if sigma_tmp > 0 else 1.0 # Positive-sided clipping threshold = sigma_thresh * sigma keep_local = (snr < threshold) updated_mask = np.zeros_like(background_mask, dtype=bool) updated_mask[background_mask] = keep_local if np.array_equal(updated_mask, background_mask): background_mask = updated_mask break background_mask = updated_mask bg_times = time_all[background_mask] if bg_times.size < 2: mean_rate = np.nanmean(rate_all) if return_mask: return (time_all.copy(), np.full_like(rate_all, mean_rate, dtype=float), np.ones_like(rate_all, dtype=bool)) else: return (time_all.copy(), np.full_like(rate_all, mean_rate, dtype=float)) bg_times = np.asarray(bg_times).ravel() if refit_after_clipping: bg_rates_input = rate_all[background_mask] bg_rates_input = np.asarray(bg_rates_input).ravel() # Final LOWESS on the retained/background subset. lowess_result_final = lowess(bg_rates_input, bg_times, frac=frac, it=lowess_iter) bg_rates = lowess_result_final[:, 1] else: # Keep the initial LOWESS model. # This avoids refitting on a biased-low retained subset. if initial_times is None or initial_bg_rates is None: mean_rate = np.nanmean(rate_all) bg_times = time_all.copy() bg_rates = np.full_like(rate_all, mean_rate, dtype=float) else: bg_times = np.asarray(initial_times).ravel() bg_rates = np.asarray(initial_bg_rates).ravel() if return_mask: return bg_times, bg_rates, background_mask else: return bg_times, bg_rates def _refit_with_residual_model(self, residuals, threshold=0.7, win_size=None, spline_bc_type='clamped', lowess_iter=1, first_pass_mask=None, refit_after_clipping=True): """Refine background mask using Gaussian+Exponential model. Args: residuals (np.array): Residuals from Pass 1 fit threshold (float): Signal probability threshold (default 0.7) win_size (float, optional): Absolute smoothing window in seconds spline_bc_type (str): Spline BC (default 'clamped') lowess_iter (int): LOWESS iterations (default 1) first_pass_mask (np.array): Initial background mask refit_after_clipping (bool): Propagated into the recursive fit. Returns: (np.array, np.array): Refined mask, full signal mask """ residuals = np.array(residuals) time_centroids = self._data.time_centroids # Identify finite residuals finite_mask = np.isfinite(residuals) # Combine with pass-1 mask if first_pass_mask is None: raise ValueError("Pass-1 mask required for refit") mask = finite_mask & first_pass_mask # Apply combined mask residuals = residuals[mask] time_centroids = time_centroids[mask] if residuals.size == 0: return None, None # Histogram of residuals num_bins = max(20, int(np.sqrt(len(residuals)))) bins = np.linspace(residuals.min(), residuals.max(), num_bins + 1) hist, bin_edges = np.histogram(residuals, bins=bins, density=True) bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:]) # Define Gaussian+Exponential model def gauss_exp(x, A, mu, sigma, B, lamb): gauss = A * norm.pdf(x, mu, sigma) exp_part = (B * lamb * np.exp(np.clip(-lamb * (x - mu), -700, 700)) * (x > mu)) return gauss + exp_part try: popt, _ = curve_fit(gauss_exp, bin_centers, hist, p0=[1.0, 0.0, 1.0, 0.1, 1.0]) except RuntimeError: msg = ("Note: Gaussian+Exponential refinement failed. " "Using Pass 2 background model.") print(msg) return None, None A, mu, sigma, B, lamb = popt # Compute signal probabilities gauss_pdf = A * norm.pdf(residuals, mu, sigma) exp_term = -lamb * (residuals - mu) exp_pdf = (B * lamb * np.exp(exp_term) * (residuals > mu)) total_pdf = gauss_pdf + exp_pdf p_signal = np.divide(exp_pdf, total_pdf, out=np.zeros_like(exp_pdf), where=total_pdf > 0) # Define new mask: exclude high-probability signal bins signal_mask = p_signal > threshold # Map refined decisions back to full-length mask full_mask = first_pass_mask.copy() full_mask[mask] = ~signal_mask # Refit using refined mask (Pass 1 and Pass 2 using new background mask) self._pre_mask = full_mask self._suppress_auto_refine = True try: self.fit(win_size=win_size, spline_bc_type=spline_bc_type, lowess_iter=lowess_iter, refit_after_clipping=refit_after_clipping) finally: # Clear flags so future fits are normal self._suppress_auto_refine = False self._pre_mask = None # Full-length signal mask signal_mask_full = np.zeros_like(self._data.time_centroids, dtype=bool) signal_mask_full[mask] = signal_mask return full_mask, signal_mask_full