# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
# Universities Space Research Association
# Science and Technology Institute
# https://sti.usra.edu
#
# Developed by: Daniel Kocevski
# National Aeronautics and Space Administration (NASA)
# Marshall Space Flight Center
# Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import sys
import warnings
from datetime import datetime
import numpy as np
from scipy.linalg import inv
from gdt.misc import derivative
from scipy.optimize import minimize, brentq
from scipy.stats import chi2
from gdt.core.background.primitives import BackgroundRates, BackgroundSpectrum
from gdt.core.data_primitives import EnergyBins
from gdt.core.pha import Bak
__all__ = ['SpectralFitter', 'SpectralFitterChisq', 'SpectralFitterCstat',
'SpectralFitterPgstat', 'SpectralFitterPstat', 'chisq', 'cstat',
'pgstat', 'pstat']
[docs]class SpectralFitter:
"""Base class for jointly-fitting spectra from multiple detectors.
This is a (large) wrapper around `scipy.optimize.minimize
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_.
This class should not be instantiated by the user, but instead a child class
for a specific fit statistic should be instantiated.
Child classes should be designed with the following requirements:
#. An ``__init__`` method that takes in the inputs in the Parent
``__init__`` method. Can have additional arguments/keywords.
#. A ``_eval_stat`` method that accepts the index into the data sets and
the source model rate for that data set. It must return a single number
that represents the likelihood for a single detector/dataset.
Parameters:
pha_list (list of :class:`~gdt.core.pha.Pha`):
The PHA objects containg the count spectrum for each detector
bkgd_list (list of :class:`~gdt.background.primitives.BackgroundRates`, \
list of :class:`~gdt.background.primitives.BackgroundSpectrum`, \
or list of :class:`~gdt.core.pha.Bak`):
The background rates object, background spectrum, or Bak object
for each detector. If given the background rates object, the times
in the corresponding PHA object will be used for the limits of
integration.
rsp_list (list of :class:`~gdt.core.response.Rsp`):
The response object for each detector
statistic (<function>): The function to be used as a fit statistic
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`~gdt.core.pha.Pha.valid_channels` attribute in the PHA
object. You can set channel_masks to override the PHA
:attr:`~gdt.core.pha.Pha.valid_channels`, but this is usually
unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize.
Note:
All solvers, with the exception of 'dogleg' and 'trust-exact',
are supported at this time.
rng (Generator, optional): The RNG object
"""
def __init__(self, pha_list, bkgd_list, rsp_list, statistic,
channel_masks=None, method='SLSQP', rng=None):
# check that we have the right numbers of data, backgrounds, responses,
# and fit masks
self._num_sets = len(pha_list)
if (len(bkgd_list) != self._num_sets) or (
len(rsp_list) != self._num_sets):
raise ValueError('Number of datasets, backgrounds, and responses must be the same')
# set the energy channel masks
if channel_masks is not None:
if len(channel_masks) != self._num_sets:
raise ValueError('If channel_masks is set, must be equal to the number of datasets')
# store response, exposure, and channel masks
self._rsp = rsp_list
self._exposure = np.array([pha.exposure for pha in pha_list])
if channel_masks is None:
channel_masks = []
for pha in pha_list:
chan_mask = np.zeros(pha.num_chans, dtype=bool)
chan_mask[pha.valid_channels] = True
channel_masks.append(chan_mask)
self._chan_masks = channel_masks
# extract observed counts and apply channel mask
self._data = self._apply_masks([pha.data.counts for pha in pha_list])
# extract background rates and variances and apply channel masks
self._back_rates = []
self._back_var = []
for bkgd, pha in zip(bkgd_list, pha_list):
if isinstance(bkgd, BackgroundRates):
bkgd_spec = bkgd.integrate_time(*pha.time_range)
elif isinstance(bkgd, BackgroundSpectrum):
bkgd_spec = bkgd
elif isinstance(bkgd, Bak):
bkgd_spec = bkgd.data
else:
raise ValueError('Unknown Background object')
self._back_rates.append(bkgd_spec.rates)
self._back_var.append(bkgd_spec.rate_uncertainty ** 2)
self._back_rates = self._apply_masks(self._back_rates)
self._back_var = self._apply_masks(self._back_var)
# fitter/function info
self._stat = statistic
self._method = method
self._function = None
self._res = None
# set the RNG used to sample fits
self._rng = rng or np.random.default_rng()
[docs] def set_rng(self, rng):
"""Set/change the generator.
Args:
rng (numpy.random.Generator): random number generator
"""
self._rng = rng
@property
def covariance(self):
"""(np.array): The covariance matrix of the fitted parameters.
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the covariance matrix
may not be valid."""
if self._res is not None:
cov = inv(-self.hessian)
return cov
@property
def detectors(self):
"""(list of str): The detectors used in the fit"""
return [rsp.detector for rsp in self._rsp]
@property
def dof(self):
"""(int): The number of degrees of freedom"""
free_params = np.sum(self._function.free)
datapts = np.sum([chan_mask.sum() for chan_mask in self._chan_masks])
return datapts - free_params
@property
def energy_range(self):
"""(float, float): The full energy range of the fit"""
emin = np.concatenate(
self._apply_masks([rsp.ebounds.low_edges() for rsp in self._rsp]))
emax = np.concatenate(
self._apply_masks([rsp.ebounds.high_edges() for rsp in self._rsp]))
return np.min(emin), np.max(emax)
@property
def function_components(self):
"""(list of str): The names of the individual model components, if
applicable"""
if self._function is not None:
try:
return self._function.names
except AttributeError:
# We will gracefully return none if the function doesn't define names
pass
@property
def function_name(self):
"""(str): The name of the model function being fit"""
if self._function is not None:
return self._function.name
@property
def hessian(self):
"""(np.array): The Hessian matrix of the fitted parameters, for which
the covariance matrix is -inverse(Hessian).
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the Hessian matrix may
not be valid."""
if self._res is not None:
return self._hessian(self.parameters, self._function)
@property
def jacobian(self):
"""(np.array): The Jacobian vector of the likelihood as a function of
the parameters."""
if self._res is not None:
return self._res.jac
@property
def message(self):
"""(str): The message from the fitter on convergence/non-convergence"""
if self._res is not None:
return self._res.message
@property
def num_components(self):
"""(int): Number of function components"""
if self._function is not None:
return self._function.num_components
@property
def num_sets(self):
"""(int): Number of datasets/detectors being fit"""
return self._num_sets
@property
def parameters(self):
"""(np.array): The parameter values at the maximum likelihood"""
if self._res is not None:
return self._res.x
@property
def statistic(self):
"""(float): The value of the fit statistic"""
if self._res is not None:
return self._res.fun
@property
def success(self):
"""(bool): True if the fit was successful, False otherwise."""
if self._res is not None:
return self._res.success
@property
def symmetric_errors(self):
"""(np.array): The 1-sigma uncertainty on the parameters assuming
higher-order terms of the Taylor expansion of the likelihood about the
maximum likelihood solution is negligible. If those terms are not
negligible, errors could be very incorrect or event NaN. See attributes
:attr:`covariance` and :attr:`hessian`."""
if self._res is not None:
id_array = np.identity(self.parameters.size, dtype=bool)
return np.sqrt(np.abs(self.covariance[id_array]))
[docs] def asymmetric_errors(self, cl=0.683):
"""Calculate the asymmetric uncertainty on each fitted parameter.
This function uses `scipy.optimize.brentq
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.brentq.html>`_
to find the root of:
crit = -2(LL_max-LL),
where *crit* is the critical value defined by the number of free
parameters and the confidence level, and *LL_max* is the maximum
log-likelihood. The brentq function uses a bracketing range to find the
roots, and this function uses the best-fit values of the parameters for
one side of the bracket and the min/max values defined in the fitted
function for the other side of the bracket.
Args:
cl (float, optional): The confidence level of the uncertainties.
Default is 0.683.
Returns:
(np.array): A (``nparams``, 2) array where each row reprensents the
(negative, positive) uncertainty on each parameter.
"""
# must have a fit
if self._res is None:
raise RuntimeError('Fit has not been performed')
if cl < 0.0 or cl > 1.0:
raise ValueError('confidence level must be between 0-1')
# calculate the critical value
nparams = self.parameters.size
crit = chi2.ppf(cl, nparams)
# do this only for the free parameters
idx = np.arange(self._function.nparams)[np.array(self._function.free)]
# this is the objective function for which we are finding the root
def the_func(param_v, index):
params = self.parameters.copy()
params[index] = param_v
test_like = self._fold_model(self._function.fit_eval, params)
delta = 2.0 * (-self._res.fun - test_like)
return delta - crit
# Find the lower and upper bound of the uncertainty. Here, we use
# Brent's Method to determine the bounds, which requires a bracketing
# range of the solution. One side of the bracket is the maximum
# likelihood value. If the function parameter has a finite support,
# the corresponding end of the support is used for the other side of the
# bracket, otherwise, the other side of the bracket is iteratively
# decreased (increased) until the lower (upper) bound is constrained
# by Brent's Method. This has to be done because we don't know a priori
# how far away from maximum likelihood the bound is.
# lower bound
root1 = np.zeros(nparams)
for i in range(nparams):
param = self.parameters[i]
while True:
if self._function.min_values[idx[i]] == -np.inf:
if param < 0.0:
minval = 2.0 * param
elif param > 0.1:
minval = 0.5 * param
else:
minval = param - 1.0
else:
minval = self._function.min_values[idx[i]]
# if we're at the bounds, we can't bracket the peak
try:
r, o = brentq(the_func, self.parameters[i], minval, args=(i,),
full_output=True, maxiter=1000)
break
except ValueError:
if minval == self._function.min_values[idx[i]]:
warnings.warn("Parameter exists at its lower bound")
r = minval
break
param = minval
root1[i] = r
# upper bound
root2 = np.zeros(nparams)
for i in range(nparams):
param = self.parameters[i]
while True:
if self._function.max_values[idx[i]] == np.inf:
if param < -0.1:
maxval = 0.5 * param
elif param > 0.0:
maxval = 2.0 * param
else:
maxval = 1.0 + param
else:
maxval = self._function.max_values[idx[i]]
try:
r, o = brentq(the_func, self.parameters[i], maxval, args=(i,),
full_output=True, maxiter=1000)
break
except ValueError:
# if we're at the bounds, we can't bracket the peak
if maxval == self._function.max_values[idx[i]]:
warnings.warn("Parameter exists at its upper bound")
r = maxval
break
param = maxval
root2[i] = r
# convert the bounds to -/+ uncertainties
errs = np.zeros((nparams, 2))
errs[:, 0] = np.abs(self.parameters - root1)
errs[:, 1] = np.abs(self.parameters - root2)
return errs
[docs] def data_count_spectrum(self, upper_limits_sigma=2.0):
"""The observed source count spectrum, which is (total - background).
Data bins with source counts less than the model variance are converted
to upper limits.
Args:
upper_limits_sigma (float, optional): The upper limits sigma.
Default is 2.
Returns:
(tuple): A 5-tuple containing, for each dataset:
- *list of np.array*: Energy centroids
- *list of np.array*: Energy channel half widths (in log-space)
- *list of np.array*: differential count spectrum
- *list of np.array*: 1-sigma count spectrum errors
- *list of np.array*: Upper limits boolean mask
The upper limits mask is a boolean mask where True indicates the
element is an upper limit.
"""
if upper_limits_sigma <= 0.0:
raise ValueError('upper_limits_sigma must be > 0.0')
chanwidths = self._apply_masks(
[rsp.drm.channel_widths for rsp in self._rsp])
mvar = self.model_variance()
# calculate the differential count spectrum and upper limit masks
src_spectra = []
ulmasks = []
for i in range(self.num_sets):
src_counts = (self._data[i] - self._back_rates[i] * self._exposure[i]) / (self._exposure[i] * chanwidths[i])
ulmask = src_counts < upper_limits_sigma * np.sqrt(mvar[i])
src_counts[ulmask] = upper_limits_sigma * np.sqrt(mvar[i])[ulmask]
src_spectra.append(src_counts)
ulmasks.append(ulmask)
ecents = self._apply_masks([rsp.drm.channel_centroids for rsp in self._rsp])
elo = self._apply_masks([np.array(rsp.drm.ebounds.low_edges()) for rsp in self._rsp])
ehi = self._apply_masks([np.array(rsp.drm.ebounds.high_edges()) for rsp in self._rsp])
ewidths = [np.array([ecents[i] - elo[i], ehi[i] - ecents[i]]) for i in range(len(ecents))]
src_errs = [np.sqrt(var) for var in mvar]
return ecents, ewidths, src_spectra, src_errs, ulmasks
[docs] def fit(self, function, **kwargs):
"""Fit the given the function to the data.
Args:
function (:class:`~gdt.spectra.functions.Function`):
The function object to use
**kwargs: Keyword arguments passed to the fitter
"""
self._function = function
# treat the fixed parameters
init_params = np.array(function.default_values)[function.free].tolist()
# do the fit, also returns the jacobian
if self._method == 'dogleg' or self._method == 'trust-exact':
raise RuntimeError('dogleg and trust-exact solvers not yet supported')
if self._method == 'trust-ncg' or self._method == 'trust-krylov':
hess = '3-point'
else:
hess = None
with warnings.catch_warnings():
warnings.simplefilter("ignore")
res = minimize(self._eval_stat_jac, init_params, args=(function,),
method=self._method, bounds=function.parameter_bounds(),
jac=True, hess=hess, **kwargs)
self._res = res
[docs] def model_count_spectrum(self):
"""The fitted model count spectrum.
Returns:
(list of :class:`~gdt.core.data_primitives.EnergyBins`)
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
ebins = []
for i in range(self.num_sets):
rates = self._rsp[i].drm.fold_spectrum(self._function.fit_eval,
self.parameters)
counts = rates * self._exposure[i]
emin = np.array(self._rsp[i].ebounds.low_edges())
emax = np.array(self._rsp[i].ebounds.high_edges())
ebin = EnergyBins(counts[self._chan_masks[i]],
emin[self._chan_masks[i]],
emax[self._chan_masks[i]], self._exposure[i])
ebins.append(ebin)
return ebins
[docs] def model_variance(self):
"""The differential source model variance, accounting for the data
variance and background model variance.
Returns:
(list of np.array)
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
chanwidths = self._apply_masks(
[rsp.drm.channel_widths for rsp in self._rsp])
# model variance: need positive data counts, background rate and
# variance and exposure
mvar = []
for i in range(self.num_sets):
rates = self._rsp[i].drm.fold_spectrum(self._function.fit_eval, self.parameters)
model_rate = rates[self._chan_masks[i]]
model_rate[model_rate < 0.0] = 0.0
mvar.append(self._back_var[i] / (chanwidths[i]) ** 2
+ (model_rate / chanwidths[i] + self._back_rates[i] / chanwidths[i])
/ (np.abs(self._exposure[i]) * chanwidths[i]))
return mvar
[docs] def residuals(self, sigma=True):
"""The fit residuals for each dataset in units of differential
counts or model sigma
Args:
sigma (bool, optional): If True, return in units of model sigma,
otherwise return in units of differential counts.
Default is True.
Returns:
(tuple): A 4-tuple containing:
- *list of np.array*: Energy centroids
- *list of np.array*: Energy channel half-widths (in log-space)
- *list of np.array*: Fit residuals
- *list of np.array*: Residual uncertainties
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
chanwidths = self._apply_masks(
[rsp.drm.channel_widths for rsp in self._rsp])
# calculate the count spectrum model
model = self.model_count_spectrum()
# residuals are the differential model counts subtracted from the
# differential source counts above background
resid = []
for i in range(self.num_sets):
back_rates = self._back_rates[i] / chanwidths[i]
rates = self._data[i] / (self._exposure[i] * chanwidths[i])
resid.append((rates - back_rates) - model[i].rates_per_kev)
# can calculate the residuals as a function of the model uncertainty
model_var = self.model_variance()
if sigma:
resid = [resid[i] / np.sqrt(model_var[i]) for i in
range(self.num_sets)]
resid_err = [np.ones_like(r) for r in resid]
else:
resid_err = [np.sqrt(var) for var in model_var]
# the energy centroids and widths
ecents = self._apply_masks([rsp.drm.channel_centroids for rsp in self._rsp])
elo = self._apply_masks([np.array(rsp.drm.ebounds.low_edges()) for rsp in self._rsp])
ehi = self._apply_masks([np.array(rsp.drm.ebounds.high_edges()) for rsp in self._rsp])
ewidths = [np.array([ecents[i] - elo[i], ehi[i] - ecents[i]]) for i in range(len(ecents))]
return ecents, ewidths, resid, resid_err
[docs] def sample_flux(self, erange, num_samples=1000, num_points=1000, **kwargs):
"""Produce samples of the energy-integrated photon or energy flux.
This uses the covariance matrix and assumes a multivariate normal
distribuion for the parameters. Caveat Emptor.
Args:
erange (float, float): The energy range over which to calculate the
flux
num_samples (int): The number of spectra to sample. Default is 1000.
num_points (int): The number of grid points in energy. Default is
1000.
**kwargs: Options to be passed to
:meth:`~gbm.spectra.functions.Function.integrate`.
Returns:
(np.array)
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
params = self.sample_parameters(size=num_samples)
flux = [
self._function.integrate(param_vec, erange, num_points=num_points,
**kwargs) for param_vec in params]
return flux
[docs] def sample_parameters(self, **kwargs):
"""Produce samples of the fitted parameters. This uses the covariance
matrix and assumes a multivariate normal distribuion. Caveat Emptor.
Args:
**kwargs: Options (like size) to be passed to `numpy.random.multivariate_normal \
<https://numpy.org/doc/stable/reference/random/generated/numpy.random.multivariate_normal.html>`_
Returns:
(np.array)
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
samples = self._rng.multivariate_normal(
self.parameters, self.covariance, **kwargs)
return samples
[docs] def sample_spectrum(self, which, num_samples=1000, num_points=1000,
components=False):
r"""Produce samples of the model photon, energy, or :math:`\nu F_\nu`
spectrum. This uses the covariance matrix and assumes a multivariate
normal distribuion for the parameters. Caveat Emptor.
Args:
which (str): Which spectrum to return. Either 'photon', 'energy',
or 'nufnu'
num_samples (int, optional): The number of spectrum to sample.
Default is 1000.
num_points (int, optional): The number of grid points in energy.
Default is 1000.
components (bool, optional):
If True, calculate the spectrum for each individual model
components (if there are multiple components). Default is False.
Returns:
(tuple): A 2-tuple containing:
- *np.array*: The energy grid at which the spectrum is calculated
- *np.array*: Array of shape (``num_samples``, ``num_points``) if
there is only one component or
(``num_samples``, ``num_components``, ``num_points``) if there
are multiple components.
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
params = self.sample_parameters(size=num_samples)
grid = np.logspace(*np.log10(self.energy_range), num_points)
if self.num_components > 1:
model = np.array([self._function.fit_eval(param, grid, components=components) for param in params])
else:
model = np.array([self._function.fit_eval(param, grid) for param in params])
if which == 'energy':
model *= grid[np.newaxis, :]
elif which == 'nufnu':
model *= grid[np.newaxis, :] ** 2
else:
pass
return grid, model
[docs] def save(self, filename):
"""Saves the fitter object to a compressed binary numpy file. The full
state of the fitter is serialized and saved, and it can be restored
using the :meth:`load` method.
Args:
filename (str): The complete filename to save the fitter object to
"""
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
np.savez_compressed(filename, time=now, obj=self)
[docs] def spectrum(self, which, num_points=1000, components=False):
r"""The model photon, energy, or :math:`\nu F_\nu` spectrum.
Args:
which (str): Which spectrum to return. Either 'photon', 'energy',
or 'nufnu'
num_points (int, optional): The number of grid points in energy.
Default is 1000.
components (bool, optional): If True, calculate the spectrum for
each individual model components (if
there are multiple components).
Default is False.
Returns:
(tuple): A 2-tuple containing:
- *np.array*: The energy grid at which the spectrum is calculated
- *np.array* or *list of np.array*: model spectrum values, or
spectrum values for each component
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
grid = np.logspace(*np.log10(self.energy_range), num_points)
if self.num_components > 1:
pmodel = self._function.fit_eval(self.parameters, grid,
components=components)
else:
pmodel = self._function.fit_eval(self.parameters, grid)
# multiply by energy or energy^2 if returning energy or nufnu spectrum
if which == 'energy':
pmodel *= grid
elif which == 'nufnu':
pmodel *= grid ** 2
else:
pass
return grid, pmodel
[docs] @classmethod
def load(cls, filename):
"""Loads a fitter object that has been saved to disk in a compressed
numpy file. Thanks to the cool properties of the numpy format, this
method will load the complete state of the fitter when it was saved.
Args:
filename (str): The complete filename of the saved file
"""
loaded = np.load(filename, allow_pickle=True)
obj = loaded['obj'].reshape(-1)[0]
print('Fit loaded from {}'.format(loaded['time']))
return obj
def _apply_masks(self, a_list):
"""Apply the fit masks to a list of arrays.
Args:
a_list (list of np.array): The list of arrays
Returns:
(list of np.array)
"""
return [np.asarray(one_list)[one_mask] for one_list, one_mask in zip(a_list, self._chan_masks)]
def _eval_stat(self, set_num, src_model):
"""Evaluate the statistic for a single set. This must be defined by the
derived class
Args:
set_num (int): The index number of the set
src_model (np.array): The source model rates for the set
Returns:
(float)
"""
raise NotImplementedError
def _eval_stat_jac(self, params, function):
"""Evaluate the statistic (and Jacobian of the statistic as a function
of the model parameters).
Args:
params (list): The parameters values
function (class:`~.functions.Function`) The function object to use
Returns:
(float, np.array)
"""
stat = self._fold_model(function.fit_eval, params)
jac = self._jacobian(params, function)
return -stat, -jac
def _fold_model(self, function, params):
"""Folds the model throught the spectrum and calculates the fit
statistic
Note:
This is an empty function in the base class, and the inherited class
must define this.
Args:
function (:class:`~.functions.Function`): The function object to use
params (list): The parameters values
Returns:
(float)
"""
stat = np.zeros(self.num_sets)
for i in range(self.num_sets):
# fold model through response and convert to raw model counts
model = self._rsp[i].drm.fold_spectrum(function, params, channel_mask=self._chan_masks[i])
# perform likelihood calculation for one dataset
stat[i] = self._eval_stat(i, model)
return stat.sum()
def _hessian(self, params, function):
"""Calculate the Hessian of the fit statistic as a function of the
model parameters. This is evalated numerically using finite differences.
Args:
params (list): The parameters values
function (class:`~.functions.Function`) The function object to use
Returns:
(np.array)
"""
num_params = len(params)
hess = np.zeros((num_params, num_params))
for i in range(num_params):
for j in range(num_params):
# Hessian is symmetric, so we only have to calculate either the
# upper or lower triangle
if i > j:
hess[i, j] = hess[j, i]
continue
params_temp = params.copy()
if params[i] != 0:
dx1 = np.abs((1.0 + function.delta_rel[i]) * params[i] - params[i])
else:
dx1 = np.abs((1.0 + function.delta_abs[i]) * params[i] - params[i])
if params[j] != 0:
dx2 = np.abs((1.0 + function.delta_rel[j]) * params[j] - params[j])
else:
dx2 = np.abs((1.0 + function.delta_abs[j]) * params[j] - params[j])
hess[i, j] = self._mixed_pderiv(function.fit_eval, params_temp,
i, j, dx1, dx2)
return hess
def _jacobian(self, params, function):
"""Calculate the Jacobian of the fit statistic as a function of the
model parameters. This is evalated numerically using finite differences.
Args:
params (list): The parameters values
function (class:`~.functions.Function`) The function object to use
Returns:
(np.array)
"""
num_params = len(params)
grad = np.zeros(num_params)
# for each parameter, do partial derivative using finite differences
for i in range(num_params):
params_temp = params.copy()
dx = np.abs((1.0 + function.delta_rel[i]) * params[i] - params[i])
grad[i] = self._pderiv(function.fit_eval, params_temp, index=i,
dx=dx)
return grad
def _mixed_pderiv(self, function, params, index1, index2, dx1, dx2):
"""Mixed second partial derivative of function with respect to its
parameters. This calls _pderiv.
Args:
function (class:`~.functions.Function`) The function object to use
params (list): The parameters values
index1 (int): The index into the parameter list, representing the
parameter for which the first partial derivative will be
calculated.
index2 (int): The index into the parameter list, representing the
parameter for which the second partial derivative will be
calculated.
dx1 (float): The step size for the first parameter
dx2 (flot): The step size for the second parameter
Returns:
(float)
"""
def wraps(x):
params[index2] = x
the_args = [function, params]
return self._pderiv(*the_args, index=index1, dx=dx1)
return derivative(wraps, params[index2], dx=dx2)
def _pderiv(self, function, params, index=0, **kwargs):
"""Partial derivative of function with respect to one of its parameters
This is a wrapper around scipy.misc.derivative to allow a partial
derivative to be calculated
Args:
function (class:`~.functions.Function`) The function object to use
params (list): The parameters values
index (int, optional):
The index into the parameter list, representing the parameter
for which the partial derivative will be calculated. Default is 0
**kwargs: Options to pass to scipy.misc.derivative
(such as the ``dx`` argument)
Returns:
(float)
"""
def wraps(x):
params[index] = x
the_args = [function, params]
return self._fold_model(*the_args)
return derivative(wraps, params[index], **kwargs)
[docs]class SpectralFitterChisq(SpectralFitter):
"""Jointly-fit datasets using Chi-Squared Likelihood.
Parameters:
pha_list (list of :class:`~gdt.core.pha.Pha`):
The PHA objects containg the count spectrum for each detector
bkgd_list (list of :class:`~gdt.background.primitives.BackgroundRates`, \
list of :class:`~gdt.background.primitives.BackgroundSpectrum`, \
or list of :class:`~gdt.core.pha.Bak`):
The background rates object, background spectrum, or Bak object
for each detector. If given the background rates object, the times
in the corresponding PHA object will be used for the limits of
integration.
rsp_list (list of :class:`~gdt.core.response.Rsp`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`~gdt.core.pha.Pha.valid_channels` attribute in the PHA
object. You can set channel_masks to override the PHA
:attr:`~gdt.core.pha.Pha.valid_channels`, but this is usually
unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize.
Note:
All solvers, with the exception of 'dogleg' and 'trust-exact',
are supported at this time.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, chisq, **kwargs)
def _eval_stat(self, set_num, src_model):
# perform chisq calculation for one dataset
return self._stat(self._data[set_num], self._back_rates[set_num],
self._back_var[set_num], src_model,
self._exposure[set_num])
[docs]class SpectralFitterPgstat(SpectralFitter):
"""Jointly-fit datasets using Profile Gaussian likelihood (PG-Stat).
Parameters:
pha_list (list of :class:`~gdt.core.pha.Pha`):
The PHA objects containg the count spectrum for each detector
bkgd_list (list of :class:`~gdt.background.primitives.BackgroundRates`, \
list of :class:`~gdt.background.primitives.BackgroundSpectrum`, \
or list of :class:`~gdt.core.pha.Bak`):
The background rates object, background spectrum, or Bak object
for each detector. If given the background rates object, the times
in the corresponding PHA object will be used for the limits of
integration.
rsp_list (list of :class:`~gdt.core.response.Rsp`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`~gdt.core.pha.Pha.valid_channels` attribute in the PHA
object. You can set channel_masks to override the PHA
:attr:`~gdt.core.pha.Pha.valid_channels`, but this is usually
unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize.
Note:
All solvers, with the exception of 'dogleg' and 'trust-exact',
are supported at this time.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, back_exp=None, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, pgstat, **kwargs)
# background exposure (may == source exposure)
if back_exp is None:
back_exp = self._exposure
self._back_exp = back_exp
def _eval_stat(self, set_num, src_model):
# perform pgstat calculation for one dataset
return self._stat(self._data[set_num], src_model, self._exposure[set_num],
self._back_rates[set_num] * self._back_exp[set_num],
self._back_var[set_num] * self._back_exp[set_num]**2,
self._back_exp[set_num])
[docs]class SpectralFitterCstat(SpectralFitter):
"""Jointly-fit datasets using C-Stat (Poisson source with Poisson background)
Parameters:
pha_list (list of :class:`~gdt.core.pha.Pha`):
The PHA objects containg the count spectrum for each detector
bkgd_list (list of :class:`~gdt.background.primitives.BackgroundRates`, \
list of :class:`~gdt.background.primitives.BackgroundSpectrum`, \
or list of :class:`~gdt.core.pha.Bak`):
The background rates object, background spectrum, or Bak object
for each detector. If given the background rates object, the times
in the corresponding PHA object will be used for the limits of
integration.
rsp_list (list of :class:`~gdt.core.response.Rsp`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`~gdt.core.pha.Pha.valid_channels` attribute in the PHA
object. You can set channel_masks to override the PHA
:attr:`~gdt.core.pha.Pha.valid_channels`, but this is usually
unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize.
Note:
All solvers, with the exception of 'dogleg' and 'trust-exact',
are supported at this time.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, back_exp=None, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, cstat, **kwargs)
# background exposure (may == source exposure)
if back_exp is None:
back_exp = self._exposure
self._back_exp = back_exp
def _eval_stat(self, set_num, src_model):
# perform cstat calculation for one dataset
return self._stat(self._data[set_num], src_model, self._exposure[set_num],
self._back_rates[set_num] * self._back_exp[set_num],
self._back_exp[set_num])
[docs]class SpectralFitterPstat(SpectralFitter):
"""Jointly-fit datasets using P-Stat (Poisson source with known background).
Parameters:
pha_list (list of :class:`~gdt.core.pha.Pha`):
The PHA objects containg the count spectrum for each detector
bkgd_list (list of :class:`~gdt.background.primitives.BackgroundRates`, \
list of :class:`~gdt.background.primitives.BackgroundSpectrum`, \
or list of :class:`~gdt.core.pha.Bak`):
The background rates object, background spectrum, or Bak object
for each detector. If given the background rates object, the times
in the corresponding PHA object will be used for the limits of
integration.
rsp_list (list of :class:`~gdt.core.response.Rsp`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`~gdt.core.pha.Pha.valid_channels` attribute in the PHA
object. You can set channel_masks to override the PHA
:attr:`~gdt.core.pha.Pha.valid_channels`, but this is usually
unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize.
Note:
All solvers, with the exception of 'dogleg' and 'trust-exact',
are supported at this time.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, pstat, **kwargs)
def _eval_stat(self, set_num, src_model):
# perform pstat calculation for one dataset
return self._stat(self._data[set_num], src_model,
self._exposure[set_num], self._back_rates[set_num])
# --------------------------------------------------------------------------
# FIT STATISTICS
[docs]def chisq(obs_counts, back_rates, back_var, mod_rates, exposure):
"""Chi-Square Likelihood
Args:
obs_counts (np.array): The total observed counts
back_rates (np.array): The background model count rates
back_var (np.array): The background model count rate variance
mod_rates (np.array): The model source rates
exposure (float): The source exposure
Returns:
(float)
"""
mask = (obs_counts - back_rates * exposure) > 0
like = (((obs_counts[mask] - back_rates[mask] * exposure) - mod_rates[mask] * exposure) ** 2
/ (obs_counts[mask] + back_var[mask] * exposure ** 2))
return -like.sum()
[docs]def cstat(obs_counts, mod_rates, exposure, back_counts, back_exp):
"""C-Statistic Likelihood for Poisson source + Poisson background.
The "W" statistic from the `XSPEC Statistics Appendix
<https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_.
Args:
obs_counts (np.array): The total observed counts
mod_rates (np.array): The model source rates
exposure (float): The source exposure
back_counts (np.array): The background model counts
back_exp (float): The background exposure
Returns:
(float)
"""
sum_exp = exposure + back_exp
# special cases for when the observed counts = 0 or when the background
# model counts = 0
mask1 = (obs_counts == 0)
mask2 = (back_counts == 0) & (mod_rates < obs_counts / sum_exp)
mask3 = (back_counts == 0) & (mod_rates >= obs_counts / sum_exp) & (mod_rates > 0)
w = np.zeros_like(obs_counts)
w[mask1] = exposure * mod_rates[mask1] - back_counts[mask1] * np.log(back_exp / sum_exp)
w[mask2] = -back_exp * mod_rates[mask2] - obs_counts[mask2] * np.log(exposure / sum_exp)
w[mask3] = (exposure * mod_rates[mask3] + obs_counts[mask3]
* (np.log(obs_counts[mask3]) - np.log(exposure * mod_rates[mask3]) - 1.0))
# for all other cases:
mask = ~(mask1 | mask2 | mask3)
mod_rates_nz = mod_rates[mask]
obs_counts_nz = obs_counts[mask]
back_counts_nz = back_counts[mask]
d = np.sqrt(
(sum_exp * mod_rates_nz - obs_counts_nz - back_counts_nz) ** 2 + 4.0 * sum_exp * back_counts_nz * mod_rates_nz)
f = (obs_counts_nz + back_counts_nz - sum_exp * mod_rates_nz + d) / (2.0 * sum_exp)
w[mask] = (exposure * mod_rates_nz + sum_exp * f - obs_counts_nz * np.log(exposure * mod_rates_nz + exposure * f)
- back_counts_nz * np.log(back_exp * f) - obs_counts_nz * (1.0 - np.log(obs_counts_nz))
- back_counts_nz * (1.0 - np.log(back_counts_nz)))
return -w.sum()
[docs]def pstat(obs_counts, mod_rates, exposure, back_rates):
"""Likelihood for Poisson source + known background.
The "pstat" statistic from the `XSPEC Statistics Appendix
<https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_.
Args:
obs_counts (np.array): The total observed counts
mod_rates (np.array): The model source rates
exposure (float): The source exposure
back_rates (np.array): The background model count rates
Returns:
(float)
"""
mask = (obs_counts > 0) & ((mod_rates + back_rates) > 0.0)
pstat = np.zeros_like(obs_counts, dtype=float)
pstat[mask] = (exposure * (mod_rates[mask] + back_rates[mask]) \
- obs_counts[mask] * np.log(exposure * (mod_rates[mask] + back_rates[mask])) \
- obs_counts[mask] * (1.0 - np.log(obs_counts[mask])))
# zero-count bins
mask = (obs_counts == 0) & ((mod_rates + back_rates) > 0.0)
pstat[mask] = exposure * (mod_rates[mask] + back_rates[mask])
return -pstat.sum()
[docs]def pgstat(obs_counts, mod_rates, exposure, back_counts, back_var, back_exp):
"""Profile Gaussian Likelihood. From the `XSPEC Statistics Appendix
<https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_.
Args:
obs_counts (np.array): The total observed counts
mod_rates (np.array): The model source rates
exposure (float): The source exposure
back_counts (np.array): The background model counts
back_var (np.array): The background model count variance
back_exp (float): The background exposure
Returns:
(float)
"""
mask = (obs_counts > 0)
pg = np.zeros_like(obs_counts, dtype=float)
# special case for zero observed counts
pg[~mask] = (exposure * mod_rates[~mask] + back_counts[~mask] * (exposure / back_exp)
- back_var[~mask] * (exposure / back_exp)**2 / 2.0)
# for all other cases:
obs_counts_nz = obs_counts[mask]
mod_rates_nz = mod_rates[mask]
back_counts_nz = back_counts[mask]
back_var_nz = back_var[mask]
# pull out some common calculations to simply and speed up
q = (exposure * back_var_nz) - (back_exp * back_counts_nz) \
+ (back_exp**2 * mod_rates_nz)
r = (exposure * back_var_nz * mod_rates_nz) - (obs_counts_nz * back_var_nz) \
- (back_exp * back_counts_nz * mod_rates_nz)
# root, and how to decide if positive/negative is used
d = np.sqrt(q**2 - 4.0 * back_exp**2 * r)
d[q < 0.0] *= -1.0
f = ( (-q + d) / (2.0 * back_exp**2))
f_alt = (2.0 * r) / (-q + d)
# if f < 0, then use f_alt
fmask = f < 0.0
f[fmask] = f_alt[fmask]
pg[mask] = (exposure * (mod_rates_nz + f) \
- obs_counts_nz * np.log(exposure * mod_rates_nz + exposure * f) \
+ (back_counts_nz - back_exp * f)**2 / (2.0 * back_var_nz) \
- obs_counts_nz * (1.0 - np.log(obs_counts_nz)))
return -pg.sum()