# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract Nos.: CA 80MSFC17M0022 / 80NSSC24M0035
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
# Universities Space Research Association
# Science and Technology Institute
# https://sti.usra.edu
#
# Developed by: Daniel Kocevski
# National Aeronautics and Space Administration (NASA)
# Marshall Space Flight Center
# Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
from .intervals import Ebounds
__all__ = ['Bins', 'ExposureBins', 'ChannelBins', 'TimeBins',
'EnergyBins', 'TimeChannelBins', 'TimeEnergyBins']
[docs]class Bins():
"""A primitive class defining a set of histogram bins.
The number of counts in each bin are assumed to be represented by a Poisson
distribution unless the `count_uncerts` argument is specified with the
defined count uncertainties.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
"""
def __init__(self, counts, lo_edges, hi_edges, count_uncerts=None):
try:
iter(counts)
self._counts = np.asarray(counts).flatten()
except:
raise TypeError('counts must be an iterable')
try:
iter(lo_edges)
self._lo_edges = np.asarray(lo_edges).flatten()
except:
raise TypeError('lo_edges must be an iterable')
try:
iter(hi_edges)
self._hi_edges = np.asarray(hi_edges).flatten()
except:
raise TypeError('hi_edges must be an iterable')
if (self._counts.size != self._lo_edges.size) or \
(self._lo_edges.size != self._hi_edges.size):
raise ValueError('counts, lo_edges, and hi_edges must all be ' \
'the same length')
if count_uncerts is None:
count_uncerts = np.sqrt(counts)
try:
iter(count_uncerts)
self._count_uncerts = np.asarray(count_uncerts)
except:
raise TypeError('count_uncerts must be an iterable')
if self._count_uncerts.size != self._counts.size:
raise ValueError('count_uncerts must be same size as counts')
@property
def centroids(self):
"""(np.array): The centroids of the bins"""
return (self.hi_edges + self.lo_edges) / 2.0
@property
def counts(self):
"""(np.array): The counts in each bin"""
return self._counts
@property
def count_uncertainty(self):
"""(np.array): The count uncertainty in each bin"""
return self._count_uncerts
@property
def hi_edges(self):
"""(np.array): The high-value edges of the bins"""
return self._hi_edges
@property
def lo_edges(self):
"""(np.array): The low-value edges of the bins"""
return self._lo_edges
@property
def range(self):
"""(float, float): The range of the bin edges"""
if self.size > 0:
return (self.lo_edges[0], self.hi_edges[-1])
@property
def rates(self):
"""(np.array): The count rate of each bin; counts/width"""
return self.counts / self.widths
@property
def rate_uncertainty(self):
"""(np.array): The count rate uncertainty of each bin"""
return self.count_uncertainty / self.widths
@property
def size(self):
"""(int): Number of bins"""
return self.lo_edges.size
@property
def widths(self):
"""(np.array): The widths of the bins"""
return self.hi_edges - self.lo_edges
[docs] def closest_edge(self, val, which='either'):
"""Return the closest bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val; or
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.lo_edges, [self.hi_edges[-1]]))
idx = np.argmin(np.abs(val - edges))
if which == 'low' and (idx - 1) >= 0:
if edges[idx] > val:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, lo_edges, hi_edges):
"""Create a Bins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
Returns:
(:class:`Bins`)
"""
dt = hi_edges - lo_edges
counts = rates * dt
count_uncerts = rate_uncerts * dt
return cls(counts, lo_edges, hi_edges, count_uncerts=count_uncerts)
[docs] def slice(self, lo_edge, hi_edge):
"""Perform a slice over the range of the bins and return a new Bins
object. Note that lo_edge and hi_edge values that fall inside a bin will
result in that bin being included.
Args:
lo_edge (float): The start of the slice
hi_edge (float): The end of the slice
Returns:
(:class:`Bins`)
"""
lo_snap = self.closest_edge(lo_edge, which='low')
hi_snap = self.closest_edge(hi_edge, which='high')
if lo_snap == hi_snap:
mask = (self.lo_edges < hi_snap) & (self.hi_edges >= lo_snap)
else:
mask = (self.lo_edges < hi_snap) & (self.hi_edges > lo_snap)
count_uncerts = self.count_uncertainty[mask]
obj = Bins(self.counts[mask], self.lo_edges[mask], self.hi_edges[mask],
count_uncerts=count_uncerts)
return obj
def __repr__(self):
s = '<Bins: {0} bins;\n'.format(self.size)
s += ' range {0}>'.format(self.range)
return s
[docs]class ExposureBins(Bins):
"""A class defining a set of bins containing exposure information.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
"""
def __init__(self, counts, lo_edges, hi_edges, exposure,
count_uncerts=None, precalc_good_segments=True):
super().__init__(counts, lo_edges, hi_edges, count_uncerts=count_uncerts)
try:
iter(exposure)
self._exposure = np.asarray(exposure).flatten()
except:
raise TypeError('exposure must be an iterable')
if (self._counts.size != self._exposure.size):
raise ValueError('exposure must be the same size as counts')
self._good_segments = None
if precalc_good_segments:
self._good_segments = self._calculate_good_segments()
@property
def exposure(self):
"""(np.array): The exposure of each bin"""
return self._exposure
@property
def rates(self):
"""(np.array): The count rate of each bin: counts/exposure"""
r = np.zeros_like(self.exposure)
mask = (self.exposure > 0.0)
r[mask] = self.counts[mask] / self.exposure[mask]
return r
@property
def rate_uncertainty(self):
"""(np.array): The count rate uncertainty of each bin"""
r = np.zeros_like(self.exposure)
mask = (self.exposure > 0.0)
r[mask] = self.count_uncertainty[mask] / self.exposure[mask]
return r
[docs] def contiguous_bins(self):
"""Return a list of ExposureBins, each one containing a continuous
segment of data. This is done by comparing the edges of each bin, and
if there is a gap between edges, the data is split into separate
ExposureBin objects, each containing a contiguous set of data.
Returns
(list of :class:`ExposureBins`)
"""
if self._good_segments is not None:
good_segments = self._good_segments
else:
good_segments = self._calculate_good_segments()
bins = [self.slice(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, lo_edges, hi_edges, exposure,
**kwargs):
"""Create an ExposureBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
Returns:
(:class:`ExposureBins`)
"""
counts = rates * exposure
count_uncerts = rate_uncerts * exposure
return cls(counts, lo_edges, hi_edges, exposure,
count_uncerts=count_uncerts, **kwargs)
[docs] def rebin(self, method, *args, tstart=None, tstop=None):
"""Rebin the ExposureBins object in given a binning function and return
a new ExposureBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the ExposureBins to be binned,
otherwise binning will begin at the time of the first bin edge.
tstop (float, optional):
If set, defines the end time of the ExposureBins to be binned,
otherwise binning will end at the time of the last bin edge.
Returns:
(:class:`ExposureBins`)
"""
# empty bins
empty = self.__class__([], [], [], [], count_uncerts=[])
# set the start and stop of the rebinning segment
trange = self.range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_bins()
new_histos = []
for bin in bins:
trange = bin.range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (tstop < trange[0]) or (tstart > trange[1]):
histo = empty
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice(trange[0], self.closest_edge(tstart,
which='low'))
histo = bin.slice(self.closest_edge(tstart, which='low'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice(trange[0],
self.closest_edge(tstop, which='high'))
if tstop < trange[1]:
post = bin.slice(self.closest_edge(tstop, which='high'),
trange[1])
elif (tstart > trange[0]) or (tstop < trange[1]):
pre = bin.slice(trange[0],
self.closest_edge(tstart, which='low'))
histo = bin.slice(self.closest_edge(tstart, which='low'),
self.closest_edge(tstop, which='high'))
post = bin.slice(self.closest_edge(tstop, which='high'),
trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.size > 0:
edges = np.append(histo.lo_edges, histo.hi_edges[-1])
new_counts, new_uncert, new_exposure, new_edges = method(histo.counts,
histo.count_uncertainty,
histo.exposure,
edges,
*args)
new_histo = self.__class__(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.size > 0]
if len(histos_to_merge) > 0:
new_histos.append(self.__class__.merge(histos_to_merge))
new_histo = self.__class__.merge(new_histos)
return new_histo
[docs] def slice(self, tstart, tstop):
"""Perform a slice over a range and return a new ExposureBins object.
Note that tstart and tstop values that fall inside a bin will result in
that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`ExposureBins`)
"""
tstart_snap = self.closest_edge(tstart, which='low')
tstop_snap = self.closest_edge(tstop, which='high')
mask = (self.lo_edges < tstop_snap) & (self.hi_edges > tstart_snap)
obj = self.__class__(self.counts[mask], self.lo_edges[mask],
self.hi_edges[mask], self.exposure[mask],
count_uncerts=self.count_uncertainty[mask])
return obj
[docs] @classmethod
def merge(cls, histos, **kwargs):
"""Merge multiple ExposureBins together. Note that the ExposureBins
to merged must be strictly non-overlapping. The end of the range for
one ExposureBins may be equal to the start of the range for another, or
there may be a gap between the end of one or beginning of another,
but there cannot be an overlap. If you need to sum the counts between
multiple ExposureBins with the same bin edges, see the
:meth:`~ExposureBins.sum` method.
Args:
histos (list of :class:`ExposureBins`):
A list containing the ExposureBins to be merged
Returns:
(:class:`ExposureBins`)
"""
num = len(histos)
# sort by start time
tstarts = np.concatenate([[histo.lo_edges[0]] for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
lo_edges = histos[idx[0]].lo_edges
hi_edges = histos[idx[0]].hi_edges
exposure = histos[idx[0]].exposure
count_uncerts = histos[idx[0]].count_uncertainty
for i in range(1, num):
bin_starts = histos[idx[i]].lo_edges
# make sure there is no overlap
mask = (bin_starts >= hi_edges[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.concatenate((counts, histos[idx[i]].counts[mask]))
lo_edges = np.concatenate(
(lo_edges, histos[idx[i]].lo_edges[mask]))
hi_edges = np.concatenate(
(hi_edges, histos[idx[i]].hi_edges[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
count_uncerts = np.concatenate(
(count_uncerts, histos[idx[i]].count_uncertainty[mask]))
# new ExposureBins object
merged_bins = cls(counts, lo_edges, hi_edges, exposure,
count_uncerts=count_uncerts, **kwargs)
return merged_bins
[docs] @classmethod
def sum(cls, histos):
"""Sum multiple ExposureBins together if they have the same bin edges.
If the exposures are different between the histograms, they will be
averaged.
Note:
Count uncertainties are summed in quadrature.
Args:
histos (list of :class:`ExposureBins`):
A list containing the ExposureBins to be summed
Returns:
(:class:`ExposureBins`)
"""
counts = np.zeros(histos[0].size)
count_var = np.zeros(histos[0].size)
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.lo_edges == histos[0].lo_edges), \
"The histograms must all have the same support"
counts += histo.counts
count_var += histo.count_uncertainty ** 2
count_uncerts = np.sqrt(count_var)
# averaged exposure
exposure = np.mean([histo.exposure for histo in histos], axis=0)
sum_bins = cls(counts, histos[0].lo_edges, histos[0].hi_edges,
exposure, count_uncerts=count_uncerts)
return sum_bins
def _calculate_good_segments(self):
"""Calculates the ranges of data that are contiguous segments
Returns:
([(float, float), ...])
"""
mask = (self.lo_edges[1:] != self.hi_edges[:-1])
if mask.sum() == 0:
return [self.range]
times = np.concatenate(([self.lo_edges[0]], self.hi_edges[:-1][mask],
self.lo_edges[1:][mask], [self.hi_edges[-1]]))
times.sort()
return times.reshape(-1, 2).tolist()
def __repr__(self):
s = '<{0}: {1} bins;\n'.format(self.__class__.__name__, self.size)
s += ' range {0};\n'.format(self.range)
if self._good_segments is not None:
s += ' {0} contiguous segments>'.format(len(self._good_segments))
return s
[docs]class ChannelBins(ExposureBins):
"""A class defining a set of Energy Channel bins.
"""
@property
def chan_nums(self):
"""(np.array): The channel numbers"""
if self.size > 0:
return self.lo_edges
[docs] @classmethod
def create(cls, counts, chan_nums, exposure, continuous=True, **kwargs):
"""Create a :class:`ChannelBins` object from a list of channel numbers.
Args:
counts (np.array): The array of counts in each bin
chan_nums (np.array): The energy channel numbers
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as
`counts` if the uncertainty is
not Poisson.
continuous (bool, optional): [Experimental] Whether the bins are continuous (meaning no gaps between channels).
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
Returns:
(:class:`ChannelBins`)
"""
try:
iter(exposure)
except:
counts = np.asarray(counts)
exposure = np.full(counts.shape, exposure)
chan_nums = np.asarray(chan_nums)
if continuous and len(chan_nums) > 1:
hi_edges = np.concatenate((chan_nums[1:], [chan_nums[-1] + (chan_nums[1] - chan_nums[0])]))
else:
hi_edges = chan_nums + 1
return cls(counts, chan_nums, hi_edges, exposure, **kwargs)
@property
def range(self):
"""(int, int): The channel range"""
if self.size > 0:
return (self.chan_nums[0], self.chan_nums[-1])
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, chan_nums, exposure, **kwargs):
"""Create an ChannelBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
chan_nums (np.array): The energy channel numbers
exposure (np.array): The exposure of each bin
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
Returns:
(:class:`ChannelBins`)
"""
counts = rates * exposure
count_uncerts = rate_uncerts * exposure
return cls.create(counts, chan_nums, exposure,
count_uncerts=count_uncerts, **kwargs)
[docs] def rebin(self, method, *args, chan_min=None, chan_max=None):
"""Rebin the ChannelBins object given a binning function and return
a new ChannelBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Note::
Edges for energy channels are treated as [chan_num, chan_num+1]
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
chan_min (float, optional):
If set, defines the minimum channel number of the ChannelBins to
be binned, otherwise binning will begin at the first channel
number.
chan_max (float, optional):
If set, defines the maximum channel of the ChannelBins to be
binned, otherwise binning will end at the last channel number.
Returns:
(:class:`ChannelBins`)
"""
# empty bins
empty = self.__class__([], [], [], [])
# set the start and stop of the rebinning segment
crange = self.range
if chan_min is None:
chan_min = crange[0]
if chan_max is None:
chan_max = crange[1]
if chan_min < crange[0]:
chan_min = crange[0]
if chan_max > crange[1]:
chan_max = crange[1]
bins = self.contiguous_bins()
new_histos = []
for bin in bins:
crange = bin.range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (chan_max < crange[0]) or (chan_min > crange[1]):
histo = empty
elif chan_max == crange[1]:
if chan_min > crange[0]:
pre = bin.slice(crange[0], chan_min-1)
histo = bin.slice(chan_min, crange[1])
elif chan_min == crange[0]:
histo = bin.slice(crange[0], chan_max)
if chan_max < crange[1]:
post = bin.slice(chan_max+1, crange[1])
elif (chan_min > crange[0]) or (chan_max < crange[1]):
pre = bin.slice(crange[0], chan_min-1)
histo = bin.slice(chan_min, chan_max)
post = bin.slice(chan_max+1, crange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.size > 0:
edges = np.append(histo.lo_edges, histo.hi_edges[-1])
new_counts, new_uncert, new_exposure, new_edges = method(histo.counts,
histo.count_uncertainty,
histo.exposure,
edges, *args)
new_histo = self.__class__(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.size > 0]
if len(histos_to_merge) > 0:
new_histos.append(self.__class__.merge(histos_to_merge))
new_histo = self.__class__.merge(new_histos)
return new_histo
[docs] def slice(self, chan_min, chan_max):
"""Perform a slice over the range of the bins and return a new
ChannelBins object.
Args:
lo_edge (float): The start of the slice
hi_edge (float): The end of the slice
Returns:
(:class:`ChannelBins`)
"""
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
obj = self.__class__.create(self.counts[mask], self.chan_nums[mask],
self.exposure[mask],
count_uncerts=self.count_uncertainty[mask])
return obj
[docs]class TimeBins(ExposureBins):
"""A class defining a set of Time History (lightcurve) bins.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
"""
def __init__(self, counts, lo_edges, hi_edges, exposure, **kwargs):
super().__init__(counts, lo_edges, hi_edges, exposure, **kwargs)
[docs]class EnergyBins(ExposureBins):
"""A class defining a set of Energy (count spectra) bins.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
"""
def __init__(self, counts, lo_edges, hi_edges, exposure, **kwargs):
try:
iter(exposure)
except:
counts = np.asarray(counts)
exposure = np.full(counts.shape, exposure)
super().__init__(counts, lo_edges, hi_edges, exposure, **kwargs)
@property
def centroids(self):
"""(np.array): The centroids of the bins"""
return np.sqrt(self.hi_edges * self.lo_edges)
@property
def rates_per_kev(self):
"""(np.array): Differential count rate"""
return self.rates / self.widths
@property
def rate_uncertainty_per_kev(self):
"""(np.array): Differential rate uncertainty"""
return self.rate_uncertainty / self.widths
[docs] def rebin(self, method, *args, emin=None, emax=None):
"""Rebin the EnergyBins object in given a binning function and return a
a new EnergyBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
emin (float, optional):
If set, defines the starting energy of the EnergyBins to be
binned, otherwise binning will begin at the first bin edge.
emax (float, optional):
If set, defines the ending energy of the EnergyBins to be binned,
otherwise binning will end at the last bin edge.
Returns:
(:class:`EnergyBins`)
"""
histo = super().rebin(method, *args, tstart=emin, tstop=emax)
histo._exposure = self.exposure[:histo.size]
return histo
[docs] @classmethod
def sum(cls, histos):
"""Sum multiple EnergyBins together if they have the same energy range
(support). Example use would be summing two count spectra.
Note:
Count uncertainties are summed in quadrature.
Args:
histos (list of :class:`EnergyBins`):
A list containing the EnergyBins to be summed
Returns:
(:class:`EnergyBins`)
"""
counts = np.zeros(histos[0].size)
uncerts = np.zeros(histos[0].size)
exposure = 0.0
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.lo_edges == histos[0].lo_edges), \
"The histograms must all have the same support"
counts += histo.counts
uncerts += histo.count_uncertainty ** 2
exposure += histo.exposure
uncerts = np.sqrt(uncerts)
sum_bins = cls(counts, histos[0].lo_edges, histos[0].hi_edges,
exposure, count_uncerts=uncerts)
return sum_bins
[docs]class TimeChannelBins():
"""A class defining a set of 2D Time/Energy-Channel bins.
Parameters:
counts (np.array): The array of counts in each bin
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
chan_nums (np.array): The channel numbers in ascending order
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and channel segments
on initialization.
"""
def __init__(self, counts, tstart, tstop, exposure, chan_nums,
quality=None, count_uncerts=None, precalc_good_segments=True):
try:
iter(counts)
self._counts = np.asarray(counts)
except:
raise TypeError('counts must be an iterable')
if self._counts.ndim == 1:
self._counts = self._counts.reshape(-1, 1)
elif self._counts.ndim > 2:
raise TypeError('counts must be a 2-dimensional array')
try:
iter(tstart)
self._tstart = np.asarray(tstart).flatten()
except:
raise TypeError('tstart must be an iterable')
try:
iter(tstop)
self._tstop = np.asarray(tstop).flatten()
except:
raise TypeError('tstop must be an iterable')
try:
iter(exposure)
self._exposure = np.asarray(exposure).flatten()
except:
raise TypeError('exposure must be an iterable')
try:
iter(chan_nums)
self._chan_nums = np.asarray(chan_nums, dtype=int).flatten()
except:
raise TypeError('chan_nums must be an iterable')
if (self._tstart.size != self._tstop.size) or \
(self._exposure.size != self._tstart.size):
raise ValueError('tstart, tstop, and exposure must all be ' \
'the same length')
if (self._counts.shape[0] != self._tstart.size) or \
(self._counts.shape[1] != self._chan_nums.size):
raise ValueError('counts axis 0 must have same length as tstart, '\
'tstop, and exposure. counts axis 1 must have '\
'same length as chan_nums.')
if quality is not None:
try:
iter(quality)
self._quality = np.asarray(quality).flatten()
except:
raise TypeError('quality must be an iterable')
if self._quality.size != self._tstart.size:
raise ValueError('quality must be same length as tstart')
else:
self._quality = np.zeros_like(self._tstart)
if count_uncerts is None:
count_uncerts = np.sqrt(self._counts)
try:
iter(count_uncerts)
self._count_uncerts = np.asarray(count_uncerts)
except:
raise TypeError('count_uncerts must be an iterable')
if self._count_uncerts.shape != self._counts.shape:
raise ValueError('count_uncerts must be same shape as counts')
self._good_time_segments = None
self._good_channel_segments = None
if (self.num_times > 0) and precalc_good_segments:
self._good_time_segments = self._calculate_good_segments(
self.tstart,
self.tstop)
if (self.num_chans > 0) and precalc_good_segments:
chans0 = self.chan_nums
chans1 = self.chan_nums + 1
self._good_channel_segments = self._calculate_good_segments(chans0,
chans1)
@property
def chan_nums(self):
"""(np.array): The channel numbers"""
return self._chan_nums
@property
def channel_range(self):
"""(int, int): The channel number range"""
if self.num_chans > 0:
return (self.chan_nums[0], self.chan_nums[-1])
@property
def counts(self):
"""(np.array): The array of counts in each bin"""
return self._counts
@property
def count_uncertainty(self):
""" (np.array): The counts uncertainty in each bin"""
return self._count_uncerts
@property
def exposure(self):
"""(np.array): The exposure of each bin"""
return self._exposure
@property
def num_chans(self):
"""(int): The number of energy channels along the energy axis"""
return self._chan_nums.size
@property
def num_times(self):
"""(int): The number of bins along the time axis"""
return self._exposure.size
@property
def quality(self):
"""(np.array): The spectrum quality flag"""
return self._quality
@property
def rates(self):
"""(np.array): The rates in each Time-Channel Bin"""
return self.counts / (self.exposure[:, np.newaxis])
@property
def rate_uncertainty(self):
"""(np.array): The rate uncertainty in each bin"""
return self.count_uncertainty / (self.exposure[:, np.newaxis])
@property
def size(self):
"""(int, int): The number of bins along both axes (num_times, num_chans)"""
return self.counts.shape
@property
def time_centroids(self):
"""(np.array): The bin centroids along the time axis"""
return (self.tstop + self.tstart) / 2.0
@property
def time_range(self):
"""(float, float): The range of the data along the time axis"""
if self.num_times > 0:
return (self.tstart[0], self.tstop[-1])
@property
def time_widths(self):
"""(np.array): The bin widths along the time axis"""
return (self.tstop - self.tstart)
@property
def tstart(self):
"""(np.array): The low-value edges of the time bins"""
return self._tstart
@property
def tstop(self):
"""(np.array): The high-value edges of the time bins"""
return self._tstop
[docs] def apply_ebounds(self, ebounds):
"""Apply an energy bounds calibration and return a TimeEnergyBins
object.
Args:
ebounds (:class:`Ebounds`): The energy bounds. Must match the number
of channels as this object.
Returns:
(:class:`TimeEnergyBins`)
"""
if not isinstance(ebounds, Ebounds):
raise TypeError('ebounds must be an Ebounds object')
if ebounds.num_intervals != self.num_chans:
raise ValueError('ebounds must have the same number of channels ' \
'as this object.')
return TimeEnergyBins(self.counts, self.tstart, self.tstop,
self.exposure, ebounds.low_edges(),
ebounds.high_edges(),
count_uncerts=self.count_uncertainty,
quality=self._quality)
[docs] def closest_time_edge(self, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.tstart, [self.tstop[-1]]))
return self._closest_edge(edges, val, which=which)
[docs] def contiguous_channel_bins(self):
"""Return a list of TimeChannelBins, each one containing a contiguous
channel segment of data. This is done by comparing adjacent channel
numbers, and if there is a gap between channels, the data is split into
separate TimeChannelBins objects, each containing a
channel-contiguous set of data.
Returns:
(list of :class:`TimeChannelBins`)
"""
if self._good_channel_segments is None:
chans0 = self.chan_nums
chans1 = self.chan_nums + 1
good_segments = self._calculate_good_segments(chans0, chans1)
else:
good_segments = self._good_channel_segments
bins = [self.slice_channels(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] def contiguous_time_bins(self):
"""Return a list of TimeChannelBins, each one containing a contiguous
time segment of data. This is done by comparing the edges of each time
bin, and if thereis a gap between edges, the data is split into
separate TimeChannelBins objects, each containing a time-contiguous set
of data.
Returns:
(list of :class:`TimeChannelBins`)
"""
if self._good_time_segments is None:
good_segments = self._calculate_good_segments(self.tstart,
self.tstop)
else:
good_segments = self._good_time_segments
bins = [self.slice_time(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] def get_exposure(self, time_ranges=None, scale=False):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data
scale (bool, optional):
If True and the time ranges don't match up with the data binning,
will scale the exposure based on the requested time range.
Default is False.
Returns:
(float)
"""
if time_ranges is None:
time_ranges = [self.time_range]
try:
iter(time_ranges[0])
except:
time_ranges = [time_ranges]
exposure = 0.0
for i in range(len(time_ranges)):
mask = self._slice_time_mask(*self._assert_range(time_ranges[i]))
dt = (time_ranges[i][1] - time_ranges[i][0])
data_exp = np.sum(self.exposure[mask])
dts = np.sum(self.tstop[mask] - self.tstart[mask])
if dts > 0:
if scale:
exposure += data_exp * (dt / dts)
else:
exposure += data_exp
return exposure
[docs] def integrate_channels(self, chan_min=None, chan_max=None):
"""Integrate the histogram over the channel axis (producing a
lightcurve). Limits on the integration smaller than the full range can
be set.
Args:
chan_min (int, optional):
The low end of the integration range. If not set, uses the
lowest energy channel of the histogram
chan_max (int, optional):
The high end of the integration range. If not set, uses the
highest energy channel of the histogram
Returns:
(:class:`TimeBins`)
"""
if chan_min is None:
chan_min = self.channel_range[0]
if chan_max is None:
chan_max = self.channel_range[1]
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
counts = self.counts[:, mask].sum(axis=1)
uncert = np.sqrt( (self.count_uncertainty[:, mask] ** 2).sum(axis=1) )
obj = TimeBins(counts, self.tstart, self.tstop, self.exposure,
count_uncerts=uncert)
return obj
[docs] def integrate_time(self, tstart=None, tstop=None):
"""Integrate the histogram over the time axis (producing an energy
channel spectrum). Limits on the integration smaller than the full range
can be set.
Args:
tstart (float, optional):
The low end of the integration range. If not set, uses the
lowest time edge of the histogram
tstop (float, optional):
The high end of the integration range. If not set, uses the
highest time edge of the histogram
Returns:
(:class:`ChannelBins`)
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
mask = self._slice_time_mask(tstart, tstop)
counts = self.counts[mask, :].sum(axis=0)
uncert = np.sqrt( (self.count_uncertainty[mask, :] ** 2).sum(axis=0) )
exposure = np.full(counts.size, self.exposure[mask].sum())
obj = ChannelBins.create(counts, self.chan_nums, exposure,
count_uncerts=uncert)
return obj
[docs] def rebin_channels(self, method, *args, chan_min=None, chan_max=None):
"""Rebin the TimeChannelBins object along the energy axis given a
binning function and return a new TimeChannelBins object.
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
chan_min (int, optional):
If set, defines the starting channel of the TimeChannelBins
to be binned, otherwise binning will begin at the the first
channel.
chan_max (int, optional):
If set, defines the ending channel of the TimeChannelBins to
be binned, otherwise binning will end at the last channel.
Returns:
(:class:`TimeChannelBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [])
# set the start and stop of the rebinning segment
chan_range = self.channel_range
if chan_min is None:
chan_min = chan_range[0]
if chan_max is None:
chan_max = chan_range[1]
if chan_min < chan_range[0]:
chan_min = chan_range[0]
if chan_max > chan_range[1]:
chan_max = chan_range[1]
bins = self.contiguous_channel_bins()
new_histos = []
for bin in bins:
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
crange = bin.channel_range
if (chan_max < crange[0]) or (chan_min > crange[1]):
histo = empty
elif chan_max == crange[1]:
if chan_min > crange[0]:
pre = bin.slice_channels(crange[0], chan_min-1)
histo = bin.slice_channels(chan_min, crange[1])
elif chan_min == crange[0]:
histo = bin.slice_channels(crange[0], chan_max)
if chan_max < crange[1]:
post = bin.slice_channels(chan_max+1, crange[1])
elif (chan_min > crange[0]) and (chan_max < crange[1]):
pre = bin.slice_channels(crange[0], chan_min-1)
histo = bin.slice_channels(chan_min, chan_max)
post = bin.slice_channels(chan_max+1, crange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_chans > 0:
edges = np.append(histo.chan_nums, histo.chan_nums[-1]+1)
num_times, num_chans = histo.size
new_counts = []
new_uncert = []
for i in range(num_times):
exposure = np.full(num_chans, histo.exposure[i])
new_cts, uncert, _, new_edges = method(histo.counts[i, :],
histo.count_uncertainty[i,:],
exposure, edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_histo = TimeChannelBins(new_counts, bin.tstart, bin.tstop,
bin.exposure, new_edges[:-1],
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_chans > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeChannelBins.merge_channels(histos_to_merge))
new_histo = TimeChannelBins.merge_channels(new_histos)
return new_histo
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None):
"""Rebin the TimeChannelBins object along the time axis given a binning
function and return a new TimeChannelBins object.
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the TimeChannelBins to be
binned, otherwise binning will begin at the time of the first
bin edge.
tstop (float, optional):
If set, defines the end time of the TimeChannelBins to be
binned, otherwise binning will end at the time of the last
bin edge.
Returns:
(:class:`TimeChannelBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [], [])
# set the start and stop of the rebinning segment
trange = self.time_range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_time_bins()
new_histos = []
for bin in bins:
trange = bin.time_range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (tstop < trange[0]) or (tstart > trange[1]):
histo = empty
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart,
which='low'))
histo = bin.slice_time(self.closest_time_edge(tstart,
which='low'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice_time(trange[0],
self.closest_time_edge(tstop,
which='high'))
if tstop < trange[1]:
post = bin.slice_time(self.closest_time_edge(tstop,
which='high'),
trange[1])
elif (tstart > trange[0]) and (tstop < trange[1]):
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart, which='low'))
histo = bin.slice_time(
self.closest_time_edge(tstart, which='low'),
self.closest_time_edge(tstop, which='high'))
post = bin.slice_time(self.closest_time_edge(tstop, which='high'),
trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_times > 0:
edges = np.append(histo.tstart, histo.tstop[-1])
new_counts = []
new_uncert = []
for i in range(bin.num_chans):
new_cts, uncert, new_exposure, new_edges = method(
histo.counts[:, i],
histo.count_uncertainty[:, i],
histo.exposure,
edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_counts = np.array(new_counts).T
new_uncert = np.array(new_uncert).T
new_histo = TimeChannelBins(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
bin.chan_nums,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_times > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeChannelBins.merge_time(histos_to_merge))
new_histo = TimeChannelBins.merge_time(new_histos)
return new_histo
[docs] def slice_channels(self, chan_min, chan_max):
"""Perform a slice over an energy range and return a new TimeChannelBins
object. Note that chan_min and chan_max values that fall inside a bin will
result in that bin being included.
Args:
emin (float): The start of the slice
emax (float): The end of the slice
Returns:
(:class:`TimeChannelBins`)
"""
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
obj = TimeChannelBins(self.counts[:, mask], self.tstart, self.tstop,
self.exposure, self.chan_nums[mask],
count_uncerts=self.count_uncertainty[:, mask],
quality=self.quality)
return obj
[docs] def slice_time(self, tstart, tstop):
"""Perform a slice over a time range and return a new TimeChannelBins
object. Note that tstart and tstop values that fall inside a bin will
result in that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`TimeChannelBins`)
"""
mask = self._slice_time_mask(tstart, tstop)
cls = type(self)
obj = cls(self.counts[mask, :], self.tstart[mask], self.tstop[mask],
self.exposure[mask], self.chan_nums,
count_uncerts=self.count_uncertainty[mask, :],
quality=self.quality[mask])
return obj
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, tstart, tstop, exposure, chan_nums,
**kwargs):
"""Create an TimeChannelBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
chan_nums (np.array): The channel numbers in ascending order
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and energy
segments on initialization.
Returns:
(:class:`TimeChannelBins`)
"""
counts = rates * exposure[:, np.newaxis]
count_uncerts = rate_uncerts * exposure[:, np.newaxis]
return cls(counts, tstart, tstop, exposure, chan_nums,
count_uncerts=count_uncerts, **kwargs)
[docs] @classmethod
def merge_channels(cls, histos, **kwargs):
"""Merge multiple TimeChannelBins together along the channel axis.
Args:
histos (list of :class:`TimeChannelBins`):
A list containing the TimeChannelBins to be merged
Returns:
(:class:`TimeChannelBins`)
"""
num = len(histos)
# sort by channel edge
chan_mins = np.concatenate([[histo.chan_nums[0]] for histo in histos])
idx = np.argsort(chan_mins)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
quality = histos[idx[0]].quality
chan_nums = histos[idx[0]].chan_nums
for i in range(1, num):
chan_starts = histos[idx[i]].chan_nums
# make sure there is no overlap
mask = (chan_starts >= chan_nums[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.hstack((counts, histos[idx[i]].counts[:, mask]))
uncert = np.hstack((uncert, histos[idx[i]].count_uncertainty[:, mask]))
chan_nums = np.concatenate((chan_nums, histos[idx[i]].chan_nums[mask]))
# new TimeChannelBins object
merged_bins = cls(counts, tstart, tstop, exposure, chan_nums,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
[docs] @classmethod
def merge_time(cls, histos, **kwargs):
"""Merge multiple TimeChannelBins together along the time axis.
Args:
histos (list of :class:`TimeChannelBins`):
A list containing the TimeChannelBins to be merged
Returns:
(:class:`TimeChannelBins`)
"""
num = len(histos)
# sort by start time
tstarts = np.concatenate([[histo.tstart[0]] for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
chan_nums = histos[idx[0]].chan_nums
quality = histos[idx[0]].quality
for i in range(1, num):
bin_starts = histos[idx[i]].tstart
# make sure there is no overlap
mask = (bin_starts >= tstop[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.vstack((counts, histos[idx[i]].counts[mask, :]))
uncert = np.vstack((uncert, histos[idx[i]].count_uncertainty[mask, :]))
tstart = np.concatenate((tstart, histos[idx[i]].tstart[mask]))
tstop = np.concatenate((tstop, histos[idx[i]].tstop[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
quality = np.concatenate((quality, histos[idx[i]].quality[mask]))
# new TimeChannelBins object
merged_bins = cls(counts, tstart, tstop, exposure, chan_nums,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _calculate_good_segments(self, lo_edges, hi_edges):
"""Calculates the ranges of data that are contiguous segments
Args:
lo_edges (np.array): The lower bin edges
hi_edges (np.array): The upper bin edges
Returns:
([(float, float), ...])
"""
mask = (lo_edges[1:] != hi_edges[:-1])
if mask.sum() == 0:
return [(lo_edges[0], hi_edges[-1])]
edges = np.concatenate(([lo_edges[0]], hi_edges[:-1][mask],
lo_edges[1:][mask], [hi_edges[-1]]))
edges.sort()
return edges.reshape(-1, 2).tolist()
def _closest_edge(self, edges, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
idx = np.argmin(np.abs(val - edges))
if which == 'low':
if (edges[idx] > val) and (idx - 1) >= 0:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
def _slice_time_mask(self, tstart, tstop):
tstart_snap = self.closest_time_edge(tstart, which='low')
tstop_snap = self.closest_time_edge(tstop, which='high')
mask = (self.tstart < tstop_snap) & (self.tstop > tstart_snap)
return mask
def __repr__(self):
s = '<{0}: {1} time bins;\n'.format(self.__class__.__name__,
self.num_times)
s += ' time range {0};\n'.format(self.time_range)
if self._good_time_segments is not None:
s += ' {0} time segments;\n'.format(len(self._good_time_segments))
s += ' {0} channels;\n'.format(self.num_chans)
s += ' channel range {0}'.format(self.channel_range)
if self._good_channel_segments is not None:
s += ';\n {0} channel segments'.format(len(self._good_channel_segments))
return s+'>'
[docs]class TimeEnergyBins():
"""A class defining a set of 2D Time-Energy bins.
Parameters:
counts (np.array): The array of counts in each bin
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
emin (np.array): The low-value edges of the energy bins
emax (np.array): The high-value edges of the energy bins
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and energy segments on
initialization.
"""
def __init__(self, counts, tstart, tstop, exposure, emin, emax,
count_uncerts=None, quality=None, precalc_good_segments=True):
try:
iter(counts)
self._counts = np.asarray(counts)
except:
raise TypeError('counts must be an iterable')
if self._counts.ndim == 1:
self._counts = self._counts.reshape(-1, 1)
elif self._counts.ndim > 2:
raise TypeError('counts must be a 2-dimensional array')
try:
iter(tstart)
self._tstart = np.asarray(tstart).flatten()
except:
raise TypeError('tstart must be an iterable')
try:
iter(tstop)
self._tstop = np.asarray(tstop).flatten()
except:
raise TypeError('tstop must be an iterable')
try:
iter(exposure)
self._exposure = np.asarray(exposure).flatten()
except:
raise TypeError('exposure must be an iterable')
try:
iter(emin)
self._emin = np.asarray(emin).flatten()
except:
raise TypeError('emin must be an iterable')
try:
iter(emax)
self._emax = np.asarray(emax).flatten()
except:
raise TypeError('emax must be an iterable')
if (self._tstart.size != self._tstop.size) or \
(self._exposure.size != self._tstart.size):
raise ValueError('tstart, tstop, and exposure must all be ' \
'the same length')
if (self._emin.size != self._emax.size):
raise ValueError('emin and emax must be the same length')
if (self._counts.shape[0] != self._tstart.size) or \
(self._counts.shape[1] != self._emin.size):
raise ValueError('counts axis 0 must have same length as tstart, '\
'tstop, and exposure. counts axis 1 must have '\
'same length as emin and emax.')
if quality is not None:
try:
iter(quality)
self._quality = np.asarray(quality).flatten()
except:
raise TypeError('quality must be an iterable')
if self._quality.size != self._tstart.size:
raise ValueError('quality must be same length as tstart')
else:
self._quality = np.zeros_like(self._tstart)
if count_uncerts is None:
count_uncerts = np.sqrt(self._counts)
try:
iter(count_uncerts)
self._count_uncerts = np.asarray(count_uncerts)
except:
raise TypeError('count_uncerts must be an iterable')
if self._count_uncerts.shape != self._counts.shape:
raise ValueError('count_uncerts must be same shape as counts')
self._good_time_segments = None
self._good_energy_segments = None
if (self.num_times > 0) and precalc_good_segments:
self._good_time_segments = self._calculate_good_segments(
self.tstart,
self.tstop)
if (self.num_chans > 0) and precalc_good_segments:
self._good_energy_segments = self._calculate_good_segments(
self.emin,
self.emax)
@property
def chan_widths(self):
"""(np.array): The bin widths along the energy axis"""
return (self.emax - self.emin)
@property
def counts(self):
"""(np.array): The array of counts in each bin"""
return self._counts
@property
def count_uncertainty(self):
""" (np.array): The counts uncertainty in each bin"""
return self._count_uncerts
@property
def emax(self):
"""(np.array): The high-value edges of the energy bins"""
return self._emax
@property
def emin(self):
"""(np.array): The low-value edges of the energy bins"""
return self._emin
@property
def energy_centroids(self):
"""(np.array): The bin centroids along the energy axis"""
return np.sqrt(self.emin * self.emax)
@property
def energy_range(self):
"""(float, float): The range of the data along the energy axis"""
if self.num_chans > 0:
return (self.emin[0], self.emax[-1])
@property
def exposure(self):
"""(np.array): The exposure of each bin"""
return self._exposure
@property
def num_chans(self):
"""(int): The number of energy channels along the energy axis"""
return self._emin.size
@property
def num_times(self):
"""(int): The number of bins along the time axis"""
return self._exposure.size
@property
def quality(self):
"""(np.array): The spectrum quality flag"""
return self._quality
@property
def rates(self):
"""(np.array): The rates in each Time-Energy Bin"""
return self.counts / (self.exposure[:, np.newaxis])
@property
def rates_per_kev(self):
"""(np.array): The differential rates in units of counts/s/keV"""
return self.rates / self.chan_widths[np.newaxis,:]
@property
def rate_uncertainty(self):
"""(np.array): The rate uncertainty in each bin"""
return self.count_uncertainty / (self.exposure[:, np.newaxis])
@property
def rate_uncertainty_per_kev(self):
"""The differential rate uncertainty in units of counts/s/keV"""
return self.rate_uncertainty / self.chan_widths[np.newaxis, :]
@property
def size(self):
"""(int, int): The number of bins along both axes (numtimes, num_chans)"""
return self.counts.shape
@property
def time_centroids(self):
"""(np.array): The bin centroids along the time axis"""
return (self.tstop + self.tstart) / 2.0
@property
def time_range(self):
"""(float, float): The range of the data along the time axis"""
if self.num_times > 0:
return (self.tstart[0], self.tstop[-1])
@property
def time_widths(self):
"""(np.array): The bin widths along the time axis"""
return (self.tstop - self.tstart)
@property
def tstart(self):
"""(np.array): The low-value edges of the time bins"""
return self._tstart
@property
def tstop(self):
"""(np.array): The high-value edges of the time bins"""
return self._tstop
[docs] def closest_energy_edge(self, val, which='either'):
"""Return the closest energy bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.emin, [self.emax[-1]]))
return self._closest_edge(edges, val, which=which)
[docs] def closest_time_edge(self, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.tstart, [self.tstop[-1]]))
return self._closest_edge(edges, val, which=which)
[docs] def contiguous_energy_bins(self):
"""Return a list of TimeEnergyBins, each one containing a contiguous
energy segment of data. This is done by comparing the edges of each
energy bin, and if thereis a gap between edges, the data is split into
separate TimeEnergyBin objects, each containing an energy-contiguous set
of data.
Returns:
(list of :class:`TimeEnergyBins`)
"""
if self._good_energy_segments is None:
good_segments = self._calculate_good_segments(self.emin, self.emax)
else:
good_segments = self._good_energy_segments
bins = [self.slice_energy(seg[0], seg[1]) for seg in
self._good_energy_segments]
return bins
[docs] def contiguous_time_bins(self):
"""Return a list of TimeEnergyBins, each one containing a contiguous
time segment of data. This is done by comparing the edges of each time
bin, and if thereis a gap between edges, the data is split into
separate TimeEnergyBin objects, each containing a time-contiguous set
of data.
Returns:
(list of :class:`TimeEnergyBins`)
"""
if self._good_time_segments is None:
good_segments = self._calculate_good_segments(self.tstart,
self.tstop)
else:
good_segments = self._good_time_segments
bins = [self.slice_time(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] def get_exposure(self, time_ranges=None, scale=False):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data
scale (bool, optional):
If True and the time ranges don't match up with the data binning,
will scale the exposure based on the requested time range.
Default is False.
Returns:
(float)
"""
if time_ranges is None:
time_ranges = [self.time_range]
try:
iter(time_ranges[0])
except:
time_ranges = [time_ranges]
exposure = 0.0
for i in range(len(time_ranges)):
mask = self._slice_time_mask(*self._assert_range(time_ranges[i]))
dt = (time_ranges[i][1] - time_ranges[i][0])
data_exp = np.sum(self.exposure[mask])
dts = np.sum(self.tstop[mask] - self.tstart[mask])
if dts > 0:
if scale:
exposure += data_exp * (dt / dts)
else:
exposure += data_exp
return exposure
[docs] def integrate_energy(self, emin=None, emax=None):
"""Integrate the histogram over the energy axis (producing a lightcurve).
Limits on the integration smaller than the full range can be set.
Args:
emin (float, optional):
The low end of the integration range. If not set, uses the
lowest energy edge of the histogram
emax (float, optional):
The high end of the integration range. If not set, uses the
highest energy edge of the histogram
Returns:
(:class:`TimeBins`)
"""
if emin is None:
emin = self.energy_range[0]
if emax is None:
emax = self.energy_range[1]
mask = self._slice_energy_mask(emin, emax)
counts = self.counts[:, mask].sum(axis=1)
uncert = np.sqrt( (self.count_uncertainty[:, mask] ** 2).sum(axis=1) )
obj = TimeBins(counts, self.tstart, self.tstop, self.exposure,
count_uncerts=uncert)
return obj
[docs] def integrate_time(self, tstart=None, tstop=None):
"""Integrate the histogram over the time axis (producing a count rate
spectrum). Limits on the integration smaller than the full range can
be set.
Args:
tstart (float, optional):
The low end of the integration range. If not set, uses the
lowest time edge of the histogram
tstop (float, optional):
The high end of the integration range. If not set, uses the
highest time edge of the histogram
Returns:
(:class:`EnergyBins`)
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
mask = self._slice_time_mask(tstart, tstop)
counts = self.counts[mask, :].sum(axis=0)
uncert = np.sqrt( (self.count_uncertainty[mask, :] ** 2).sum(axis=0) )
exposure = self.exposure[mask].sum()
exposure = np.full(counts.size, exposure)
obj = EnergyBins(counts, self.emin, self.emax, exposure,
count_uncerts=uncert)
return obj
[docs] def rebin_energy(self, method, *args, emin=None, emax=None):
"""Rebin the TimeEnergyBins object along the energy axis given a binning
function and return a new TimeEnergyBins object.
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
emin (float, optional):
If set, defines the starting edge of the TimeEnergyBins to be
binned, otherwise binning will begin at the the first bin edge.
emax (float, optional):
If set, defines the ending edge of the TimeEnergyBins to be
binned, otherwise binning will end at the last bin edge.
Returns:
(:class:`TimeEnergyBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [], [])
# set the start and stop of the rebinning segment
erange = self.energy_range
if emin is None:
emin = erange[0]
if emax is None:
emax = erange[1]
if emin < erange[0]:
emin = erange[0]
if emax > erange[1]:
emax = erange[1]
bins = self.contiguous_energy_bins()
new_histos = []
for bin in bins:
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
erange = bin.energy_range
if (emax < erange[0]) or (emin > erange[1]):
histo = empty
elif emax == erange[1]:
if emin > erange[0]:
pre = bin.slice_energy(erange[0],
self.closest_energy_edge(emin,
which='low'))
histo = bin.slice_energy(self.closest_energy_edge(emin,
which='low'),
erange[1])
elif emin == erange[0]:
histo = bin.slice_energy(erange[0],
self.closest_energy_edge(emax,
which='high'))
if emax < erange[1]:
post = bin.slice_energy(self.closest_energy_edge(emax,
which='high'),
erange[1])
elif (emin > erange[0]) and (emax < erange[1]):
pre = bin.slice_energy(erange[0],
self.closest_energy_edge(emin, which='low'))
histo = bin.slice_energy(
self.closest_energy_edge(emin, which='low'),
self.closest_energy_edge(emax, which='high'))
post = bin.slice_energy(self.closest_energy_edge(emax,
which='high'), erange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_chans > 0:
edges = np.append(histo.emin, histo.emax[-1])
num_times, num_chans = histo.size
new_counts = []
new_uncert = []
for i in range(num_times):
exposure = np.full(num_chans, histo.exposure[i])
new_cts, uncert, _, new_edges = method(histo.counts[i, :],
histo.count_uncertainty[i,:],
exposure, edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_counts = np.array(new_counts)
new_uncert = np.array(new_uncert)
new_histo = TimeEnergyBins(new_counts, bin.tstart, bin.tstop,
bin.exposure, new_edges[:-1],
new_edges[1:],
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_chans > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeEnergyBins.merge_energy(histos_to_merge))
new_histo = TimeEnergyBins.merge_energy(new_histos)
return new_histo
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None):
"""Rebin the TimeEnergyBins object along the time axis given a binning
function and return a new TimeEnergyBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the TimeEnergyBins to be
binned, otherwise binning will begin at the time of the first
bin edge.
tstop (float, optional):
If set, defines the end time of the TimeEnergyBins to be
binned, otherwise binning will end at the time of the last
bin edge.
Returns:
(:class:`TimeEnergyBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [], [])
# set the start and stop of the rebinning segment
trange = self.time_range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_time_bins()
new_histos = []
for bin in bins:
trange = bin.time_range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (tstop < trange[0]) or (tstart > trange[1]):
histo = empty
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart,
which='low'))
histo = bin.slice_time(self.closest_time_edge(tstart,
which='low'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice_time(trange[0],
self.closest_time_edge(tstop,
which='high'))
if tstop < trange[1]:
post = bin.slice_time(self.closest_time_edge(tstop,
which='high'),
trange[1])
elif (tstart > trange[0]) and (tstop < trange[1]):
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart, which='low'))
histo = bin.slice_time(
self.closest_time_edge(tstart, which='low'),
self.closest_time_edge(tstop, which='high'))
post = bin.slice_time(self.closest_time_edge(tstop, which='high'),
trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_times > 0:
edges = np.append(histo.tstart, histo.tstop[-1])
new_counts = []
new_uncert = []
for i in range(bin.num_chans):
new_cts, uncert, new_exposure, new_edges = method(
histo.counts[:, i],
histo.count_uncertainty[:, i],
histo.exposure,
edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_counts = np.array(new_counts).T
new_uncert = np.array(new_uncert).T
new_histo = TimeEnergyBins(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
bin.emin, bin.emax,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_times > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeEnergyBins.merge_time(histos_to_merge))
new_histo = TimeEnergyBins.merge_time(new_histos)
return new_histo
[docs] def slice_energy(self, emin, emax):
"""Perform a slice over an energy range and return a new TimeEnergyBins
object. Note that emin and emax values that fall inside a bin will
result in that bin being included.
Args:
emin (float): The start of the slice
emax (float): The end of the slice
Returns:
(:class:`TimeEnergyBins`)
"""
mask = self._slice_energy_mask(emin, emax)
obj = TimeEnergyBins(self.counts[:, mask], self.tstart, self.tstop,
self.exposure, self.emin[mask], self.emax[mask],
count_uncerts=self.count_uncertainty[:, mask],
quality=self.quality)
return obj
[docs] def slice_time(self, tstart, tstop):
"""Perform a slice over a time range and return a new TimeEnergyBins
object. Note that tstart and tstop values that fall inside a bin will
result in that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`TimeEnergyBins`)
"""
mask = self._slice_time_mask(tstart, tstop)
cls = type(self)
obj = cls(self.counts[mask, :], self.tstart[mask], self.tstop[mask],
self.exposure[mask], self.emin, self.emax,
count_uncerts=self.count_uncertainty[mask, :],
quality=self.quality[mask])
return obj
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, tstart, tstop, exposure, emin,
emax, **kwargs):
"""Create an TimeEnergyBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
emin (np.array): The low-value edges of the energy bins
emax (np.array): The high-value edges of the energy bins
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and energy
segments on initialization.
Returns:
(:class:`TimeEnergyBins`)
"""
counts = rates * exposure[:, np.newaxis]
count_uncerts = rate_uncerts * exposure[:, np.newaxis]
return cls(counts, tstart, tstop, exposure, emin, emax,
count_uncerts=count_uncerts, **kwargs)
[docs] @classmethod
def merge_energy(cls, histos, **kwargs):
"""Merge multiple TimeEnergyBins together along the energy axis.
Args:
histos (list of :class:`TimeEnergyBins`):
A list containing the TimeEnergyBins to be merged
Returns:
(:class:`TimeEnergyBins`)
"""
num = len(histos)
# sort by channel edge
emins = np.concatenate([[histo.emin[0]] for histo in histos])
idx = np.argsort(emins)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
quality = histos[idx[0]].quality
emin = histos[idx[0]].emin
emax = histos[idx[0]].emax
for i in range(1, num):
bin_starts = histos[idx[i]].emin
# make sure there is no overlap
mask = (bin_starts >= emax[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.hstack((counts, histos[idx[i]].counts[:, mask]))
uncert = np.hstack((uncert, histos[idx[i]].count_uncertainty[:, mask]))
emin = np.concatenate((emin, histos[idx[i]].emin[mask]))
emax = np.concatenate((emax, histos[idx[i]].emax[mask]))
# new TimeEnergyBins object
merged_bins = cls(counts, tstart, tstop, exposure, emin, emax,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
[docs] @classmethod
def merge_time(cls, histos, **kwargs):
"""Merge multiple TimeEnergyBins together along the time axis.
Args:
histos (list of :class:`TimeEnergyBins`):
A list containing the TimeEnergyBins to be merged
Returns:
(:class:`TimeEnergyBins`)
"""
num = len(histos)
# sort by start time
tstarts = np.concatenate([[histo.tstart[0]] for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
emin = histos[idx[0]].emin
emax = histos[idx[0]].emax
quality = histos[idx[0]].quality
for i in range(1, num):
bin_starts = histos[idx[i]].tstart
# make sure there is no overlap
mask = (bin_starts >= tstop[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.vstack((counts, histos[idx[i]].counts[mask, :]))
uncert = np.vstack((uncert, histos[idx[i]].count_uncertainty[mask, :]))
tstart = np.concatenate((tstart, histos[idx[i]].tstart[mask]))
tstop = np.concatenate((tstop, histos[idx[i]].tstop[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
quality = np.concatenate((quality, histos[idx[i]].quality[mask]))
# new TimeEnergyBins object
merged_bins = cls(counts, tstart, tstop, exposure, emin, emax,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _calculate_good_segments(self, lo_edges, hi_edges):
"""Calculates the ranges of data that are contiguous segments
Args:
lo_edges (np.array): The lower bin edges
hi_edges (np.array): The upper bin edges
Returns:
([(float, float), ...])
"""
mask = (lo_edges[1:] != hi_edges[:-1])
if mask.sum() == 0:
return [(lo_edges[0], hi_edges[-1])]
edges = np.concatenate(([lo_edges[0]], hi_edges[:-1][mask],
lo_edges[1:][mask], [hi_edges[-1]]))
edges.sort()
return edges.reshape(-1, 2).tolist()
def _closest_edge(self, edges, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
idx = np.argmin(np.abs(val - edges))
if which == 'low':
if (edges[idx] > val) and (idx - 1) >= 0:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
def _slice_energy_mask(self, emin, emax):
emin_snap = self.closest_energy_edge(emin, which='low')
emax_snap = self.closest_energy_edge(emax, which='high')
mask = (self.emin < emax_snap) & (self.emax > emin_snap)
return mask
def _slice_time_mask(self, tstart, tstop):
tstart_snap = self.closest_time_edge(tstart, which='low')
tstop_snap = self.closest_time_edge(tstop, which='high')
mask = (self.tstart < tstop_snap) & (self.tstop > tstart_snap)
return mask
def __repr__(self):
s = '<{0}: {1} time bins;\n'.format(self.__class__.__name__,
self.num_times)
s += ' time range {0};\n'.format(self.time_range)
if self._good_time_segments is not None:
s += ' {0} time segments;\n'.format(len(self._good_time_segments))
s += ' {0} energy bins;\n'.format(self.num_chans)
s += ' energy range {0}'.format(self.energy_range)
if self._good_energy_segments is not None:
s += ';\n {0} energy segments'.format(len(self._good_energy_segments))
return s+'>'