Source code for gdt.core.background.primitives

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
from gdt.core.data_primitives import TimeEnergyBins,TimeChannelBins,EnergyBins,ChannelBins, Gti, Ebounds

__all__ = ['BackgroundRates', 'BackgroundSpectrum','BackgroundChannelRates','BackgroundChannelSpectrum']


[docs]class BackgroundRates(TimeEnergyBins): """Class containing the background rate data. Parameters: rates (np.array): The array of background rates in each bin rate_uncertainty (np.array): The array of background rate uncertainties in each bin tstart (np.array): The low-value edges of the time bins tstop (np.array): The high-value edges of the time bins emin (np.array): The low-value edges of the energy bins emax (np.array): The high-value edges of the energy bins exposure (np.array, optional): The exposure of each bin """ def __init__(self, rates, rate_uncertainty, tstart, tstop, emin, emax, exposure=None): try: iter(rates) rates = np.asarray(rates) except: raise TypeError('rates must be an iterable') if rates.ndim != 2: raise TypeError('rates must be a 2-dimensional array') try: iter(rate_uncertainty) rate_uncertainty = np.asarray(rate_uncertainty) except: raise TypeError('rate_uncertainty must be an iterable') if rate_uncertainty.ndim != 2: raise TypeError('rate_uncertainty must be a 2-dimensional array') if rate_uncertainty.shape != rates.shape: raise TypeError('rate_uncertainty must be the same shape as rates') if exposure is None: exposure = np.zeros_like(tstart) else: exposure = np.asarray(exposure) counts = np.squeeze(rates * exposure[:, np.newaxis]) if counts.ndim == 1: counts = counts.reshape(len(tstart), len(emin)) super().__init__(counts, tstart, tstop, exposure, emin, emax) self._count_uncertainty = np.squeeze(rate_uncertainty * exposure[:, np.newaxis]) self._rates = rates.squeeze() self._rate_uncertainty = rate_uncertainty.squeeze() @property def count_uncertainty(self): """(np.array): The counts uncertainty in each bin""" return self._count_uncertainty @property def rate_uncertainty(self): """(np.array): The rate uncertainty in each bin""" return self._rate_uncertainty @property def rates(self): """(np.array): The rates in each Time-Energy Bin""" return self._rates
[docs] def integrate_energy(self, emin=None, emax=None): """Integrate the over the energy axis. Limits on the integration smaller than the full range can be set. Args: emin (float, optional): The low end of the integration range. If not set, uses the lowest energy edge of the histogram emax (float, optional): The high end of the integration range. If not set, uses the highest energy edge of the histogram Returns: (:class:`BackgroundRates`) """ if emin is None: emin = self.energy_range[0] if emax is None: emax = self.energy_range[1] mask = self._slice_energy_mask(emin, emax) emin = self.emin[mask][0] emax = self.emax[mask][-1] rates = np.nansum(self.rates[:, mask], axis=1).reshape(-1,1) rate_uncert = np.sqrt( np.nansum(self.rate_uncertainty[:, mask] ** 2, axis=1)).reshape(-1,1) obj = BackgroundRates(rates, rate_uncert, self.tstart, self.tstop, np.array([emin]), np.array([emax]), exposure=self.exposure) return obj
[docs] def integrate_time(self, tstart=None, tstop=None): """Integrate the background over the time axis (producing a count rate spectrum). Limits on the integration smaller than the full range can be set. Args: tstart (float, optional): The low end of the integration range. If not set, uses the lowest time edge of the histogram tstop (float, optional): The high end of the integration range. If not set, uses the highest time edge of the histogram Returns: (:class:`BackgroundSpectrum`) """ if tstart is None: tstart = self.time_range[0] if tstop is None: tstop = self.time_range[1] mask = self._slice_time_mask(tstart, tstop) exposure = np.nansum(self.exposure[mask]) rates = np.nansum(self.counts[mask, :], axis=0) / exposure rate_uncert = np.sqrt(np.nansum(self.count_uncertainty[mask, :] ** 2, axis=0)) / exposure exposure = np.full(rates.size, exposure) obj = BackgroundSpectrum(rates, rate_uncert, self.emin, self.emax, exposure) return obj
[docs] def rebin_energy(self, method, *args, emin=None, emax=None): """Not implemented for BackgroundRates""" raise NotImplementedError
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None): """Not implemented for BackgroundRates""" raise NotImplementedError
[docs] def slice_energy(self, emin, emax): """Perform a slice over an energy range and return a new BackgroundRates object. Note that emin and emax values that fall inside a bin will result in that bin being included. Args: emin (float): The start of the slice emax (float): The end of the slice Returns: (:class:`BackgroundRates`) """ mask = self._slice_energy_mask(emin, emax) cls = type(self) obj = cls(self.rates[:, mask], self.rate_uncertainty[:, mask], self.tstart, self.tstop, self.emin[mask], self.emax[mask], exposure=self.exposure) return obj
[docs] def slice_time(self, tstart, tstop): """Perform a slice over a time range and return a new BackgroundRates object. Note that tstart and tstop values that fall inside a bin will result in that bin being included. Args: tstart (float): The start of the slice tstop (float): The end of the slice Returns: (:class:`BackgroundRates`) """ mask = self._slice_time_mask(tstart, tstop) cls = type(self) obj = cls(self.rates[mask, :], self.rate_uncertainty[mask,:], self.tstart[mask], self.tstop[mask], self.emin, self.emax, exposure=self.exposure[mask]) return obj
[docs] def to_bak(self, time_range=None, **kwargs): """Integrate over the time axis and produce a BAK object Args: time_range ((float, float), optional): The time range to integrate over **kwargs: Options to pass to Bak.from_data() Returns: (:class:`~gdt.core.pha.Bak`) """ from gdt.core.pha import Bak if time_range is None: time_range = self.time_range back_spec = self.integrate_time(*time_range) gti = Gti.from_list([time_range]) bak = Bak.from_data(back_spec, gti=gti, **kwargs) return bak
[docs] @classmethod def merge_time(cls, histos): """Merge multiple BackroundRates together along the time axis. Args: histos (list of :class:`BackgroundRates`): A list containing the BackgroundRates to be merged Returns: (:class:`BackgroundRates`) """ rates = np.vstack([histo.rates for histo in histos]) rate_uncertainty = np.vstack([histo.rate_uncertainty \ for histo in histos]) bins = TimeEnergyBins.merge_time(histos) obj = cls(rates, rate_uncertainty, bins.tstart, bins.tstop, bins.emin, bins.emax, exposure=bins.exposure) return obj
[docs] @classmethod def sum_time(cls, bkgds): """Sum multiple BackgroundRates together if they have the same time range. Example use would be summing two backgrounds from two detectors. Args: bkgds (list of :class:`BackgroundRates`): A list containing the BackgroundRates to be summed Returns: (:class:`BackgroundRates`) """ rates = np.zeros_like(bkgds[0].rates) rates_var = np.zeros_like(bkgds[0].rates) for bkgd in bkgds: assert bkgd.num_times == bkgds[0].num_times, \ "The backgrounds must all have the same support" rates += bkgd.rates rates_var += bkgd.rate_uncertainty ** 2 ebounds = Ebounds.from_bounds(bkgds[0].emin, bkgds[0].emax) for bkgd in bkgds[1:]: eb = Ebounds.from_bounds(bkgd.emin, bkgd.emax) ebounds = Ebounds.merge(ebounds, eb) # averaged exposure, sampling times exposure = np.mean([bkgd.exposure for bkgd in bkgds], axis=0) tstart = np.mean([bkgd.tstart for bkgd in bkgds], axis=0) tstop = np.mean([bkgd.tstop for bkgd in bkgds], axis=0) emin = np.array(ebounds.low_edges()) emax = np.array(ebounds.high_edges()) # ensure that the rates array has 2 dimensions (time, energy) if rates.ndim == 1: rates = rates.reshape(-1, 1) rates_var = rates_var.reshape(-1, 1) sum_bkgd = cls(rates, np.sqrt(rates_var), tstart, tstop, emin, emax, exposure=exposure) return sum_bkgd
[docs]class BackgroundSpectrum(EnergyBins): """A class defining a Background Spectrum. Parameters: rates (np.array): The array of background rates in each bin rate_uncertainty (np.array): The array of background rate uncertainties in each bin lo_edges (np.array): The low-value edges of the bins hi_edges (np.array): The high-value edges of the bins exposure (np.array): The exposure of each bin """ def __init__(self, rates, rate_uncertainty, lo_edges, hi_edges, exposure): try: iter(rates) rates = np.asarray(rates) except: raise TypeError('rates must be an iterable') try: iter(rate_uncertainty) rate_uncertainty = np.asarray(rate_uncertainty) except: raise TypeError('rate_uncertainty must be an iterable') if rate_uncertainty.shape != rates.shape: raise TypeError('rate_uncertainty must be the same shape as rates') counts = rates * exposure super().__init__(counts, lo_edges, hi_edges, exposure) self._count_uncertainty = rate_uncertainty * exposure self._rates = rates self._rate_uncertainty = rate_uncertainty @property def count_uncertainty(self): """(np.array): The count uncertainty in each bin""" return self._count_uncertainty @property def rate_uncertainty(self): """(np.array): The count rate uncertainty of each bin""" return self._rate_uncertainty @property def rates(self): """(np.array): count rate of each bin""" return self._rates
[docs] @classmethod def merge(cls, histos, **kwargs): """Not implemented for BackgroundSpectrum""" raise NotImplementedError
[docs] def rebin(self, method, *args, emin=None, emax=None): """Not implemented for BackgroundSpectrum""" raise NotImplementedError
[docs] def slice(self, emin, emax): """Perform a slice over an energy range and return a new BackgroundSpectrum object. Note that the emin and emax values that fall inside a bin will result in that bin being included. Args: emin (float): The low energy edge of the slice emax (float): The high energy of the slice Returns: (:class:`BackgroundSpectrum`) """ emin_snap = self.closest_edge(emin, which='low') emax_snap = self.closest_edge(emax, which='high') mask = (self.lo_edges < emax_snap) & (self.hi_edges > emin_snap) obj = self.__class__(self.rates[mask], self.rate_uncertainty[mask], self.lo_edges[mask], self.hi_edges[mask], self.exposure[mask]) return obj
[docs] @classmethod def sum(cls, histos): """Sum multiple BackgroundSpectrums together if they have the same energy range (support). Args: histos (list of :class:`BackgroundSpectrum`): A list containing the background spectra to be summed Returns: (:class:`BackgroundSpectrum`) """ counts = np.zeros(histos[0].size) count_variance = np.zeros(histos[0].size) exposure = 0.0 for histo in histos: assert histo.size == histos[0].size, \ "The histograms must all have the same size" assert np.all(histo.lo_edges == histos[0].lo_edges), \ "The histograms must all have the same support" counts += histo.counts count_variance += histo.count_uncertainty**2 exposure += histo.exposure rates = counts/exposure rate_uncertainty = np.sqrt(count_variance)/exposure sum_bins = cls(rates, rate_uncertainty, histos[0].lo_edges, histos[0].hi_edges, exposure) return sum_bins
[docs]class BackgroundChannelRates(TimeChannelBins): """Class containing the background rate data for non-energy calibrated data. Parameters: rates (np.array): The array of background rates in each bin rate_uncertainty (np.array): The array of background rate uncertainties in each bin tstart (np.array): The low-value edges of the time bins tstop (np.array): The high-value edges of the time bins chan_nums (np.array): The channel numbers in ascending order exposure (np.array, optional): The exposure of each bin """ def __init__(self, rates, rate_uncertainty, tstart, tstop, chan_nums, exposure=None): try: iter(rates) rates = np.asarray(rates) except: raise TypeError('rates must be an iterable') #Removing this 2 dimensional check since it is possible for this to be a 1D array, if channel integrated #This scenario occurs if the user is trying to create a lightcurve and integrates the channels before slicing in time if rates.ndim > 2: raise TypeError('rates must be a 1 or 2-dimensional array') try: iter(rate_uncertainty) rate_uncertainty = np.asarray(rate_uncertainty) except: raise TypeError('rate_uncertainty must be an iterable') if rate_uncertainty.ndim > 2: raise TypeError('rate_uncertainty must be a 1 or 2-dimensional array') if rate_uncertainty.shape != rates.shape: raise TypeError('rate_uncertainty must be the same shape as rates') try: iter(chan_nums) self._chan_nums = np.asarray(chan_nums, dtype=int).flatten() except: raise TypeError('chan_nums must be an iterable') if exposure is None: exposure = np.zeros_like(tstart) else: exposure = np.asarray(exposure) #If the channels have been integrated and rates is 1d, performing the original (else case) operation #will result in an error when trying to create the TimeChannelBins object if rates.ndim == 1: counts= rates * exposure else: counts = np.squeeze(rates * exposure[:, np.newaxis]) if counts.ndim == 1: counts = counts.reshape(tstart.size, chan_nums.size) super().__init__(counts, tstart, tstop, exposure, chan_nums) self._count_uncertainty = np.squeeze(rate_uncertainty * exposure[:, np.newaxis]) self._rates = rates.squeeze() self._rate_uncertainty = rate_uncertainty.squeeze() @property def count_uncertainty(self): """(np.array): The counts uncertainty in each bin""" return self._count_uncertainty @property def rate_uncertainty(self): """(np.array): The rate uncertainty in each bin""" return self._rate_uncertainty @property def rates(self): """(np.array): The rates in each Time-Energy Bin""" return self._rates
[docs] def integrate_channels(self, chan_min=None, chan_max=None): """Integrate the rates over the energy channels. Limits on the integration smaller than the full range of channels can be set. Args: chan_min (int, optional): The low end of the integration range (inclusive). If not set, uses the lowest energy channel of the histogram chan_max (int, optional): The high end of the integration range (inclusive). If not set, uses the highest energy channel of the histogram Returns: (:class:`BackgroundChannelRates`) """ if chan_min is None: chan_min = self.chan_nums[0] if chan_max is None: chan_max = self.chan_nums[-1] mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min) rates = np.nansum(self.rates[:,mask], axis=1).reshape(-1,1) rate_uncert = np.sqrt( np.nansum(self.rate_uncertainty[:,mask] ** 2, axis=1)).reshape(-1,1) #Returned object only has one channel, the numbering of which is arbitrary #This makes sense since the goal is to create a bkg lightcurve #otherwise the plotting component will kick up an error obj = BackgroundChannelRates(rates, rate_uncert, self.tstart, self.tstop, np.asarray([1]), exposure=self.exposure) return obj
[docs] def integrate_time(self, tstart=None, tstop=None): """Integrate the background over the time axis (producing a count rate channel spectrum). Limits on the integration smaller than the full range can be set. Args: tstart (float, optional): The low end of the integration range. If not set, uses the lowest time edge of the histogram tstop (float, optional): The high end of the integration range. If not set, uses the highest time edge of the histogram Returns: (:class:`BackgroundChannelSpectrum`) """ if tstart is None: tstart = self.time_range[0] if tstop is None: tstop = self.time_range[1] mask = self._slice_time_mask(tstart, tstop) exposure = np.nansum(self.exposure[mask]) rates = np.nansum(self.counts[mask, :], axis=0) / exposure rate_uncert = np.sqrt(np.nansum(self.count_uncertainty[mask, :] ** 2, axis=0)) / exposure exposure = np.full(rates.size, exposure) obj = BackgroundChannelSpectrum(rates, rate_uncert, self.chan_nums, exposure) return obj
[docs] def rebin_channels(self, method, *args, chan_min=None, chan_max=None): """Not implemented for BackgroundChannelRates""" raise NotImplementedError
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None): """Not implemented for BackgroundChannelRates""" raise NotImplementedError
[docs] def slice_channels(self, chan_min, chan_max): """Perform a slice over the channel range and return a new BackgroundChannelRates object. Note that chan_min and chan_max values that fall inside a bin will result in that bin being included. Args: chan_min (int): The start of the slice chan_max (int): The end of the slice Returns: (:class:`BackgroundChannelRates`) """ mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min) obj = BackgroundChannelRates(self.rates[:, mask], self.rate_uncertainty[:, mask], self.tstart, self.tstop, self.chan_nums[mask], exposure=self.exposure) return obj
[docs] def slice_time(self, tstart, tstop): """Perform a slice over a time range and return a new BackgroundChannelRates object. Note that tstart and tstop values that fall inside a bin will result in that bin being included. Args: tstart (float): The start of the slice tstop (float): The end of the slice Returns: (:class:`BackgroundChannelRates`) """ mask = self._slice_time_mask(tstart, tstop) cls = type(self) if self.rate_uncertainty.ndim == 1: obj=cls(self.rates[mask], self.rate_uncertainty[mask], self.tstart[mask], self.tstop[mask], self.chan_nums, exposure=self.exposure[mask]) else: obj = cls(self.rates[mask, :], self.rate_uncertainty[mask,:], self.tstart[mask], self.tstop[mask], self.chan_nums, exposure=self.exposure[mask]) return obj
[docs] def to_bak(self, time_range=None, **kwargs): """Not implemented for BackgroundChannelRates""" raise NotImplementedError("No Energy Calibration")
[docs] @classmethod def merge_time(cls, histos): """Merge multiple BackroundChannelRates together along the time axis. Args: histos (list of :class:`BackgroundChannelRates`): A list containing the BackgroundChannelRates to be merged Returns: (:class:`BackgroundChannelRates`) """ rates = np.vstack([histo.rates for histo in histos]) rate_uncertainty = np.vstack([histo.rate_uncertainty \ for histo in histos]) bins = TimeChannelBins.merge_time(histos) obj = cls(rates, rate_uncertainty, bins.tstart, bins.tstop, bins.chan_nums, exposure=bins.exposure) return obj
[docs] @classmethod def sum_time(cls, bkgds): """Sum multiple BackgroundChannelRates together if they have the same time range. Example use would be summing two backgrounds from two detectors. Args: bkgds (list of :class:`BackgroundChannelRates`): A list containing the BackgroundChannelRates to be summed Returns: (:class:`BackgroundChannelRates`) """ rates = np.zeros_like(bkgds[0].rates) rates_var = np.zeros_like(bkgds[0].rates) for bkgd in bkgds: assert bkgd.num_times == bkgds[0].num_times, \ "The backgrounds must all have the same support" rates += bkgd.rates rates_var += bkgd.rate_uncertainty ** 2 chan_nums = bkgds[0].chan_nums for bkgd in bkgds[1:]: c_n= bkgd.chan_nums for chan in c_n: if chan not in chan_nums: chan_nums = np.append(chan_nums,chan) chan_nums = np.sort(chan_nums) # averaged exposure, sampling times exposure = np.mean([bkgd.exposure for bkgd in bkgds], axis=0) tstart = np.mean([bkgd.tstart for bkgd in bkgds], axis=0) tstop = np.mean([bkgd.tstop for bkgd in bkgds], axis=0) sum_bkgd = cls(rates, np.sqrt(rates_var), tstart, tstop, chan_nums, exposure=exposure) return sum_bkgd
[docs]class BackgroundChannelSpectrum(ChannelBins): """A class defining a Background Channel Spectrum. Parameters: rates (np.array): The array of background rates in each bin rate_uncertainty (np.array): The array of background rate uncertainties in each bin chan_nums (np.array): The channel numbers in ascending order exposure (np.array): The exposure of each bin """ def __init__(self, rates, rate_uncertainty, chan_nums, exposure): try: iter(rates) rates = np.asarray(rates) except: raise TypeError('rates must be an iterable') try: iter(rate_uncertainty) rate_uncertainty = np.asarray(rate_uncertainty) except: raise TypeError('rate_uncertainty must be an iterable') if rate_uncertainty.shape != rates.shape: raise TypeError('rate_uncertainty must be the same shape as rates') try: iter(chan_nums) self._chan_nums = np.asarray(chan_nums, dtype=int).flatten() except: raise TypeError('chan_nums must be an iterable') try: iter(exposure) except: exposure = np.full(rates.shape, exposure) counts = rates * exposure super().__init__(counts,self._chan_nums, self._chan_nums+1, exposure) self._count_uncertainty = rate_uncertainty * exposure self._rates = rates self._rate_uncertainty = rate_uncertainty @property def count_uncertainty(self): """(np.array): The count uncertainty in each bin""" return self._count_uncertainty @property def rate_uncertainty(self): """(np.array): The count rate uncertainty of each bin""" return self._rate_uncertainty @property def rates(self): """(np.array): count rate of each bin""" return self._rates
[docs] @classmethod def merge(cls, histos, **kwargs): """Not implemented for BackgroundChannelSpectrum""" raise NotImplementedError
[docs] def rebin(self, method, *args, emin=None, emax=None): """Not implemented for BackgroundChannelSpectrum""" raise NotImplementedError
[docs] def slice(self, chan_min, chan_max): """Perform a slice over an energy range and return a new BackgroundChannelSpectrum object. Note inclusive of chan_min and chan_max. Args: chan_min (int): Minimum channel number to be included chan_max (int): Maximum channel number to be included Returns: (:class:`BackgroundChannelSpectrum`) """ mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min) obj = self.__class__(self.rates[mask], self.rate_uncertainty[mask], self.chan_nums[mask], self.exposure[mask]) return obj
[docs] @classmethod def sum(cls, histos): """Sum multiple BackgroundChannelSpectrums together if they have the same channel range (support). Args: histos (list of :class:`BackgroundChannelSpectrum`): A list containing the background channel spectra to be summed Returns: (:class:`BackgroundChannelSpectrum`) """ counts = np.zeros(histos[0].size) count_variance = np.zeros(histos[0].size) exposure = 0.0 for histo in histos: assert histo.size == histos[0].size, \ "The histograms must all have the same size" assert np.all(histo.chan_nums == histos[0].chan_nums), \ "The histograms must all have the same support" counts += histo.counts count_variance += histo.count_uncertainty**2 exposure += histo.exposure rates = counts/exposure rate_uncertainty = np.sqrt(count_variance)/exposure sum_bins = cls(rates, rate_uncertainty, histos[0].chan_nums, exposure) return sum_bins