# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
# Universities Space Research Association
# Science and Technology Institute
# https://sti.usra.edu
#
# Developed by: Daniel Kocevski
# National Aeronautics and Space Administration (NASA)
# Marshall Space Flight Center
# Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
from gdt.core.data_primitives import TimeEnergyBins,TimeChannelBins,EnergyBins,ChannelBins, Gti, Ebounds
__all__ = ['BackgroundRates', 'BackgroundSpectrum','BackgroundChannelRates','BackgroundChannelSpectrum']
[docs]class BackgroundRates(TimeEnergyBins):
"""Class containing the background rate data.
Parameters:
rates (np.array): The array of background rates in each bin
rate_uncertainty (np.array): The array of background rate uncertainties
in each bin
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
emin (np.array): The low-value edges of the energy bins
emax (np.array): The high-value edges of the energy bins
exposure (np.array, optional): The exposure of each bin
"""
def __init__(self, rates, rate_uncertainty, tstart, tstop, emin, emax,
exposure=None):
try:
iter(rates)
rates = np.asarray(rates)
except:
raise TypeError('rates must be an iterable')
if rates.ndim != 2:
raise TypeError('rates must be a 2-dimensional array')
try:
iter(rate_uncertainty)
rate_uncertainty = np.asarray(rate_uncertainty)
except:
raise TypeError('rate_uncertainty must be an iterable')
if rate_uncertainty.ndim != 2:
raise TypeError('rate_uncertainty must be a 2-dimensional array')
if rate_uncertainty.shape != rates.shape:
raise TypeError('rate_uncertainty must be the same shape as rates')
if exposure is None:
exposure = np.zeros_like(tstart)
else:
exposure = np.asarray(exposure)
counts = np.squeeze(rates * exposure[:, np.newaxis])
if counts.ndim == 1:
counts = counts.reshape(len(tstart), len(emin))
super().__init__(counts, tstart, tstop, exposure, emin, emax)
self._count_uncertainty = np.squeeze(rate_uncertainty * exposure[:, np.newaxis])
self._rates = rates.squeeze()
self._rate_uncertainty = rate_uncertainty.squeeze()
@property
def count_uncertainty(self):
"""(np.array): The counts uncertainty in each bin"""
return self._count_uncertainty
@property
def rate_uncertainty(self):
"""(np.array): The rate uncertainty in each bin"""
return self._rate_uncertainty
@property
def rates(self):
"""(np.array): The rates in each Time-Energy Bin"""
return self._rates
[docs] def integrate_energy(self, emin=None, emax=None):
"""Integrate the over the energy axis.
Limits on the integration smaller than the full range can be set.
Args:
emin (float, optional): The low end of the integration range. If not
set, uses the lowest energy edge of the histogram
emax (float, optional): The high end of the integration range. If not
set, uses the highest energy edge of the histogram
Returns:
(:class:`BackgroundRates`)
"""
if emin is None:
emin = self.energy_range[0]
if emax is None:
emax = self.energy_range[1]
mask = self._slice_energy_mask(emin, emax)
emin = self.emin[mask][0]
emax = self.emax[mask][-1]
rates = np.nansum(self.rates[:, mask], axis=1).reshape(-1,1)
rate_uncert = np.sqrt(
np.nansum(self.rate_uncertainty[:, mask] ** 2, axis=1)).reshape(-1,1)
obj = BackgroundRates(rates, rate_uncert, self.tstart, self.tstop,
np.array([emin]), np.array([emax]),
exposure=self.exposure)
return obj
[docs] def integrate_time(self, tstart=None, tstop=None):
"""Integrate the background over the time axis (producing a count rate
spectrum). Limits on the integration smaller than the full range can
be set.
Args:
tstart (float, optional): The low end of the integration range.
If not set, uses the lowest time edge of the histogram
tstop (float, optional): The high end of the integration range.
If not set, uses the highest time edge of the histogram
Returns:
(:class:`BackgroundSpectrum`)
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
mask = self._slice_time_mask(tstart, tstop)
exposure = np.nansum(self.exposure[mask])
rates = np.nansum(self.counts[mask, :], axis=0) / exposure
rate_uncert = np.sqrt(np.nansum(self.count_uncertainty[mask, :] ** 2,
axis=0)) / exposure
exposure = np.full(rates.size, exposure)
obj = BackgroundSpectrum(rates, rate_uncert, self.emin, self.emax,
exposure)
return obj
[docs] def rebin_energy(self, method, *args, emin=None, emax=None):
"""Not implemented for BackgroundRates"""
raise NotImplementedError
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None):
"""Not implemented for BackgroundRates"""
raise NotImplementedError
[docs] def slice_energy(self, emin, emax):
"""Perform a slice over an energy range and return a new BackgroundRates
object. Note that emin and emax values that fall inside a bin will
result in that bin being included.
Args:
emin (float): The start of the slice
emax (float): The end of the slice
Returns:
(:class:`BackgroundRates`)
"""
mask = self._slice_energy_mask(emin, emax)
cls = type(self)
obj = cls(self.rates[:, mask], self.rate_uncertainty[:, mask],
self.tstart, self.tstop, self.emin[mask], self.emax[mask],
exposure=self.exposure)
return obj
[docs] def slice_time(self, tstart, tstop):
"""Perform a slice over a time range and return a new BackgroundRates
object. Note that tstart and tstop values that fall inside a bin will
result in that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`BackgroundRates`)
"""
mask = self._slice_time_mask(tstart, tstop)
cls = type(self)
obj = cls(self.rates[mask, :], self.rate_uncertainty[mask,:],
self.tstart[mask], self.tstop[mask], self.emin, self.emax,
exposure=self.exposure[mask])
return obj
[docs] def to_bak(self, time_range=None, **kwargs):
"""Integrate over the time axis and produce a BAK object
Args:
time_range ((float, float), optional):
The time range to integrate over
**kwargs: Options to pass to Bak.from_data()
Returns:
(:class:`~gdt.core.pha.Bak`)
"""
from gdt.core.pha import Bak
if time_range is None:
time_range = self.time_range
back_spec = self.integrate_time(*time_range)
gti = Gti.from_list([time_range])
bak = Bak.from_data(back_spec, gti=gti, **kwargs)
return bak
[docs] @classmethod
def merge_time(cls, histos):
"""Merge multiple BackroundRates together along the time axis.
Args:
histos (list of :class:`BackgroundRates`):
A list containing the BackgroundRates to be merged
Returns:
(:class:`BackgroundRates`)
"""
rates = np.vstack([histo.rates for histo in histos])
rate_uncertainty = np.vstack([histo.rate_uncertainty \
for histo in histos])
bins = TimeEnergyBins.merge_time(histos)
obj = cls(rates, rate_uncertainty, bins.tstart, bins.tstop,
bins.emin, bins.emax, exposure=bins.exposure)
return obj
[docs] @classmethod
def sum_time(cls, bkgds):
"""Sum multiple BackgroundRates together if they have the same time
range. Example use would be summing two backgrounds from two detectors.
Args:
bkgds (list of :class:`BackgroundRates`):
A list containing the BackgroundRates to be summed
Returns:
(:class:`BackgroundRates`)
"""
rates = np.zeros_like(bkgds[0].rates)
rates_var = np.zeros_like(bkgds[0].rates)
for bkgd in bkgds:
assert bkgd.num_times == bkgds[0].num_times, \
"The backgrounds must all have the same support"
rates += bkgd.rates
rates_var += bkgd.rate_uncertainty ** 2
ebounds = Ebounds.from_bounds(bkgds[0].emin, bkgds[0].emax)
for bkgd in bkgds[1:]:
eb = Ebounds.from_bounds(bkgd.emin, bkgd.emax)
ebounds = Ebounds.merge(ebounds, eb)
# averaged exposure, sampling times
exposure = np.mean([bkgd.exposure for bkgd in bkgds], axis=0)
tstart = np.mean([bkgd.tstart for bkgd in bkgds], axis=0)
tstop = np.mean([bkgd.tstop for bkgd in bkgds], axis=0)
emin = np.array(ebounds.low_edges())
emax = np.array(ebounds.high_edges())
# ensure that the rates array has 2 dimensions (time, energy)
if rates.ndim == 1:
rates = rates.reshape(-1, 1)
rates_var = rates_var.reshape(-1, 1)
sum_bkgd = cls(rates, np.sqrt(rates_var), tstart, tstop, emin, emax,
exposure=exposure)
return sum_bkgd
[docs]class BackgroundSpectrum(EnergyBins):
"""A class defining a Background Spectrum.
Parameters:
rates (np.array): The array of background rates in each bin
rate_uncertainty (np.array): The array of background rate uncertainties
in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
"""
def __init__(self, rates, rate_uncertainty, lo_edges, hi_edges, exposure):
try:
iter(rates)
rates = np.asarray(rates)
except:
raise TypeError('rates must be an iterable')
try:
iter(rate_uncertainty)
rate_uncertainty = np.asarray(rate_uncertainty)
except:
raise TypeError('rate_uncertainty must be an iterable')
if rate_uncertainty.shape != rates.shape:
raise TypeError('rate_uncertainty must be the same shape as rates')
counts = rates * exposure
super().__init__(counts, lo_edges, hi_edges, exposure)
self._count_uncertainty = rate_uncertainty * exposure
self._rates = rates
self._rate_uncertainty = rate_uncertainty
@property
def count_uncertainty(self):
"""(np.array): The count uncertainty in each bin"""
return self._count_uncertainty
@property
def rate_uncertainty(self):
"""(np.array): The count rate uncertainty of each bin"""
return self._rate_uncertainty
@property
def rates(self):
"""(np.array): count rate of each bin"""
return self._rates
[docs] @classmethod
def merge(cls, histos, **kwargs):
"""Not implemented for BackgroundSpectrum"""
raise NotImplementedError
[docs] def rebin(self, method, *args, emin=None, emax=None):
"""Not implemented for BackgroundSpectrum"""
raise NotImplementedError
[docs] def slice(self, emin, emax):
"""Perform a slice over an energy range and return a new
BackgroundSpectrum object. Note that the emin and emax values that fall
inside a bin will result in that bin being included.
Args:
emin (float): The low energy edge of the slice
emax (float): The high energy of the slice
Returns:
(:class:`BackgroundSpectrum`)
"""
emin_snap = self.closest_edge(emin, which='low')
emax_snap = self.closest_edge(emax, which='high')
mask = (self.lo_edges < emax_snap) & (self.hi_edges > emin_snap)
obj = self.__class__(self.rates[mask], self.rate_uncertainty[mask],
self.lo_edges[mask], self.hi_edges[mask],
self.exposure[mask])
return obj
[docs] @classmethod
def sum(cls, histos):
"""Sum multiple BackgroundSpectrums together if they have the same
energy range (support).
Args:
histos (list of :class:`BackgroundSpectrum`):
A list containing the background spectra to be summed
Returns:
(:class:`BackgroundSpectrum`)
"""
counts = np.zeros(histos[0].size)
count_variance = np.zeros(histos[0].size)
exposure = 0.0
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.lo_edges == histos[0].lo_edges), \
"The histograms must all have the same support"
counts += histo.counts
count_variance += histo.count_uncertainty**2
exposure += histo.exposure
rates = counts/exposure
rate_uncertainty = np.sqrt(count_variance)/exposure
sum_bins = cls(rates, rate_uncertainty, histos[0].lo_edges,
histos[0].hi_edges, exposure)
return sum_bins
[docs]class BackgroundChannelRates(TimeChannelBins):
"""Class containing the background rate data for non-energy calibrated data.
Parameters:
rates (np.array): The array of background rates in each bin
rate_uncertainty (np.array): The array of background rate uncertainties
in each bin
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
chan_nums (np.array): The channel numbers in ascending order
exposure (np.array, optional): The exposure of each bin
"""
def __init__(self, rates, rate_uncertainty, tstart, tstop, chan_nums,
exposure=None):
try:
iter(rates)
rates = np.asarray(rates)
except:
raise TypeError('rates must be an iterable')
#Removing this 2 dimensional check since it is possible for this to be a 1D array, if channel integrated
#This scenario occurs if the user is trying to create a lightcurve and integrates the channels before slicing in time
if rates.ndim > 2:
raise TypeError('rates must be a 1 or 2-dimensional array')
try:
iter(rate_uncertainty)
rate_uncertainty = np.asarray(rate_uncertainty)
except:
raise TypeError('rate_uncertainty must be an iterable')
if rate_uncertainty.ndim > 2:
raise TypeError('rate_uncertainty must be a 1 or 2-dimensional array')
if rate_uncertainty.shape != rates.shape:
raise TypeError('rate_uncertainty must be the same shape as rates')
try:
iter(chan_nums)
self._chan_nums = np.asarray(chan_nums, dtype=int).flatten()
except:
raise TypeError('chan_nums must be an iterable')
if exposure is None:
exposure = np.zeros_like(tstart)
else:
exposure = np.asarray(exposure)
#If the channels have been integrated and rates is 1d, performing the original (else case) operation
#will result in an error when trying to create the TimeChannelBins object
if rates.ndim == 1:
counts= rates * exposure
else:
counts = np.squeeze(rates * exposure[:, np.newaxis])
if counts.ndim == 1:
counts = counts.reshape(tstart.size, chan_nums.size)
super().__init__(counts, tstart, tstop, exposure, chan_nums)
self._count_uncertainty = np.squeeze(rate_uncertainty * exposure[:, np.newaxis])
self._rates = rates.squeeze()
self._rate_uncertainty = rate_uncertainty.squeeze()
@property
def count_uncertainty(self):
"""(np.array): The counts uncertainty in each bin"""
return self._count_uncertainty
@property
def rate_uncertainty(self):
"""(np.array): The rate uncertainty in each bin"""
return self._rate_uncertainty
@property
def rates(self):
"""(np.array): The rates in each Time-Energy Bin"""
return self._rates
[docs] def integrate_channels(self, chan_min=None, chan_max=None):
"""Integrate the rates over the energy channels.
Limits on the integration smaller than the full range of channels can be set.
Args:
chan_min (int, optional):
The low end of the integration range (inclusive).
If not set, uses the lowest energy channel of the histogram
chan_max (int, optional):
The high end of the integration range (inclusive).
If not set, uses the highest energy channel of the histogram
Returns:
(:class:`BackgroundChannelRates`)
"""
if chan_min is None:
chan_min = self.chan_nums[0]
if chan_max is None:
chan_max = self.chan_nums[-1]
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
rates = np.nansum(self.rates[:,mask], axis=1).reshape(-1,1)
rate_uncert = np.sqrt(
np.nansum(self.rate_uncertainty[:,mask] ** 2, axis=1)).reshape(-1,1)
#Returned object only has one channel, the numbering of which is arbitrary
#This makes sense since the goal is to create a bkg lightcurve
#otherwise the plotting component will kick up an error
obj = BackgroundChannelRates(rates, rate_uncert, self.tstart, self.tstop,
np.asarray([1]), exposure=self.exposure)
return obj
[docs] def integrate_time(self, tstart=None, tstop=None):
"""Integrate the background over the time axis (producing a count rate
channel spectrum). Limits on the integration smaller than the full range
can be set.
Args:
tstart (float, optional): The low end of the integration range.
If not set, uses the lowest time edge of the histogram
tstop (float, optional): The high end of the integration range.
If not set, uses the highest time edge of the histogram
Returns:
(:class:`BackgroundChannelSpectrum`)
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
mask = self._slice_time_mask(tstart, tstop)
exposure = np.nansum(self.exposure[mask])
rates = np.nansum(self.counts[mask, :], axis=0) / exposure
rate_uncert = np.sqrt(np.nansum(self.count_uncertainty[mask, :] ** 2,
axis=0)) / exposure
exposure = np.full(rates.size, exposure)
obj = BackgroundChannelSpectrum(rates, rate_uncert, self.chan_nums, exposure)
return obj
[docs] def rebin_channels(self, method, *args, chan_min=None, chan_max=None):
"""Not implemented for BackgroundChannelRates"""
raise NotImplementedError
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None):
"""Not implemented for BackgroundChannelRates"""
raise NotImplementedError
[docs] def slice_channels(self, chan_min, chan_max):
"""Perform a slice over the channel range and return a new BackgroundChannelRates
object. Note that chan_min and chan_max values that fall inside a bin will
result in that bin being included.
Args:
chan_min (int): The start of the slice
chan_max (int): The end of the slice
Returns:
(:class:`BackgroundChannelRates`)
"""
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
obj = BackgroundChannelRates(self.rates[:, mask], self.rate_uncertainty[:, mask],
self.tstart, self.tstop, self.chan_nums[mask],
exposure=self.exposure)
return obj
[docs] def slice_time(self, tstart, tstop):
"""Perform a slice over a time range and return a new BackgroundChannelRates
object. Note that tstart and tstop values that fall inside a bin will
result in that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`BackgroundChannelRates`)
"""
mask = self._slice_time_mask(tstart, tstop)
cls = type(self)
if self.rate_uncertainty.ndim == 1:
obj=cls(self.rates[mask], self.rate_uncertainty[mask],
self.tstart[mask], self.tstop[mask], self.chan_nums,
exposure=self.exposure[mask])
else:
obj = cls(self.rates[mask, :], self.rate_uncertainty[mask,:],
self.tstart[mask], self.tstop[mask], self.chan_nums,
exposure=self.exposure[mask])
return obj
[docs] def to_bak(self, time_range=None, **kwargs):
"""Not implemented for BackgroundChannelRates"""
raise NotImplementedError("No Energy Calibration")
[docs] @classmethod
def merge_time(cls, histos):
"""Merge multiple BackroundChannelRates together along the time axis.
Args:
histos (list of :class:`BackgroundChannelRates`):
A list containing the BackgroundChannelRates to be merged
Returns:
(:class:`BackgroundChannelRates`)
"""
rates = np.vstack([histo.rates for histo in histos])
rate_uncertainty = np.vstack([histo.rate_uncertainty \
for histo in histos])
bins = TimeChannelBins.merge_time(histos)
obj = cls(rates, rate_uncertainty, bins.tstart, bins.tstop,
bins.chan_nums, exposure=bins.exposure)
return obj
[docs] @classmethod
def sum_time(cls, bkgds):
"""Sum multiple BackgroundChannelRates together if they have the same time
range. Example use would be summing two backgrounds from two detectors.
Args:
bkgds (list of :class:`BackgroundChannelRates`):
A list containing the BackgroundChannelRates to be summed
Returns:
(:class:`BackgroundChannelRates`)
"""
rates = np.zeros_like(bkgds[0].rates)
rates_var = np.zeros_like(bkgds[0].rates)
for bkgd in bkgds:
assert bkgd.num_times == bkgds[0].num_times, \
"The backgrounds must all have the same support"
rates += bkgd.rates
rates_var += bkgd.rate_uncertainty ** 2
chan_nums = bkgds[0].chan_nums
for bkgd in bkgds[1:]:
c_n= bkgd.chan_nums
for chan in c_n:
if chan not in chan_nums:
chan_nums = np.append(chan_nums,chan)
chan_nums = np.sort(chan_nums)
# averaged exposure, sampling times
exposure = np.mean([bkgd.exposure for bkgd in bkgds], axis=0)
tstart = np.mean([bkgd.tstart for bkgd in bkgds], axis=0)
tstop = np.mean([bkgd.tstop for bkgd in bkgds], axis=0)
sum_bkgd = cls(rates, np.sqrt(rates_var), tstart, tstop, chan_nums,
exposure=exposure)
return sum_bkgd
[docs]class BackgroundChannelSpectrum(ChannelBins):
"""A class defining a Background Channel Spectrum.
Parameters:
rates (np.array): The array of background rates in each bin
rate_uncertainty (np.array): The array of background rate uncertainties
in each bin
chan_nums (np.array): The channel numbers in ascending order
exposure (np.array): The exposure of each bin
"""
def __init__(self, rates, rate_uncertainty, chan_nums, exposure):
try:
iter(rates)
rates = np.asarray(rates)
except:
raise TypeError('rates must be an iterable')
try:
iter(rate_uncertainty)
rate_uncertainty = np.asarray(rate_uncertainty)
except:
raise TypeError('rate_uncertainty must be an iterable')
if rate_uncertainty.shape != rates.shape:
raise TypeError('rate_uncertainty must be the same shape as rates')
try:
iter(chan_nums)
self._chan_nums = np.asarray(chan_nums, dtype=int).flatten()
except:
raise TypeError('chan_nums must be an iterable')
try:
iter(exposure)
except:
exposure = np.full(rates.shape, exposure)
counts = rates * exposure
super().__init__(counts,self._chan_nums, self._chan_nums+1, exposure)
self._count_uncertainty = rate_uncertainty * exposure
self._rates = rates
self._rate_uncertainty = rate_uncertainty
@property
def count_uncertainty(self):
"""(np.array): The count uncertainty in each bin"""
return self._count_uncertainty
@property
def rate_uncertainty(self):
"""(np.array): The count rate uncertainty of each bin"""
return self._rate_uncertainty
@property
def rates(self):
"""(np.array): count rate of each bin"""
return self._rates
[docs] @classmethod
def merge(cls, histos, **kwargs):
"""Not implemented for BackgroundChannelSpectrum"""
raise NotImplementedError
[docs] def rebin(self, method, *args, emin=None, emax=None):
"""Not implemented for BackgroundChannelSpectrum"""
raise NotImplementedError
[docs] def slice(self, chan_min, chan_max):
"""Perform a slice over an energy range and return a new
BackgroundChannelSpectrum object. Note inclusive of chan_min and chan_max.
Args:
chan_min (int): Minimum channel number to be included
chan_max (int): Maximum channel number to be included
Returns:
(:class:`BackgroundChannelSpectrum`)
"""
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
obj = self.__class__(self.rates[mask], self.rate_uncertainty[mask],
self.chan_nums[mask], self.exposure[mask])
return obj
[docs] @classmethod
def sum(cls, histos):
"""Sum multiple BackgroundChannelSpectrums together if they have the same
channel range (support).
Args:
histos (list of :class:`BackgroundChannelSpectrum`):
A list containing the background channel spectra to be summed
Returns:
(:class:`BackgroundChannelSpectrum`)
"""
counts = np.zeros(histos[0].size)
count_variance = np.zeros(histos[0].size)
exposure = 0.0
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.chan_nums == histos[0].chan_nums), \
"The histograms must all have the same support"
counts += histo.counts
count_variance += histo.count_uncertainty**2
exposure += histo.exposure
rates = counts/exposure
rate_uncertainty = np.sqrt(count_variance)/exposure
sum_bins = cls(rates, rate_uncertainty, histos[0].chan_nums, exposure)
return sum_bins