Source code for gdt.core.data_primitives.responses

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract Nos.: CA 80MSFC17M0022 / 80NSSC24M0035
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
from scipy.interpolate import interp1d
from scipy.integrate import trapezoid
from .bins import Bins
from .intervals import Ebounds


__all__ = ['ResponseMatrix']


[docs]class ResponseMatrix(): """A class defining a 2D detector response matrix, with an (input) photon axis and (output) channel axis. Parameters: matrix (np.array): The 2D matrix, of shape (num_photon_bins, num_channels) emin (np.array): The low edges of the (input) photon bins emax (np.array): The high edges of the (input) photon bins chanlo (np.array): The low edges of the (output) energy channels chanhi (np.array): The high edges of the (output) energy channels """ def __init__(self, matrix, emin, emax, chanlo, chanhi): try: iter(matrix) self._matrix = np.asarray(matrix) except: raise TypeError('matrix must be an iterable') if self._matrix.ndim != 2: raise TypeError('matrix must be a 2-dimensional array') try: iter(emin) self._emin = np.asarray(emin).flatten() except: raise TypeError('emin must be an iterable') try: iter(emax) self._emax = np.asarray(emax).flatten() except: raise TypeError('emax must be an iterable') try: iter(chanlo) self._chanlo = np.asarray(chanlo).flatten() except: raise TypeError('chanlo must be an iterable') try: iter(chanhi) self._chanhi = np.asarray(chanhi).flatten() except: raise TypeError('chanhi must be an iterable') if (self._emin.size != self._emax.size): raise ValueError('emin and emax must be the same length') if (self._chanlo.size != self._chanhi.size): raise ValueError('chanlo and chanhi must be the same length') if (self._matrix.shape[0] != self._emin.size) or \ (self._matrix.shape[1] != self._chanlo.size): raise ValueError('matrix axis 0 must have same length as emin and '\ 'emax. matrix axis 1 must have same length as '\ 'chanlo and chanhi.') @property def channel_centroids(self): """(np.array): The geometric mean of the energy channels""" return np.sqrt(self._chanlo * self._chanhi) @property def channel_widths(self): """(np.array): The energy channel widths""" return self._chanhi - self._chanlo @property def ebounds(self): """(:class:`Ebounds`): The energy bounds""" return Ebounds.from_bounds(self._chanlo, self._chanhi) @property def matrix(self): """(np.array): The raw matrix""" return self._matrix @property def num_chans(self): """(int): The number of energy channels""" return self._matrix.shape[1] @property def num_ebins(self): """(int): The number of photon bins""" return self._matrix.shape[0] @property def photon_bins(self): """(:class:`Ebounds`): The photon bins""" return Ebounds.from_bounds(self._emin, self._emax) @property def photon_bin_centroids(self): """(np.array): The geometric mean of the photon bins""" return np.sqrt(self._emin * self._emax) @property def photon_bin_widths(self): """(np.array): The photon bin widths""" return self._emax - self._emin
[docs] def channel_effective_area(self): """Returns the effective area as a function of recorded channel energy (integrated over incident photon bins). Returns: (:class:`Bins`) """ bins = Bins(self._matrix.sum(axis=0), self._chanlo, self._chanhi) return bins
[docs] def effective_area(self, energy, interp_kind='linear'): """Calculate the effective area at a given energy or an array of energies. This function interpolates the DRM (in log10(cm^2)) to provide the effective area for any energy within the photon energy bounds of the DRM. Args: energy (float or np.array): The photon energy or energies at which to calculate the effective area. interp_kind (str, optional): The kind of interpolation to be passed to scipy.interp1d. Default is 'linear'. Returns: (np.array) """ try: energy = np.asarray(energy, dtype=float) except: raise TypeError('energy must be a positive float') if np.any(energy < self.photon_bins.range[0]) or \ np.any(energy > self.photon_bins.range[1]): raise ValueError('energy must be within the photon energy bounds ' \ 'of the DRM') effarea = self.photon_effective_area() diff_effarea = effarea.counts # create the DRM interpolator x = np.append(self._emin, self._emax[-1]) y = np.append(diff_effarea, diff_effarea[-1]) nz_mask = (y > 0) effarea_interpolator = interp1d(x[nz_mask], np.log10(y[nz_mask]), kind=interp_kind, fill_value='extrapolate') # do the interpolation effarea = 10.0**effarea_interpolator(energy) return effarea
[docs] def fold_spectrum(self, function, params, channel_mask=None): """Fold a photon spectrum through a DRM to get a count spectrum Args: function (<function>): A photon spectrum function. The function must accept a list of function parameters as its first argument and an array of photon energies as its second argument. The function must return the evaluation of the photon model in units of ph/s-cm^2-keV. params (list of float): A list of parameter values to be passed to the photon spectrum function channel_mask (np.array, optional): A boolean mask where True indicates the channel is to be used for folding and False indicates the channel is to not be used for folding. If omitted, all channels are used. Returns: (np.array) """ drm = self._matrix if channel_mask is not None: drm = drm[:, channel_mask] # evaluate photon model photon_model = function(params, self.photon_bin_centroids) # fold photon model through DRM counts = np.dot(drm.T, photon_model * self.photon_bin_widths) return counts
[docs] def photon_effective_area(self): """Returns the effective area as a function of incident photon energy (integrated over recorded energy channels). Returns: (:class:`Bins`) """ bins = Bins(self._matrix.sum(axis=1), self._emin, self._emax) return bins
[docs] def rebin(self, factor=None, edge_indices=None): """Rebins the channel energy axis of a DRM and returns a new response object. Rebinning can only be used to downgrade the channel resolution and is constrained to the channel edges of the current DRM. Rebinning can be performed by either defining an integral factor of the number of current energy channels to combine (e.g. factor=4 for 128 energy channels would result in 32 energy channels), or by defining an index array into the current channel edges that will define the new set of edges. Args: factor (int, optional): The rebinning factor. Must set either this or `edge_indices` edge_indices (np.array, optional): The index array that represents which energy edges should remain in the rebinned DRM. Returns: (:class:`ResponseMatrix`) """ if (factor is None) and (edge_indices is None): raise ValueError('Either factor or edge_indices must be set') elif factor is not None: try: factor = int(factor) except: raise TypeError('factor must be a positive integer') if factor < 1: raise ValueError('factor must be a positive integer') if (self.num_chans % factor) > 0: raise ValueError('factor must be factor of numchans: '\ '{}'.format(self.num_chans)) chanlo = self._chanlo.reshape(-1, factor)[:,0] chanhi = self._chanhi.reshape(-1, factor)[:,-1] elif edge_indices is not None: try: edge_indices = np.asarray(edge_indices) except: raise TypeError('new_edges must be a list, tuple or array') edge_indices = np.unique(edge_indices.astype(int)) if (edge_indices[0] < 0) or (edge_indices[-1] > self.num_chans): raise ValueError('edge_indices outside valid range of ' \ '0-{}'.format(self.num_chans)) old_edges = np.append(self._chanlo, self._chanhi[-1]) new_edges = old_edges[edge_indices] chanlo = new_edges[:-1] chanhi = new_edges[1:] # the new effective area matrix will just be summed over the channel # axis for the bins we are combining new_matrix = np.zeros((self.num_ebins, chanlo.size)) sidx = (self._chanlo[:,np.newaxis] == chanlo[np.newaxis,:]).nonzero()[0] eidx = (self._chanhi[:,np.newaxis] == chanhi[np.newaxis,:]).nonzero()[0] for i in range(chanlo.size): new_matrix[:,i] = self._matrix[:,sidx[i]:eidx[i]+1].sum(axis=1) obj = type(self)(new_matrix, self._emin, self._emax, chanlo, chanhi) return obj
[docs] def resample(self, num_photon_bins=None, photon_bin_edges=None, num_interp_points=20, interp_kind='linear'): """Resamples the incident photon axis of a DRM and returns a new ResponseMatrix object. Resampling can be used to downgrade the photon energy resolution, upgrade the resolution, and/or redefine the edges of the incident photon bins. By definition, the resampling can only be performed within the photon energy range of the current object. The process for resampling is to create a high-resolution grid for each new photon bin, interpolate the differential effective area onto that grid (interpolation is performed on log10(effective area)), and then integrate the differential effective area over the grid points in each bin. The final effective area is then scaled by the ratio of the initial number of photon bins to the requested number of photon bins. Args: num_photon_bins (int, optional): The number of photon bins in the new DRM. The bin edges will be generated logarithmically. Only set this or `photon_bin_edges`. photon_bin_edges (np.array, optional): The array of photon bin edges. Only set this or `num_photon_bins` num_interp_points (int, optional): The number of interpolation points used to integrate over for each new photon bin. Default is 20. interp_kind (str, optional): The kind of interpolation to be passed to scipy.interp1d. Default is 'linear'. Returns: (:class:`ResponseMatrix`) """ if (num_photon_bins is None) and (photon_bin_edges is None): raise ValueError('Either num_photon_bins or photon_bin_edges must' \ ' be set') elif num_photon_bins is not None: try: num_photon_bins = int(num_photon_bins) assert num_photon_bins > 0 except: raise TypeError('num_photon_bins must be a positive integer') photon_bin_edges = np.geomspace(self._emin[0], self._emax[-1], num_photon_bins+1) elif photon_bin_edges is not None: try: photon_bin_edges = np.asarray(photon_bin_edges) except: raise TypeError('photon_bin_edges must be an iterable') badmask = (photon_bin_edges < self._emin[0]) | \ (photon_bin_edges > self._emax[-1]) if badmask.sum() > 0: raise ValueError('photon_bin_edges are beyond valid range of '\ '{0:.2f}-{1:.2f}'.format(self._emin[0], self._emax[-1])) photon_bin_edges = np.sort(photon_bin_edges) num_photon_bins = photon_bin_edges.size-1 # differential effective area diff_effarea = self._matrix/self.photon_bin_widths[:,np.newaxis] # create an interpolation array over each new bin photon_bounds = np.vstack((photon_bin_edges[:-1], photon_bin_edges[1:])) interp_arr = np.geomspace(*photon_bounds, num_interp_points+2, axis=0) # create the DRM interpolator x = np.append(self._emin, self._emax[-1]) y = np.vstack((diff_effarea, diff_effarea[-1,:])) y[y == 0] = 1e-10 effarea_interpolator = interp1d(x, np.log10(y), axis=0, kind=interp_kind, fill_value='extrapolate') # interpolate the DRM over the photon interpolation array diff_effarea_interp = 10.**effarea_interpolator(interp_arr) badmask = np.isnan(diff_effarea_interp) diff_effarea_interp[badmask] = 0.0 # integrate the DRM over the photon interpolation array diff_effarea_new = trapezoid(diff_effarea_interp, interp_arr[:,:,np.newaxis], axis=0) # scale by ratio of photon bins drm = diff_effarea_new / (self.num_ebins/num_photon_bins) # create response object obj = type(self)(drm, photon_bin_edges[:-1], photon_bin_edges[1:], self._chanlo, self._chanhi) return obj
def __repr__(self): return '<ResponseMatrix: {0} energy bins; {1} ' \ 'channels>'.format(self.num_ebins, self.num_chans)