# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
# Universities Space Research Association
# Science and Technology Institute
# https://sti.usra.edu
#
# Developed by: Daniel Kocevski
# National Aeronautics and Space Administration (NASA)
# Marshall Space Flight Center
# Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
from scipy.interpolate import interp1d
from scipy.integrate import trapezoid
import copy
__all__ = ['Range', 'TimeRange', 'EnergyRange', 'Intervals', 'Gti', 'Ebounds',
'EventList', 'Bins', 'ExposureBins', 'ChannelBins', 'TimeBins',
'EnergyBins', 'TimeChannelBins', 'TimeEnergyBins', 'ResponseMatrix',
'Parameter']
[docs]class Range():
"""A primitive class defining a range
Parameters:
low (float): The low end of the range
high (float): The high end of the range
"""
def __init__(self, low, high):
if high >= low:
self._low = low
self._high = high
else:
self._low = high
self._high = low
@property
def center(self):
"""(float): The center of the range"""
return (self._high + self._low) / 2.0
@property
def width(self):
"""(float): The width of the range"""
return self._high - self._low
[docs] def as_tuple(self):
"""Return the range as a tuple.
Returns:
(float, float)
"""
return (self._low, self._high)
[docs] def contains(self, value, inclusive=True):
"""Determine if the range contains a value.
Args:
value (float): The input value to check
inclusive (bool, optional):
If True, then includes the edges of the range for the check,
otherwise it is edge-exclusive. Default is True.
Returns:
bool: True if the value is in the range, False otherwise
"""
if inclusive:
test = (value <= self._high) and (value >= self._low)
else:
test = (value < self._high) and (value > self._low)
if test:
return True
else:
return False
[docs] @classmethod
def intersection(cls, range1, range2):
"""Return a new Range that is the intersection of two input Ranges.
If the input Ranges do not intersect, then None is returned.
Args:
range1 (:class:`Range`): A range used to calculate the intersection
range2 (:class:`Range`): Another range used to calculate the
intersection
Returns:
:class:`Range`: The intersected range
"""
# test if one low is inside the other range
if range1.contains(range2._low):
lower = range1
upper = range2
elif range2.contains(range1._low):
lower = range2
upper = range1
else:
return None
# do the merge
low = upper._low
high = np.min((lower._high, upper._high))
obj = cls(low, high)
return obj
[docs] @classmethod
def union(cls, range1, range2):
"""Return a new Range that is the union of two input Ranges
Args:
range1 (:class:`Range`): A range used to calculate the union
range2 (:class:`Range`): Another range used to calculate the union
Returns:
:class:`Range`: The unionized range
"""
low = np.min((range1._low, range2._low))
high = np.max((range1._high, range2._high))
obj = cls(low, high)
return obj
[docs] def translate(self, value):
"""Returns a new Range that is the translated (shifted) by the given value
Args:
value (float): The value to shift the range by
Returns:
:class:`Range`: The shifted range
"""
return self.__class__(self._low + value, self._high + value)
def __eq__(self, other):
return (self._low == other._low) and (self._high == other._high)
def __repr__(self):
return '<{0}: ({1}, {2})>'.format(self.__class__.__name__, self._low,
self._high)
def _assert_float(self, value):
try:
value = float(value)
except:
raise TypeError('value must be a float')
return value
# mark: TODO: add units to TimeRange/EnergyRange
[docs]class TimeRange(Range):
"""A primitive class defining a time range
Parameters:
tstart (float): The start time of the range
tstop (float): The end time of the range
"""
def __init__(self, tstart, tstop):
tstart = self._assert_float(tstart)
tstop = self._assert_float(tstop)
super().__init__(tstart, tstop)
@property
def duration(self):
"""(float): The duration of the time range.
Alias for ``width``."""
return self.width
@property
def tstart(self):
"""(float): The start time of the range"""
return self._low
@property
def tstop(self):
"""(float): The end time of the range"""
return self._high
[docs]class EnergyRange(Range):
"""A primitive class defining an energy range
Parameters:
emin (float): The low end of the energy range
emax (float): The high end of the energy range
"""
def __init__(self, emin, emax):
emin = self._assert_float(emin)
emax = self._assert_float(emax)
super().__init__(emin, emax)
@property
def emax(self):
"""(float): The high end of the energy range"""
return self._high
@property
def emin(self):
"""(float): The low end of the energy range"""
return self._low
@property
def log_center(self):
"""log_center (float): The logarithmic center of the energy range"""
return np.sqrt(self.emin * self.emax)
[docs]class Intervals():
"""A primitive class defining a set of intervals or ranges.
An interval may be accessed by using indices.
Parameters:
interval (:class:`Range`, optional): An interval to initialize with
"""
_range_class = Range
def __init__(self, interval=None):
self._intervals = None
if interval is not None:
self._intervals = [interval]
@property
def intervals(self):
"""(list): The list of intervals"""
return self._intervals
@property
def num_intervals(self):
"""(int): The number of intervals"""
return len(self._intervals)
@property
def range(self):
"""(float, float): The full range spanned by the intervals"""
if self.num_intervals > 0:
return (self._intervals[0].as_tuple()[0],
self._intervals[-1].as_tuple()[1])
[docs] def as_list(self):
"""Return the intervals as a list of tuples.
Returns:
[(float, float), ...]
"""
interval_list = [interval.as_tuple() for interval in self._intervals]
return interval_list
[docs] def contains(self, value, inclusive=True):
"""Determine if the intervals contains a value.
Args:
value (float): The input value to check
inclusive (bool, optional):
If True, then includes the edges of the range for the check,
otherwise it is edge-exclusive. Default is True.
Returns:
bool: True if the value is in the intervals, False otherwise
"""
test = [interval.contains(value, inclusive=inclusive) for interval \
in self._intervals]
return any(test)
[docs] def high_edges(self):
"""Return a list of the high edges.
Returns:
(list)
"""
edges = [interval._high for interval in self._intervals]
return edges
[docs] def index(self, value):
"""Return the index of the interval that contains the value.
Note :
If the value is precisely on the boundary of two intervals, the
first interval index will be returned.
Args:
value (float): The input value
Returns:
(int)
"""
for i, interval in enumerate(self._intervals):
if interval.contains(value, inclusive=True):
return i
[docs] def insert(self, interval):
"""Insert a new interval
Args:
interval (:class:`Range`): The interval to insert
"""
# if interval is a duplicate, then skip
for _interval in self._intervals:
if interval == _interval:
return
# where the new range should be inserted
idx = [i for i, j in enumerate(self._intervals) if
j._low <= interval._low]
# determine if there is overlap with the lower bounding range, and if
# so, then merge
if len(idx) != 0:
idx = idx[-1]
if self._intervals[idx].contains(interval._low, inclusive=False):
the_range = self._intervals.pop(idx)
interval = type(interval).union(the_range, interval)
else:
idx += 1
else:
idx = 0
# determine if there is overlap with the upper bounding range, and if
# so, then merge
if idx < len(self._intervals):
if self._intervals[idx].contains(interval._high):
the_range = self._intervals.pop(idx)
interval = type(interval).union(the_range, interval)
self._intervals.insert(idx, interval)
[docs] def low_edges(self):
"""Return a list of the low edges.
Returns:
(list)
"""
edges = [interval._low for interval in self._intervals]
return edges
[docs] @classmethod
def from_bounds(cls, low_bounds, high_bounds):
"""Create a new Intervals object from a list of lower and upper bounds.
Args:
low_bounds (list): The lower bounds of the intervals
high_bounds (list): The upper bounds of the intervals
Returns:
:class:`Intervals`
"""
low_bounds = np.asarray(low_bounds).flatten()
high_bounds = np.asarray(high_bounds).flatten()
num = low_bounds.size
if num != high_bounds.size:
raise ValueError('low_bounds and high_bounds must be of same size')
intervals = [cls._range_class(low_bounds[i], high_bounds[i]) \
for i in range(num)]
obj = cls(intervals[0])
obj._intervals = intervals
return obj
[docs] @classmethod
def from_list(cls, interval_list):
"""Create a new Intervals object from a list of tuples.
Args:
interval_list ([(float, float), ...]): A list of interval tuples
Returns:
:class:`Intervals`
"""
intervals = [cls._range_class(*interval) for interval in interval_list]
obj = cls(intervals[0])
obj._intervals = intervals
return obj
[docs] @classmethod
def intersection(cls, intervals1, intervals2):
"""Return a new Intervals object that is the intersection of two
existing Intervals objects.
Args:
intervals1 (:class:`Intervals`): Intervals to be intersected
intervals2 (:class:`Intervals`): Intervals to be intersected
Returns:
:class:`Intervals`
"""
if intervals1.range[0] <= intervals2.range[0]:
_intervals1 = intervals1
_intervals2 = intervals2
else:
_intervals1 = intervals2
_intervals2 = intervals1
new_intervals = []
for interval1 in _intervals1.intervals:
for interval2 in _intervals2.intervals:
if not interval1.contains(interval2._low) and \
not interval1.contains(interval2._high):
continue
new_intervals.append(cls._range_class.intersection(interval1,
interval2))
obj = cls()
obj._intervals = new_intervals
return obj
[docs] @classmethod
def merge(cls, intervals1, intervals2):
"""Return a new Intervals object that is a merge of two existing
Intervals objects.
Args:
intervals1 (:class:`Intervals`): Intervals to be merged
intervals2 (:class:`Intervals`): Intervals to be merged
Returns:
(:class:`Intervals`)
"""
interval_list = intervals1.as_list()
interval_list.extend(intervals2.as_list())
interval_list = sorted(interval_list)
obj = cls(cls._range_class(*interval_list.pop(0)))
for interval in interval_list:
obj.insert(cls._range_class(*interval))
return obj
def __repr__(self):
s = '<{0}: {1} intervals; range {2}>'.format(self.__class__.__name__,
self.num_intervals,
self.range)
return s
def __getitem__(self, index):
if index > self.num_intervals-1:
raise KeyError('Requested {0}th interval and only {1} intervals '\
'exist'.format(index, self.num_intervals))
return self._intervals[index]
[docs]class Gti(Intervals):
"""A primitive class defining a set of Good Time Intervals (GTIs).
An interval may be accessed by using indices.
Parameters:
interval (:class:`TimeRange`, optional): An interval to initialize with
"""
_range_class = TimeRange
[docs] @classmethod
def from_boolean_mask(cls, times, mask):
"""Create a new GTI object from a list of times and a Boolean mask
Splits the boolean mask into segments of contiguous values and applies
to array of times to create a GTI object.
Args:
times (np.array): An array of times
mask (np.array(dtype=bool)): The boolean array. Must be the same
size as times.
Returns:
:class:`Gti`: The new GTI object
"""
times = np.asarray(times)
mask = np.asarray(mask)
# split a boolean mask array into segments based on True/False
indices = np.nonzero(mask[1:] != mask[:-1])[0] + 1
time_segs = np.split(times, indices)
mask_segs = np.split(mask, indices)
# retrieve the start and stop times for the "on" intervals
segs = []
numsegs = len(indices) + 1
for i in range(numsegs):
if mask_segs[i][0]:
segs.append((time_segs[i][0], time_segs[i][-1]))
# if mask is all True or all False
if len(segs) == 0:
if mask[0]:
segs = [(times.min(), times.max())]
else:
return None
return cls.from_list(segs)
[docs]class Ebounds(Intervals):
"""A primitive class defining a set of energy bounds.
An interval may be accessed by using indices.
Parameters:
interval (:class:`EnergyRange`, optional): An interval to initialize with
"""
_range_class = EnergyRange
[docs]class EventList():
"""A primitive class defining an event list.
Parameters:
times (np.array): The array of event times
channels (np.array): The corresponding array of associated energy channel
ebounds (:class:`Ebounds`, optional): The energy bounds mapped to
energy channels
"""
def __init__(self, times=None, channels=None, ebounds=None):
if (times is not None) and (channels is not None):
times = np.asarray(times).flatten()
channels = np.asarray(channels).flatten()
if times.size != channels.size:
raise ValueError('times and channels arrays must be same length')
else:
times = np.array([], dtype=float)
channels = np.array([], dtype=int)
self._events = np.empty(times.size, dtype=[('TIME', times.dtype.type),
('PHA', channels.dtype.type)])
self._events['TIME'] = times
self._events['PHA'] = channels
if ebounds is not None:
if not isinstance(ebounds, Ebounds):
raise TypeError('ebounds must be of type Ebounds')
self._ebounds = ebounds
@property
def channel_range(self):
"""(int, int): The range of the channels in the list"""
if self.size > 0:
return int(np.min(self.channels)), int(np.max(self.channels))
@property
def channels(self):
"""(np.array): The PHA channel array"""
return self._events['PHA']
@property
def ebounds(self):
"""(:class:`Ebounds`): The energy bounds of the energy channels.
This property can be set.
"""
return self._ebounds
@ebounds.setter
def ebounds(self, val):
if not isinstance(val, Ebounds):
raise TypeError('ebounds must be an Ebounds object')
self._ebounds = val
@property
def emax(self):
"""(float): The maximum energy"""
if self._ebounds is not None:
return self._ebounds.range[1]
@property
def emin(self):
"""(float): The minimum energy"""
if self._ebounds is not None:
return self._ebounds.range[0]
@property
def energy_range(self):
"""(float, float): The energy range of the channels"""
if self.size > 0:
if self._ebounds is not None:
chan_min, chan_max = self.channel_range
return (self._ebounds[chan_min].emin,
self._ebounds[chan_max].emax)
@property
def num_chans(self):
"""(int): The number of energy channels.
Note that not all channels will necessarily have events, especially if a
slice is made over energy."""
if self.ebounds is not None:
return self._ebounds.num_intervals
else:
return (self._events['PHA'].max() - self._events['PHA'].min()) + 1
@property
def size(self):
"""(int): The number of events in the list"""
return self._events.size
@property
def time_range(self):
"""(float, float): The range of the times in the list"""
if self.size > 0:
return self.times.min(), self.times.max()
@property
def times(self):
"""(np.array): The event times"""
return self._events['TIME']
[docs] def bin(self, method, *args, tstart=None, tstop=None,
event_deadtime=2.6e-6, overflow_deadtime=1.0e-5, **kwargs):
"""Bin the EventList in time given a binning function and return a
2D time-energy channel histogram. If the ebounds energy calibration is
set, returns a :class:`TimeEnergyBins` object, otherwise returns a
:class:`TimeChannelBins` object.
The binning function should take as input an array of times as well
as a tstart and tstop keywords for partial list binning. Additional
arguments and keyword arguments specific to the function are allowed.
The function should return an array of time edges for the bins, such
that, for `n` bins, there are `n` + 1 edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the EventList to be binned,
otherwise binning will begin at the time of the first event.
tstop (float, optional):
If set, defines the end time of the EventList to be binned,
otherwise binning will end at the time of the last event.
event_deadtime (float, optional): The deadtime per event in seconds.
Default is 2.6e-6.
overflow_deadtime (float, optional):
The deadtime per event in the overflow channel in seconds.
Default is 1e-5.
**kwargs: Options to be passed to the binning function
Returns:
(:class:`TimeEnergyBins` or :class:`TimeChannelBins`)
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
# set the start and stop of the rebinning segment
mask = (self.times >= tstart) & (self.times <= tstop)
bin_times = self.times[mask]
kwargs['tstart'] = tstart
kwargs['tstop'] = tstop
# get the time edges from the binning function and then do the 2d histo
time_edges = method(bin_times, *args, **kwargs)
if self.ebounds is not None:
chan_list = np.arange(self.ebounds.num_intervals + 1)
else:
chan_list = np.arange(self.channel_range[1] + 2)
counts = np.histogram2d(bin_times, self.channels[mask],
[time_edges, chan_list])[0]
# calculate exposure
lo_edges = time_edges[:-1]
hi_edges = time_edges[1:]
overflow_counts = counts[:, -1]
deadtime = counts.sum(axis=1) * event_deadtime + \
overflow_counts * (overflow_deadtime - event_deadtime)
exposure = (hi_edges - lo_edges) - deadtime
if self.ebounds is None:
bins = TimeChannelBins(counts, lo_edges, hi_edges, exposure,
chan_list[:-1])
else:
bins = TimeEnergyBins(counts, lo_edges, hi_edges, exposure,
self.ebounds.low_edges(),
self.ebounds.high_edges())
return bins
[docs] def channel_slice(self, chanlo, chanhi):
"""Perform a slice in energy channels of the EventList and return a
new EventList
Args:
chanlo (int): The start of the channel slice
chanhi (int): The end of the channel slice
Returns:
(:class:`EventList`)
"""
mask = (self.channels >= chanlo) & (self.channels <= chanhi)
events = self._events[mask]
return EventList(times=events['TIME'], channels=events['PHA'],
ebounds=copy.deepcopy(self._ebounds))
[docs] def count_spectrum(self, **kwargs):
"""Extract a count spectrum for the EventList or for a segment of the
EventList. If the ebounds energy calibration is set, returns an
:class:`EnergyBins` object, otherwise returns a :class:`ChannelBins`
object.
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data.
event_deadtime (float, optional): The deadtime per event in seconds.
Default is 0.
overflow_deadtime (float, optional):
The deadtime per event in the overflow channel in seconds.
Default is 0.
Returns:
(:class:`EnergyBins` or :class:`ChannelBins`)
"""
if self.ebounds is not None:
chan_list = np.arange(self.ebounds.num_intervals + 1)
else:
chan_list = np.arange(self.channel_range[1] + 2)
counts = np.histogram(self.channels, bins=chan_list)[0]
exposure = np.full(chan_list.size-1, self.get_exposure(time_ranges=None,
**kwargs))
if self.ebounds is None:
bins = ChannelBins.create(counts, chan_list[:-1], exposure)
else:
bins = EnergyBins(counts, self.ebounds.low_edges(),
self.ebounds.high_edges(), exposure)
return bins
[docs] def energy_slice(self, emin, emax):
"""Perform a slice in energy of the EventList and return a new EventList.
Since energies are binned, an emin or emax falling inside of an energy
channel bin will include that bin in the slice.
Args:
emin (float): The start of the energy slice
emax (float): The end of the energy slice
Returns:
(:class:`EventList`)
"""
if self._ebounds is None:
raise AttributeError('Ebounds has not been set, therefore the \
the EventList cannot be sliced by energy')
emins = np.array([ebound.emin for ebound in self._ebounds.intervals])
emaxs = np.array([ebound.emax for ebound in self._ebounds.intervals])
mask = (emins < emax) & (emaxs > emin)
chan_nums = np.arange(self.channel_range[0], self.channel_range[1]+1)
chan_nums = chan_nums[mask]
return self.channel_slice(chan_nums[0], chan_nums[-1])
[docs] def get_exposure(self, time_ranges=None, event_deadtime=0.0,
overflow_deadtime=0.0):
"""Calculate the total exposure, in seconds, of a time range or time
ranges of data.
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data.
event_deadtime (float, optional): The deadtime per event in seconds.
Default is 0.
overflow_deadtime (float, optional):
The deadtime per event in the overflow channel in seconds.
Default is 0.
Returns:
(float)
"""
if time_ranges is None:
time_ranges = [self.time_range]
try:
iter(time_ranges[0])
except:
time_ranges = [time_ranges]
exposure = 0.0
for i in range(len(time_ranges)):
tstart, tstop = self._assert_range(time_ranges[i])
tcent = (tstop + tstart) / 2.0
dt = (tstop - tstart) / 2.0
mask = (np.abs(self.times - tcent) <= dt)
# mask = (self.time >= tstart) & (self.time < tstop)
deadtime = mask.sum() * event_deadtime
if self.ebounds is not None:
of_chan = self.ebounds.num_intervals-1
else:
of_chan = self.channel_range[1]
deadtime += np.sum(self.channels[mask] == of_chan) * \
(overflow_deadtime - event_deadtime)
exposure += (tstop - tstart) - deadtime
return exposure
[docs] def rebin_energy(self, method, *args, **kwargs):
"""Rebin the energy channels using the specified binning algorithm.
This does not change the number of events in the EventList, but changes
their assignment to a channel and bins the energy bounds mapping to
those channels (if applicable). A new EventList object is returned.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
**kwargs: Deadtime arguments passed to get_exposure
Returns:
(:class:`EventList`)
"""
# exposure and counts; not really used other than for some specific
# binning algorithms
exposure = np.full(self.num_chans, self.get_exposure(**kwargs))
chans, counts = np.unique(self.channels, return_counts=True)
if chans.size != self.num_chans:
counts_fill = np.zeros(self.num_chans)
counts_fill[chans] = counts
counts = counts_fill
edges = np.arange(self.num_chans + 1)
# call the binning algorithm and get the new edges
_, _, _, new_edges = method(counts, np.sqrt(counts), exposure, edges, *args)
# re-assign the pha channels based on the new edges
# and also rebin the ebounds
new_channels = np.zeros_like(self.channels)
new_ebounds = []
for i in range(len(new_edges) - 1):
emin = new_edges[i]
emax = new_edges[i + 1]
mask = (self.channels >= emin) & (self.channels < emax)
new_channels[mask] = i
if self.ebounds is not None:
new_ebounds.append((self.ebounds[emin].emin,
self.ebounds[emax - 1].emax))
# create the new EventList object with the rebinned channels
if self.ebounds is not None:
new_ebounds = Ebounds.from_list(new_ebounds)
else:
new_ebounds = None
obj = EventList(times=np.copy(self.times), channels=new_channels,
ebounds=new_ebounds)
return obj
[docs] def sort_time(self):
"""In-place sort by time.
"""
idx = np.argsort(self._events['TIME'])
self._events = self._events[:][idx]
[docs] def sort_channels(self):
"""In-place sort by channel number.
"""
idx = np.argsort(self._events['PHA'])
self._events = self._events[:][idx]
[docs] def time_slice(self, tstart, tstop):
"""Perform a slice in time of the EventList and return a new EventList
Args:
tstart (float): The start of the time slice
tstop (float): The end of the time slice
Returns:
(:class:`EventList`)
"""
mask = (self.times >= tstart) & (self.times <= tstop)
events = self._events[mask]
return EventList(times=events['TIME'], channels=events['PHA'],
ebounds=copy.deepcopy(self._ebounds))
[docs] @classmethod
def merge(cls, eventlists, sort=False, force_unique=True):
"""Merge multiple EventLists together in time and optionally sort.
Args:
eventlist (list of :class:`EventList`):
A list containing the EventLists to be merged
sort (bool, optional):
If True, sorts by time after the merge. Default is False.
force_unique (bool, optional):
If True, force all events to be unique via brute force sorting.
If False, the EventLists will only be checked and masked for
overlapping time ranges. Events can potentially be lost if the
merged EventLists contain overlapping times (but not necessarily
duplicate events), however this method is much faster.
Default is True.
Returns:
(:class:`EventList`)
"""
# put in time order
idx = np.argsort([eventlist.times.min() for eventlist in eventlists])
eventlists = [eventlists[i] for i in idx]
# mark: TODO add check for ebounds consistency
ebounds_idx = 0
for i in range(len(eventlists)):
if eventlists[i].ebounds is None:
continue
else:
ebounds_idx = i
break
new_times = np.array([])
new_chans = np.array([])
for eventlist in eventlists:
# skip empty EventLists
if eventlist.size == 0:
continue
# if not forcing to be unique, just make sure there is no time overlap
if (not force_unique) and (new_times.size > 0):
mask = (eventlist.times > new_times.max())
temp_times = eventlist.times[mask]
temp_chans = eventlist.channels[mask]
else:
temp_times = eventlist.times
temp_chans = eventlist.channels
new_times = np.concatenate((new_times, temp_times))
new_chans = np.concatenate((new_chans, temp_chans))
# force unique: make sure that we only keep unique events (slower)
if force_unique:
new_times, uidx = np.unique(new_times, return_index=True)
new_chans = new_chans[uidx]
obj = cls(times=new_times, channels=new_chans,
ebounds=copy.deepcopy(eventlists[ebounds_idx].ebounds))
# do a sort
if sort and not force_unique:
obj.sort_time()
return obj
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def __repr__(self):
s = '<EventList: {0} events;\n'.format(self.size)
s += ' time range {0};\n channel range {1}>'.format(self.time_range,
self.channel_range)
return s
[docs]class Bins():
"""A primitive class defining a set of histogram bins.
The number of counts in each bin are assumed to be represented by a Poisson
distribution unless the `count_uncerts` argument is specified with the
defined count uncertainties.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
"""
def __init__(self, counts, lo_edges, hi_edges, count_uncerts=None):
try:
iter(counts)
self._counts = np.asarray(counts).flatten()
except:
raise TypeError('counts must be an iterable')
try:
iter(lo_edges)
self._lo_edges = np.asarray(lo_edges).flatten()
except:
raise TypeError('lo_edges must be an iterable')
try:
iter(hi_edges)
self._hi_edges = np.asarray(hi_edges).flatten()
except:
raise TypeError('hi_edges must be an iterable')
if (self._counts.size != self._lo_edges.size) or \
(self._lo_edges.size != self._hi_edges.size):
raise ValueError('counts, lo_edges, and hi_edges must all be ' \
'the same length')
if count_uncerts is None:
count_uncerts = np.sqrt(counts)
try:
iter(count_uncerts)
self._count_uncerts = np.asarray(count_uncerts)
except:
raise TypeError('count_uncerts must be an iterable')
if self._count_uncerts.size != self._counts.size:
raise ValueError('count_uncerts must be same size as counts')
@property
def centroids(self):
"""(np.array): The centroids of the bins"""
return (self.hi_edges + self.lo_edges) / 2.0
@property
def counts(self):
"""(np.array): The counts in each bin"""
return self._counts
@property
def count_uncertainty(self):
"""(np.array): The count uncertainty in each bin"""
return self._count_uncerts
@property
def hi_edges(self):
"""(np.array): The high-value edges of the bins"""
return self._hi_edges
@property
def lo_edges(self):
"""(np.array): The low-value edges of the bins"""
return self._lo_edges
@property
def range(self):
"""(float, float): The range of the bin edges"""
if self.size > 0:
return (self.lo_edges[0], self.hi_edges[-1])
@property
def rates(self):
"""(np.array): The count rate of each bin; counts/width"""
return self.counts / self.widths
@property
def rate_uncertainty(self):
"""(np.array): The count rate uncertainty of each bin"""
return self.count_uncertainty / self.widths
@property
def size(self):
"""(int): Number of bins"""
return self.lo_edges.size
@property
def widths(self):
"""(np.array): The widths of the bins"""
return self.hi_edges - self.lo_edges
[docs] def closest_edge(self, val, which='either'):
"""Return the closest bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val; or
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.lo_edges, [self.hi_edges[-1]]))
idx = np.argmin(np.abs(val - edges))
if which == 'low' and (idx - 1) >= 0:
if edges[idx] > val:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, lo_edges, hi_edges):
"""Create a Bins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
Returns:
(:class:`Bins`)
"""
dt = hi_edges - lo_edges
counts = rates * dt
count_uncerts = rate_uncerts * dt
return cls(counts, lo_edges, hi_edges, count_uncerts=count_uncerts)
[docs] def slice(self, lo_edge, hi_edge):
"""Perform a slice over the range of the bins and return a new Bins
object. Note that lo_edge and hi_edge values that fall inside a bin will
result in that bin being included.
Args:
lo_edge (float): The start of the slice
hi_edge (float): The end of the slice
Returns:
(:class:`Bins`)
"""
lo_snap = self.closest_edge(lo_edge, which='low')
hi_snap = self.closest_edge(hi_edge, which='high')
if lo_snap == hi_snap:
mask = (self.lo_edges < hi_snap) & (self.hi_edges >= lo_snap)
else:
mask = (self.lo_edges < hi_snap) & (self.hi_edges > lo_snap)
count_uncerts = self.count_uncertainty[mask]
obj = Bins(self.counts[mask], self.lo_edges[mask], self.hi_edges[mask],
count_uncerts=count_uncerts)
return obj
def __repr__(self):
s = '<Bins: {0} bins;\n'.format(self.size)
s += ' range {0}>'.format(self.range)
return s
[docs]class ExposureBins(Bins):
"""A class defining a set of bins containing exposure information.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
"""
def __init__(self, counts, lo_edges, hi_edges, exposure,
count_uncerts=None, precalc_good_segments=True):
super().__init__(counts, lo_edges, hi_edges, count_uncerts=count_uncerts)
try:
iter(exposure)
self._exposure = np.asarray(exposure).flatten()
except:
raise TypeError('exposure must be an iterable')
if (self._counts.size != self._exposure.size):
raise ValueError('exposure must be the same size as counts')
self._good_segments = None
if precalc_good_segments:
self._good_segments = self._calculate_good_segments()
@property
def exposure(self):
"""(np.array): The exposure of each bin"""
return self._exposure
@property
def rates(self):
"""(np.array): The count rate of each bin: counts/exposure"""
r = np.zeros_like(self.exposure)
mask = (self.exposure > 0.0)
r[mask] = self.counts[mask] / self.exposure[mask]
return r
@property
def rate_uncertainty(self):
"""(np.array): The count rate uncertainty of each bin"""
r = np.zeros_like(self.exposure)
mask = (self.exposure > 0.0)
r[mask] = self.count_uncertainty[mask] / self.exposure[mask]
return r
[docs] def contiguous_bins(self):
"""Return a list of ExposureBins, each one containing a continuous
segment of data. This is done by comparing the edges of each bin, and
if there is a gap between edges, the data is split into separate
ExposureBin objects, each containing a contiguous set of data.
Returns
(list of :class:`ExposureBins`)
"""
if self._good_segments is not None:
good_segments = self._good_segments
else:
good_segments = self._calculate_good_segments()
bins = [self.slice(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, lo_edges, hi_edges, exposure,
**kwargs):
"""Create an ExposureBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
Returns:
(:class:`ExposureBins`)
"""
counts = rates * exposure
count_uncerts = rate_uncerts * exposure
return cls(counts, lo_edges, hi_edges, exposure,
count_uncerts=count_uncerts, **kwargs)
[docs] def rebin(self, method, *args, tstart=None, tstop=None):
"""Rebin the ExposureBins object in given a binning function and return
a new ExposureBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the ExposureBins to be binned,
otherwise binning will begin at the time of the first bin edge.
tstop (float, optional):
If set, defines the end time of the ExposureBins to be binned,
otherwise binning will end at the time of the last bin edge.
Returns:
(:class:`ExposureBins`)
"""
# empty bins
empty = self.__class__([], [], [], [], count_uncerts=[])
# set the start and stop of the rebinning segment
trange = self.range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_bins()
new_histos = []
for bin in bins:
trange = bin.range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (tstop < trange[0]) or (tstart > trange[1]):
histo = empty
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice(trange[0], self.closest_edge(tstart,
which='low'))
histo = bin.slice(self.closest_edge(tstart, which='low'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice(trange[0],
self.closest_edge(tstop, which='high'))
if tstop < trange[1]:
post = bin.slice(self.closest_edge(tstop, which='high'),
trange[1])
elif (tstart > trange[0]) or (tstop < trange[1]):
pre = bin.slice(trange[0],
self.closest_edge(tstart, which='low'))
histo = bin.slice(self.closest_edge(tstart, which='low'),
self.closest_edge(tstop, which='high'))
post = bin.slice(self.closest_edge(tstop, which='high'),
trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.size > 0:
edges = np.append(histo.lo_edges, histo.hi_edges[-1])
new_counts, new_uncert, new_exposure, new_edges = method(histo.counts,
histo.count_uncertainty,
histo.exposure,
edges,
*args)
new_histo = self.__class__(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.size > 0]
if len(histos_to_merge) > 0:
new_histos.append(self.__class__.merge(histos_to_merge))
new_histo = self.__class__.merge(new_histos)
return new_histo
[docs] def slice(self, tstart, tstop):
"""Perform a slice over a range and return a new ExposureBins object.
Note that tstart and tstop values that fall inside a bin will result in
that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`ExposureBins`)
"""
tstart_snap = self.closest_edge(tstart, which='low')
tstop_snap = self.closest_edge(tstop, which='high')
mask = (self.lo_edges < tstop_snap) & (self.hi_edges > tstart_snap)
obj = self.__class__(self.counts[mask], self.lo_edges[mask],
self.hi_edges[mask], self.exposure[mask],
count_uncerts=self.count_uncertainty[mask])
return obj
[docs] @classmethod
def merge(cls, histos, **kwargs):
"""Merge multiple ExposureBins together. Note that the ExposureBins
to merged must be strictly non-overlapping. The end of the range for
one ExposureBins may be equal to the start of the range for another, or
there may be a gap between the end of one or beginning of another,
but there cannot be an overlap. If you need to sum the counts between
multiple ExposureBins with the same bin edges, see the
:meth:`~ExposureBins.sum` method.
Args:
histos (list of :class:`ExposureBins`):
A list containing the ExposureBins to be merged
Returns:
(:class:`ExposureBins`)
"""
num = len(histos)
# sort by start time
tstarts = np.concatenate([[histo.lo_edges[0]] for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
lo_edges = histos[idx[0]].lo_edges
hi_edges = histos[idx[0]].hi_edges
exposure = histos[idx[0]].exposure
count_uncerts = histos[idx[0]].count_uncertainty
for i in range(1, num):
bin_starts = histos[idx[i]].lo_edges
# make sure there is no overlap
mask = (bin_starts >= hi_edges[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.concatenate((counts, histos[idx[i]].counts[mask]))
lo_edges = np.concatenate(
(lo_edges, histos[idx[i]].lo_edges[mask]))
hi_edges = np.concatenate(
(hi_edges, histos[idx[i]].hi_edges[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
count_uncerts = np.concatenate(
(count_uncerts, histos[idx[i]].count_uncertainty[mask]))
# new ExposureBins object
merged_bins = cls(counts, lo_edges, hi_edges, exposure,
count_uncerts=count_uncerts, **kwargs)
return merged_bins
[docs] @classmethod
def sum(cls, histos):
"""Sum multiple ExposureBins together if they have the same bin edges.
If the exposures are different between the histograms, they will be
averaged.
Note:
Count uncertainties are summed in quadrature.
Args:
histos (list of :class:`ExposureBins`):
A list containing the ExposureBins to be summed
Returns:
(:class:`ExposureBins`)
"""
counts = np.zeros(histos[0].size)
count_var = np.zeros(histos[0].size)
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.lo_edges == histos[0].lo_edges), \
"The histograms must all have the same support"
counts += histo.counts
count_var += histo.count_uncertainty ** 2
count_uncerts = np.sqrt(count_var)
# averaged exposure
exposure = np.mean([histo.exposure for histo in histos], axis=0)
sum_bins = cls(counts, histos[0].lo_edges, histos[0].hi_edges,
exposure, count_uncerts=count_uncerts)
return sum_bins
def _calculate_good_segments(self):
"""Calculates the ranges of data that are contiguous segments
Returns:
([(float, float), ...])
"""
mask = (self.lo_edges[1:] != self.hi_edges[:-1])
if mask.sum() == 0:
return [self.range]
times = np.concatenate(([self.lo_edges[0]], self.hi_edges[:-1][mask],
self.lo_edges[1:][mask], [self.hi_edges[-1]]))
times.sort()
return times.reshape(-1, 2).tolist()
def __repr__(self):
s = '<{0}: {1} bins;\n'.format(self.__class__.__name__, self.size)
s += ' range {0};\n'.format(self.range)
if self._good_segments is not None:
s += ' {0} contiguous segments>'.format(len(self._good_segments))
return s
[docs]class ChannelBins(ExposureBins):
"""A class defining a set of Energy Channel bins.
"""
@property
def chan_nums(self):
"""(np.array): The channel numbers"""
if self.size > 0:
return self.lo_edges
[docs] @classmethod
def create(cls, counts, chan_nums, exposure, continuous=True, **kwargs):
"""Create a :class:`ChannelBins` object from a list of channel numbers.
Args:
counts (np.array): The array of counts in each bin
chan_nums (np.array): The energy channel numbers
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as
`counts` if the uncertainty is
not Poisson.
continuous (bool, optional): [Experimental] Whether the bins are continuous (meaning no gaps between channels).
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
Returns:
(:class:`ChannelBins`)
"""
try:
iter(exposure)
except:
counts = np.asarray(counts)
exposure = np.full(counts.shape, exposure)
chan_nums = np.asarray(chan_nums)
if continuous and len(chan_nums) > 1:
hi_edges = np.concatenate((chan_nums[1:], [chan_nums[-1] + (chan_nums[1] - chan_nums[0])]))
else:
hi_edges = chan_nums + 1
return cls(counts, chan_nums, hi_edges, exposure, **kwargs)
@property
def range(self):
"""(int, int): The channel range"""
if self.size > 0:
return (self.chan_nums[0], self.chan_nums[-1])
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, chan_nums, exposure, **kwargs):
"""Create an ChannelBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
chan_nums (np.array): The energy channel numbers
exposure (np.array): The exposure of each bin
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
Returns:
(:class:`ChannelBins`)
"""
counts = rates * exposure
count_uncerts = rate_uncerts * exposure
return cls.create(counts, chan_nums, exposure,
count_uncerts=count_uncerts, **kwargs)
[docs] def rebin(self, method, *args, chan_min=None, chan_max=None):
"""Rebin the ChannelBins object given a binning function and return
a new ChannelBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Note::
Edges for energy channels are treated as [chan_num, chan_num+1]
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
chan_min (float, optional):
If set, defines the minimum channel number of the ChannelBins to
be binned, otherwise binning will begin at the first channel
number.
chan_max (float, optional):
If set, defines the maximum channel of the ChannelBins to be
binned, otherwise binning will end at the last channel number.
Returns:
(:class:`ChannelBins`)
"""
# empty bins
empty = self.__class__([], [], [], [])
# set the start and stop of the rebinning segment
crange = self.range
if chan_min is None:
chan_min = crange[0]
if chan_max is None:
chan_max = crange[1]
if chan_min < crange[0]:
chan_min = crange[0]
if chan_max > crange[1]:
chan_max = crange[1]
bins = self.contiguous_bins()
new_histos = []
for bin in bins:
crange = bin.range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (chan_max < crange[0]) or (chan_min > crange[1]):
histo = empty
elif chan_max == crange[1]:
if chan_min > crange[0]:
pre = bin.slice(crange[0], chan_min-1)
histo = bin.slice(chan_min, crange[1])
elif chan_min == crange[0]:
histo = bin.slice(crange[0], chan_max)
if chan_max < crange[1]:
post = bin.slice(chan_max+1, crange[1])
elif (chan_min > crange[0]) or (chan_max < crange[1]):
pre = bin.slice(crange[0], chan_min-1)
histo = bin.slice(chan_min, chan_max)
post = bin.slice(chan_max+1, crange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.size > 0:
edges = np.append(histo.lo_edges, histo.hi_edges[-1])
new_counts, new_uncert, new_exposure, new_edges = method(histo.counts,
histo.count_uncertainty,
histo.exposure,
edges, *args)
new_histo = self.__class__(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.size > 0]
if len(histos_to_merge) > 0:
new_histos.append(self.__class__.merge(histos_to_merge))
new_histo = self.__class__.merge(new_histos)
return new_histo
[docs] def slice(self, chan_min, chan_max):
"""Perform a slice over the range of the bins and return a new
ChannelBins object.
Args:
lo_edge (float): The start of the slice
hi_edge (float): The end of the slice
Returns:
(:class:`ChannelBins`)
"""
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
obj = self.__class__.create(self.counts[mask], self.chan_nums[mask],
self.exposure[mask],
count_uncerts=self.count_uncertainty[mask])
return obj
[docs]class TimeBins(ExposureBins):
"""A class defining a set of Time History (lightcurve) bins.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
"""
def __init__(self, counts, lo_edges, hi_edges, exposure, **kwargs):
super().__init__(counts, lo_edges, hi_edges, exposure, **kwargs)
[docs]class EnergyBins(ExposureBins):
"""A class defining a set of Energy (count spectra) bins.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
precalc_good_segments (bool, optional): If True, calculates contiguous
bin segments on initialization.
Default is True.
"""
def __init__(self, counts, lo_edges, hi_edges, exposure, **kwargs):
try:
iter(exposure)
except:
counts = np.asarray(counts)
exposure = np.full(counts.shape, exposure)
super().__init__(counts, lo_edges, hi_edges, exposure, **kwargs)
@property
def centroids(self):
"""(np.array): The centroids of the bins"""
return np.sqrt(self.hi_edges * self.lo_edges)
@property
def rates_per_kev(self):
"""(np.array): Differential count rate"""
return self.rates / self.widths
@property
def rate_uncertainty_per_kev(self):
"""(np.array): Differential rate uncertainty"""
return self.rate_uncertainty / self.widths
[docs] def rebin(self, method, *args, emin=None, emax=None):
"""Rebin the EnergyBins object in given a binning function and return a
a new EnergyBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
emin (float, optional):
If set, defines the starting energy of the EnergyBins to be
binned, otherwise binning will begin at the first bin edge.
emax (float, optional):
If set, defines the ending energy of the EnergyBins to be binned,
otherwise binning will end at the last bin edge.
Returns:
(:class:`EnergyBins`)
"""
histo = super().rebin(method, *args, tstart=emin, tstop=emax)
histo._exposure = self.exposure[:histo.size]
return histo
[docs] @classmethod
def sum(cls, histos):
"""Sum multiple EnergyBins together if they have the same energy range
(support). Example use would be summing two count spectra.
Note:
Count uncertainties are summed in quadrature.
Args:
histos (list of :class:`EnergyBins`):
A list containing the EnergyBins to be summed
Returns:
(:class:`EnergyBins`)
"""
counts = np.zeros(histos[0].size)
uncerts = np.zeros(histos[0].size)
exposure = 0.0
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.lo_edges == histos[0].lo_edges), \
"The histograms must all have the same support"
counts += histo.counts
uncerts += histo.count_uncertainty ** 2
exposure += histo.exposure
uncerts = np.sqrt(uncerts)
sum_bins = cls(counts, histos[0].lo_edges, histos[0].hi_edges,
exposure, count_uncerts=uncerts)
return sum_bins
[docs]class TimeChannelBins():
"""A class defining a set of 2D Time/Energy-Channel bins.
Parameters:
counts (np.array): The array of counts in each bin
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
chan_nums (np.array): The channel numbers in ascending order
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and channel segments
on initialization.
"""
def __init__(self, counts, tstart, tstop, exposure, chan_nums,
quality=None, count_uncerts=None, precalc_good_segments=True):
try:
iter(counts)
self._counts = np.asarray(counts)
except:
raise TypeError('counts must be an iterable')
if self._counts.ndim == 1:
self._counts = self._counts.reshape(-1, 1)
elif self._counts.ndim > 2:
raise TypeError('counts must be a 2-dimensional array')
try:
iter(tstart)
self._tstart = np.asarray(tstart).flatten()
except:
raise TypeError('tstart must be an iterable')
try:
iter(tstop)
self._tstop = np.asarray(tstop).flatten()
except:
raise TypeError('tstop must be an iterable')
try:
iter(exposure)
self._exposure = np.asarray(exposure).flatten()
except:
raise TypeError('exposure must be an iterable')
try:
iter(chan_nums)
self._chan_nums = np.asarray(chan_nums, dtype=int).flatten()
except:
raise TypeError('chan_nums must be an iterable')
if (self._tstart.size != self._tstop.size) or \
(self._exposure.size != self._tstart.size):
raise ValueError('tstart, tstop, and exposure must all be ' \
'the same length')
if (self._counts.shape[0] != self._tstart.size) or \
(self._counts.shape[1] != self._chan_nums.size):
raise ValueError('counts axis 0 must have same length as tstart, '\
'tstop, and exposure. counts axis 1 must have '\
'same length as chan_nums.')
if quality is not None:
try:
iter(quality)
self._quality = np.asarray(quality).flatten()
except:
raise TypeError('quality must be an iterable')
if self._quality.size != self._tstart.size:
raise ValueError('quality must be same length as tstart')
else:
self._quality = np.zeros_like(self._tstart)
if count_uncerts is None:
count_uncerts = np.sqrt(self._counts)
try:
iter(count_uncerts)
self._count_uncerts = np.asarray(count_uncerts)
except:
raise TypeError('count_uncerts must be an iterable')
if self._count_uncerts.shape != self._counts.shape:
raise ValueError('count_uncerts must be same shape as counts')
self._good_time_segments = None
self._good_channel_segments = None
if (self.num_times > 0) and precalc_good_segments:
self._good_time_segments = self._calculate_good_segments(
self.tstart,
self.tstop)
if (self.num_chans > 0) and precalc_good_segments:
chans0 = self.chan_nums
chans1 = self.chan_nums + 1
self._good_channel_segments = self._calculate_good_segments(chans0,
chans1)
@property
def chan_nums(self):
"""(np.array): The channel numbers"""
return self._chan_nums
@property
def channel_range(self):
"""(int, int): The channel number range"""
if self.num_chans > 0:
return (self.chan_nums[0], self.chan_nums[-1])
@property
def counts(self):
"""(np.array): The array of counts in each bin"""
return self._counts
@property
def count_uncertainty(self):
""" (np.array): The counts uncertainty in each bin"""
return self._count_uncerts
@property
def exposure(self):
"""(np.array): The exposure of each bin"""
return self._exposure
@property
def num_chans(self):
"""(int): The number of energy channels along the energy axis"""
return self._chan_nums.size
@property
def num_times(self):
"""(int): The number of bins along the time axis"""
return self._exposure.size
@property
def quality(self):
"""(np.array): The spectrum quality flag"""
return self._quality
@property
def rates(self):
"""(np.array): The rates in each Time-Channel Bin"""
return self.counts / (self.exposure[:, np.newaxis])
@property
def rate_uncertainty(self):
"""(np.array): The rate uncertainty in each bin"""
return self.count_uncertainty / (self.exposure[:, np.newaxis])
@property
def size(self):
"""(int, int): The number of bins along both axes (num_times, num_chans)"""
return self.counts.shape
@property
def time_centroids(self):
"""(np.array): The bin centroids along the time axis"""
return (self.tstop + self.tstart) / 2.0
@property
def time_range(self):
"""(float, float): The range of the data along the time axis"""
if self.num_times > 0:
return (self.tstart[0], self.tstop[-1])
@property
def time_widths(self):
"""(np.array): The bin widths along the time axis"""
return (self.tstop - self.tstart)
@property
def tstart(self):
"""(np.array): The low-value edges of the time bins"""
return self._tstart
@property
def tstop(self):
"""(np.array): The high-value edges of the time bins"""
return self._tstop
[docs] def apply_ebounds(self, ebounds):
"""Apply an energy bounds calibration and return a TimeEnergyBins
object.
Args:
ebounds (:class:`Ebounds`): The energy bounds. Must match the number
of channels as this object.
Returns:
(:class:`TimeEnergyBins`)
"""
if not isinstance(ebounds, Ebounds):
raise TypeError('ebounds must be an Ebounds object')
if ebounds.num_intervals != self.num_chans:
raise ValueError('ebounds must have the same number of channels ' \
'as this object.')
return TimeEnergyBins(self.counts, self.tstart, self.tstop,
self.exposure, ebounds.low_edges(),
ebounds.high_edges(),
count_uncerts=self.count_uncertainty,
quality=self._quality)
[docs] def closest_time_edge(self, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.tstart, [self.tstop[-1]]))
return self._closest_edge(edges, val, which=which)
[docs] def contiguous_channel_bins(self):
"""Return a list of TimeChannelBins, each one containing a contiguous
channel segment of data. This is done by comparing adjacent channel
numbers, and if there is a gap between channels, the data is split into
separate TimeChannelBins objects, each containing a
channel-contiguous set of data.
Returns:
(list of :class:`TimeChannelBins`)
"""
if self._good_channel_segments is None:
chans0 = self.chan_nums
chans1 = self.chan_nums + 1
good_segments = self._calculate_good_segments(chans0, chans1)
else:
good_segments = self._good_channel_segments
bins = [self.slice_channels(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] def contiguous_time_bins(self):
"""Return a list of TimeChannelBins, each one containing a contiguous
time segment of data. This is done by comparing the edges of each time
bin, and if thereis a gap between edges, the data is split into
separate TimeChannelBins objects, each containing a time-contiguous set
of data.
Returns:
(list of :class:`TimeChannelBins`)
"""
if self._good_time_segments is None:
good_segments = self._calculate_good_segments(self.tstart,
self.tstop)
else:
good_segments = self._good_time_segments
bins = [self.slice_time(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] def get_exposure(self, time_ranges=None, scale=False):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data
scale (bool, optional):
If True and the time ranges don't match up with the data binning,
will scale the exposure based on the requested time range.
Default is False.
Returns:
(float)
"""
if time_ranges is None:
time_ranges = [self.time_range]
try:
iter(time_ranges[0])
except:
time_ranges = [time_ranges]
exposure = 0.0
for i in range(len(time_ranges)):
mask = self._slice_time_mask(*self._assert_range(time_ranges[i]))
dt = (time_ranges[i][1] - time_ranges[i][0])
data_exp = np.sum(self.exposure[mask])
dts = np.sum(self.tstop[mask] - self.tstart[mask])
if dts > 0:
if scale:
exposure += data_exp * (dt / dts)
else:
exposure += data_exp
return exposure
[docs] def integrate_channels(self, chan_min=None, chan_max=None):
"""Integrate the histogram over the channel axis (producing a
lightcurve). Limits on the integration smaller than the full range can
be set.
Args:
chan_min (int, optional):
The low end of the integration range. If not set, uses the
lowest energy channel of the histogram
chan_max (int, optional):
The high end of the integration range. If not set, uses the
highest energy channel of the histogram
Returns:
(:class:`TimeBins`)
"""
if chan_min is None:
chan_min = self.channel_range[0]
if chan_max is None:
chan_max = self.channel_range[1]
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
counts = self.counts[:, mask].sum(axis=1)
uncert = np.sqrt( (self.count_uncertainty[:, mask] ** 2).sum(axis=1) )
obj = TimeBins(counts, self.tstart, self.tstop, self.exposure,
count_uncerts=uncert)
return obj
[docs] def integrate_time(self, tstart=None, tstop=None):
"""Integrate the histogram over the time axis (producing an energy
channel spectrum). Limits on the integration smaller than the full range
can be set.
Args:
tstart (float, optional):
The low end of the integration range. If not set, uses the
lowest time edge of the histogram
tstop (float, optional):
The high end of the integration range. If not set, uses the
highest time edge of the histogram
Returns:
(:class:`ChannelBins`)
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
mask = self._slice_time_mask(tstart, tstop)
counts = self.counts[mask, :].sum(axis=0)
uncert = np.sqrt( (self.count_uncertainty[mask, :] ** 2).sum(axis=0) )
exposure = np.full(counts.size, self.exposure[mask].sum())
obj = ChannelBins.create(counts, self.chan_nums, exposure,
count_uncerts=uncert)
return obj
[docs] def rebin_channels(self, method, *args, chan_min=None, chan_max=None):
"""Rebin the TimeChannelBins object along the energy axis given a
binning function and return a new TimeChannelBins object.
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
chan_min (int, optional):
If set, defines the starting channel of the TimeChannelBins
to be binned, otherwise binning will begin at the the first
channel.
chan_max (int, optional):
If set, defines the ending channel of the TimeChannelBins to
be binned, otherwise binning will end at the last channel.
Returns:
(:class:`TimeChannelBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [])
# set the start and stop of the rebinning segment
chan_range = self.channel_range
if chan_min is None:
chan_min = chan_range[0]
if chan_max is None:
chan_max = chan_range[1]
if chan_min < chan_range[0]:
chan_min = chan_range[0]
if chan_max > chan_range[1]:
chan_max = chan_range[1]
bins = self.contiguous_channel_bins()
new_histos = []
for bin in bins:
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
crange = bin.channel_range
if (chan_max < crange[0]) or (chan_min > crange[1]):
histo = empty
elif chan_max == crange[1]:
if chan_min > crange[0]:
pre = bin.slice_channels(crange[0], chan_min-1)
histo = bin.slice_channels(chan_min, crange[1])
elif chan_min == crange[0]:
histo = bin.slice_channels(crange[0], chan_max)
if chan_max < crange[1]:
post = bin.slice_channels(chan_max+1, crange[1])
elif (chan_min > crange[0]) and (chan_max < crange[1]):
pre = bin.slice_channels(crange[0], chan_min-1)
histo = bin.slice_channels(chan_min, chan_max)
post = bin.slice_channels(chan_max+1, crange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_chans > 0:
edges = np.append(histo.chan_nums, histo.chan_nums[-1]+1)
num_times, num_chans = histo.size
new_counts = []
new_uncert = []
for i in range(num_times):
exposure = np.full(num_chans, histo.exposure[i])
new_cts, uncert, _, new_edges = method(histo.counts[i, :],
histo.count_uncertainty[i,:],
exposure, edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_histo = TimeChannelBins(new_counts, bin.tstart, bin.tstop,
bin.exposure, new_edges[:-1],
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_chans > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeChannelBins.merge_channels(histos_to_merge))
new_histo = TimeChannelBins.merge_channels(new_histos)
return new_histo
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None):
"""Rebin the TimeChannelBins object along the time axis given a binning
function and return a new TimeChannelBins object.
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the TimeChannelBins to be
binned, otherwise binning will begin at the time of the first
bin edge.
tstop (float, optional):
If set, defines the end time of the TimeChannelBins to be
binned, otherwise binning will end at the time of the last
bin edge.
Returns:
(:class:`TimeChannelBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [], [])
# set the start and stop of the rebinning segment
trange = self.time_range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_time_bins()
new_histos = []
for bin in bins:
trange = bin.time_range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (tstop < trange[0]) or (tstart > trange[1]):
histo = empty
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart,
which='low'))
histo = bin.slice_time(self.closest_time_edge(tstart,
which='low'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice_time(trange[0],
self.closest_time_edge(tstop,
which='high'))
if tstop < trange[1]:
post = bin.slice_time(self.closest_time_edge(tstop,
which='high'),
trange[1])
elif (tstart > trange[0]) and (tstop < trange[1]):
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart, which='low'))
histo = bin.slice_time(
self.closest_time_edge(tstart, which='low'),
self.closest_time_edge(tstop, which='high'))
post = bin.slice_time(self.closest_time_edge(tstop, which='high'),
trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_times > 0:
edges = np.append(histo.tstart, histo.tstop[-1])
new_counts = []
new_uncert = []
for i in range(bin.num_chans):
new_cts, uncert, new_exposure, new_edges = method(
histo.counts[:, i],
histo.count_uncertainty[:, i],
histo.exposure,
edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_counts = np.array(new_counts).T
new_uncert = np.array(new_uncert).T
new_histo = TimeChannelBins(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
bin.chan_nums,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_times > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeChannelBins.merge_time(histos_to_merge))
new_histo = TimeChannelBins.merge_time(new_histos)
return new_histo
[docs] def slice_channels(self, chan_min, chan_max):
"""Perform a slice over an energy range and return a new TimeChannelBins
object. Note that chan_min and chan_max values that fall inside a bin will
result in that bin being included.
Args:
emin (float): The start of the slice
emax (float): The end of the slice
Returns:
(:class:`TimeChannelBins`)
"""
mask = (self.chan_nums <= chan_max) & (self.chan_nums >= chan_min)
obj = TimeChannelBins(self.counts[:, mask], self.tstart, self.tstop,
self.exposure, self.chan_nums[mask],
count_uncerts=self.count_uncertainty[:, mask],
quality=self.quality)
return obj
[docs] def slice_time(self, tstart, tstop):
"""Perform a slice over a time range and return a new TimeChannelBins
object. Note that tstart and tstop values that fall inside a bin will
result in that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`TimeChannelBins`)
"""
mask = self._slice_time_mask(tstart, tstop)
cls = type(self)
obj = cls(self.counts[mask, :], self.tstart[mask], self.tstop[mask],
self.exposure[mask], self.chan_nums,
count_uncerts=self.count_uncertainty[mask, :],
quality=self.quality[mask])
return obj
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, tstart, tstop, exposure, chan_nums,
**kwargs):
"""Create an TimeChannelBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
chan_nums (np.array): The channel numbers in ascending order
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and energy
segments on initialization.
Returns:
(:class:`TimeChannelBins`)
"""
counts = rates * exposure[:, np.newaxis]
count_uncerts = rate_uncerts * exposure[:, np.newaxis]
return cls(counts, tstart, tstop, exposure, chan_nums,
count_uncerts=count_uncerts, **kwargs)
[docs] @classmethod
def merge_channels(cls, histos, **kwargs):
"""Merge multiple TimeChannelBins together along the channel axis.
Args:
histos (list of :class:`TimeChannelBins`):
A list containing the TimeChannelBins to be merged
Returns:
(:class:`TimeChannelBins`)
"""
num = len(histos)
# sort by channel edge
chan_mins = np.concatenate([[histo.chan_nums[0]] for histo in histos])
idx = np.argsort(chan_mins)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
quality = histos[idx[0]].quality
chan_nums = histos[idx[0]].chan_nums
for i in range(1, num):
chan_starts = histos[idx[i]].chan_nums
# make sure there is no overlap
mask = (chan_starts >= chan_nums[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.hstack((counts, histos[idx[i]].counts[:, mask]))
uncert = np.hstack((uncert, histos[idx[i]].count_uncertainty[:, mask]))
chan_nums = np.concatenate((chan_nums, histos[idx[i]].chan_nums[mask]))
# new TimeChannelBins object
merged_bins = cls(counts, tstart, tstop, exposure, chan_nums,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
[docs] @classmethod
def merge_time(cls, histos, **kwargs):
"""Merge multiple TimeChannelBins together along the time axis.
Args:
histos (list of :class:`TimeChannelBins`):
A list containing the TimeChannelBins to be merged
Returns:
(:class:`TimeChannelBins`)
"""
num = len(histos)
# sort by start time
tstarts = np.concatenate([[histo.tstart[0]] for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
chan_nums = histos[idx[0]].chan_nums
quality = histos[idx[0]].quality
for i in range(1, num):
bin_starts = histos[idx[i]].tstart
# make sure there is no overlap
mask = (bin_starts >= tstop[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.vstack((counts, histos[idx[i]].counts[mask, :]))
uncert = np.vstack((uncert, histos[idx[i]].count_uncertainty[mask, :]))
tstart = np.concatenate((tstart, histos[idx[i]].tstart[mask]))
tstop = np.concatenate((tstop, histos[idx[i]].tstop[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
quality = np.concatenate((quality, histos[idx[i]].quality[mask]))
# new TimeChannelBins object
merged_bins = cls(counts, tstart, tstop, exposure, chan_nums,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _calculate_good_segments(self, lo_edges, hi_edges):
"""Calculates the ranges of data that are contiguous segments
Args:
lo_edges (np.array): The lower bin edges
hi_edges (np.array): The upper bin edges
Returns:
([(float, float), ...])
"""
mask = (lo_edges[1:] != hi_edges[:-1])
if mask.sum() == 0:
return [(lo_edges[0], hi_edges[-1])]
edges = np.concatenate(([lo_edges[0]], hi_edges[:-1][mask],
lo_edges[1:][mask], [hi_edges[-1]]))
edges.sort()
return edges.reshape(-1, 2).tolist()
def _closest_edge(self, edges, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
idx = np.argmin(np.abs(val - edges))
if which == 'low':
if (edges[idx] > val) and (idx - 1) >= 0:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
def _slice_time_mask(self, tstart, tstop):
tstart_snap = self.closest_time_edge(tstart, which='low')
tstop_snap = self.closest_time_edge(tstop, which='high')
mask = (self.tstart < tstop_snap) & (self.tstop > tstart_snap)
return mask
def __repr__(self):
s = '<{0}: {1} time bins;\n'.format(self.__class__.__name__,
self.num_times)
s += ' time range {0};\n'.format(self.time_range)
if self._good_time_segments is not None:
s += ' {0} time segments;\n'.format(len(self._good_time_segments))
s += ' {0} channels;\n'.format(self.num_chans)
s += ' channel range {0}'.format(self.channel_range)
if self._good_channel_segments is not None:
s += ';\n {0} channel segments'.format(len(self._good_channel_segments))
return s+'>'
[docs]class TimeEnergyBins():
"""A class defining a set of 2D Time-Energy bins.
Parameters:
counts (np.array): The array of counts in each bin
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
emin (np.array): The low-value edges of the energy bins
emax (np.array): The high-value edges of the energy bins
count_uncerts (np.array, optional): An array the same length as `counts`
if the uncertainty is not Poisson.
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and energy segments on
initialization.
"""
def __init__(self, counts, tstart, tstop, exposure, emin, emax,
count_uncerts=None, quality=None, precalc_good_segments=True):
try:
iter(counts)
self._counts = np.asarray(counts)
except:
raise TypeError('counts must be an iterable')
if self._counts.ndim == 1:
self._counts = self._counts.reshape(-1, 1)
elif self._counts.ndim > 2:
raise TypeError('counts must be a 2-dimensional array')
try:
iter(tstart)
self._tstart = np.asarray(tstart).flatten()
except:
raise TypeError('tstart must be an iterable')
try:
iter(tstop)
self._tstop = np.asarray(tstop).flatten()
except:
raise TypeError('tstop must be an iterable')
try:
iter(exposure)
self._exposure = np.asarray(exposure).flatten()
except:
raise TypeError('exposure must be an iterable')
try:
iter(emin)
self._emin = np.asarray(emin).flatten()
except:
raise TypeError('emin must be an iterable')
try:
iter(emax)
self._emax = np.asarray(emax).flatten()
except:
raise TypeError('emax must be an iterable')
if (self._tstart.size != self._tstop.size) or \
(self._exposure.size != self._tstart.size):
raise ValueError('tstart, tstop, and exposure must all be ' \
'the same length')
if (self._emin.size != self._emax.size):
raise ValueError('emin and emax must be the same length')
if (self._counts.shape[0] != self._tstart.size) or \
(self._counts.shape[1] != self._emin.size):
raise ValueError('counts axis 0 must have same length as tstart, '\
'tstop, and exposure. counts axis 1 must have '\
'same length as emin and emax.')
if quality is not None:
try:
iter(quality)
self._quality = np.asarray(quality).flatten()
except:
raise TypeError('quality must be an iterable')
if self._quality.size != self._tstart.size:
raise ValueError('quality must be same length as tstart')
else:
self._quality = np.zeros_like(self._tstart)
if count_uncerts is None:
count_uncerts = np.sqrt(self._counts)
try:
iter(count_uncerts)
self._count_uncerts = np.asarray(count_uncerts)
except:
raise TypeError('count_uncerts must be an iterable')
if self._count_uncerts.shape != self._counts.shape:
raise ValueError('count_uncerts must be same shape as counts')
self._good_time_segments = None
self._good_energy_segments = None
if (self.num_times > 0) and precalc_good_segments:
self._good_time_segments = self._calculate_good_segments(
self.tstart,
self.tstop)
if (self.num_chans > 0) and precalc_good_segments:
self._good_energy_segments = self._calculate_good_segments(
self.emin,
self.emax)
@property
def chan_widths(self):
"""(np.array): The bin widths along the energy axis"""
return (self.emax - self.emin)
@property
def counts(self):
"""(np.array): The array of counts in each bin"""
return self._counts
@property
def count_uncertainty(self):
""" (np.array): The counts uncertainty in each bin"""
return self._count_uncerts
@property
def emax(self):
"""(np.array): The high-value edges of the energy bins"""
return self._emax
@property
def emin(self):
"""(np.array): The low-value edges of the energy bins"""
return self._emin
@property
def energy_centroids(self):
"""(np.array): The bin centroids along the energy axis"""
return np.sqrt(self.emin * self.emax)
@property
def energy_range(self):
"""(float, float): The range of the data along the energy axis"""
if self.num_chans > 0:
return (self.emin[0], self.emax[-1])
@property
def exposure(self):
"""(np.array): The exposure of each bin"""
return self._exposure
@property
def num_chans(self):
"""(int): The number of energy channels along the energy axis"""
return self._emin.size
@property
def num_times(self):
"""(int): The number of bins along the time axis"""
return self._exposure.size
@property
def quality(self):
"""(np.array): The spectrum quality flag"""
return self._quality
@property
def rates(self):
"""(np.array): The rates in each Time-Energy Bin"""
return self.counts / (self.exposure[:, np.newaxis])
@property
def rates_per_kev(self):
"""(np.array): The differential rates in units of counts/s/keV"""
return self.rates / self.chan_widths[np.newaxis,:]
@property
def rate_uncertainty(self):
"""(np.array): The rate uncertainty in each bin"""
return self.count_uncertainty / (self.exposure[:, np.newaxis])
@property
def rate_uncertainty_per_kev(self):
"""The differential rate uncertainty in units of counts/s/keV"""
return self.rate_uncertainty / self.chan_widths[np.newaxis, :]
@property
def size(self):
"""(int, int): The number of bins along both axes (numtimes, num_chans)"""
return self.counts.shape
@property
def time_centroids(self):
"""(np.array): The bin centroids along the time axis"""
return (self.tstop + self.tstart) / 2.0
@property
def time_range(self):
"""(float, float): The range of the data along the time axis"""
if self.num_times > 0:
return (self.tstart[0], self.tstop[-1])
@property
def time_widths(self):
"""(np.array): The bin widths along the time axis"""
return (self.tstop - self.tstart)
@property
def tstart(self):
"""(np.array): The low-value edges of the time bins"""
return self._tstart
@property
def tstop(self):
"""(np.array): The high-value edges of the time bins"""
return self._tstop
[docs] def closest_energy_edge(self, val, which='either'):
"""Return the closest energy bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.emin, [self.emax[-1]]))
return self._closest_edge(edges, val, which=which)
[docs] def closest_time_edge(self, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
edges = np.concatenate((self.tstart, [self.tstop[-1]]))
return self._closest_edge(edges, val, which=which)
[docs] def contiguous_energy_bins(self):
"""Return a list of TimeEnergyBins, each one containing a contiguous
energy segment of data. This is done by comparing the edges of each
energy bin, and if thereis a gap between edges, the data is split into
separate TimeEnergyBin objects, each containing an energy-contiguous set
of data.
Returns:
(list of :class:`TimeEnergyBins`)
"""
if self._good_energy_segments is None:
good_segments = self._calculate_good_segments(self.emin, self.emax)
else:
good_segments = self._good_energy_segments
bins = [self.slice_energy(seg[0], seg[1]) for seg in
self._good_energy_segments]
return bins
[docs] def contiguous_time_bins(self):
"""Return a list of TimeEnergyBins, each one containing a contiguous
time segment of data. This is done by comparing the edges of each time
bin, and if thereis a gap between edges, the data is split into
separate TimeEnergyBin objects, each containing a time-contiguous set
of data.
Returns:
(list of :class:`TimeEnergyBins`)
"""
if self._good_time_segments is None:
good_segments = self._calculate_good_segments(self.tstart,
self.tstop)
else:
good_segments = self._good_time_segments
bins = [self.slice_time(seg[0], seg[1]) for seg in good_segments]
return bins
[docs] def get_exposure(self, time_ranges=None, scale=False):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data
scale (bool, optional):
If True and the time ranges don't match up with the data binning,
will scale the exposure based on the requested time range.
Default is False.
Returns:
(float)
"""
if time_ranges is None:
time_ranges = [self.time_range]
try:
iter(time_ranges[0])
except:
time_ranges = [time_ranges]
exposure = 0.0
for i in range(len(time_ranges)):
mask = self._slice_time_mask(*self._assert_range(time_ranges[i]))
dt = (time_ranges[i][1] - time_ranges[i][0])
data_exp = np.sum(self.exposure[mask])
dts = np.sum(self.tstop[mask] - self.tstart[mask])
if dts > 0:
if scale:
exposure += data_exp * (dt / dts)
else:
exposure += data_exp
return exposure
[docs] def integrate_energy(self, emin=None, emax=None):
"""Integrate the histogram over the energy axis (producing a lightcurve).
Limits on the integration smaller than the full range can be set.
Args:
emin (float, optional):
The low end of the integration range. If not set, uses the
lowest energy edge of the histogram
emax (float, optional):
The high end of the integration range. If not set, uses the
highest energy edge of the histogram
Returns:
(:class:`TimeBins`)
"""
if emin is None:
emin = self.energy_range[0]
if emax is None:
emax = self.energy_range[1]
mask = self._slice_energy_mask(emin, emax)
counts = self.counts[:, mask].sum(axis=1)
uncert = np.sqrt( (self.count_uncertainty[:, mask] ** 2).sum(axis=1) )
obj = TimeBins(counts, self.tstart, self.tstop, self.exposure,
count_uncerts=uncert)
return obj
[docs] def integrate_time(self, tstart=None, tstop=None):
"""Integrate the histogram over the time axis (producing a count rate
spectrum). Limits on the integration smaller than the full range can
be set.
Args:
tstart (float, optional):
The low end of the integration range. If not set, uses the
lowest time edge of the histogram
tstop (float, optional):
The high end of the integration range. If not set, uses the
highest time edge of the histogram
Returns:
(:class:`EnergyBins`)
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
mask = self._slice_time_mask(tstart, tstop)
counts = self.counts[mask, :].sum(axis=0)
uncert = np.sqrt( (self.count_uncertainty[mask, :] ** 2).sum(axis=0) )
exposure = self.exposure[mask].sum()
exposure = np.full(counts.size, exposure)
obj = EnergyBins(counts, self.emin, self.emax, exposure,
count_uncerts=uncert)
return obj
[docs] def rebin_energy(self, method, *args, emin=None, emax=None):
"""Rebin the TimeEnergyBins object along the energy axis given a binning
function and return a new TimeEnergyBins object.
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
emin (float, optional):
If set, defines the starting edge of the TimeEnergyBins to be
binned, otherwise binning will begin at the the first bin edge.
emax (float, optional):
If set, defines the ending edge of the TimeEnergyBins to be
binned, otherwise binning will end at the last bin edge.
Returns:
(:class:`TimeEnergyBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [], [])
# set the start and stop of the rebinning segment
erange = self.energy_range
if emin is None:
emin = erange[0]
if emax is None:
emax = erange[1]
if emin < erange[0]:
emin = erange[0]
if emax > erange[1]:
emax = erange[1]
bins = self.contiguous_energy_bins()
new_histos = []
for bin in bins:
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
erange = bin.energy_range
if (emax < erange[0]) or (emin > erange[1]):
histo = empty
elif emax == erange[1]:
if emin > erange[0]:
pre = bin.slice_energy(erange[0],
self.closest_energy_edge(emin,
which='low'))
histo = bin.slice_energy(self.closest_energy_edge(emin,
which='low'),
erange[1])
elif emin == erange[0]:
histo = bin.slice_energy(erange[0],
self.closest_energy_edge(emax,
which='high'))
if emax < erange[1]:
post = bin.slice_energy(self.closest_energy_edge(emax,
which='high'),
erange[1])
elif (emin > erange[0]) and (emax < erange[1]):
pre = bin.slice_energy(erange[0],
self.closest_energy_edge(emin, which='low'))
histo = bin.slice_energy(
self.closest_energy_edge(emin, which='low'),
self.closest_energy_edge(emax, which='high'))
post = bin.slice_energy(self.closest_energy_edge(emax,
which='high'), erange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_chans > 0:
edges = np.append(histo.emin, histo.emax[-1])
num_times, num_chans = histo.size
new_counts = []
new_uncert = []
for i in range(num_times):
exposure = np.full(num_chans, histo.exposure[i])
new_cts, uncert, _, new_edges = method(histo.counts[i, :],
histo.count_uncertainty[i,:],
exposure, edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_counts = np.array(new_counts)
new_uncert = np.array(new_uncert)
new_histo = TimeEnergyBins(new_counts, bin.tstart, bin.tstop,
bin.exposure, new_edges[:-1],
new_edges[1:],
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_chans > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeEnergyBins.merge_energy(histos_to_merge))
new_histo = TimeEnergyBins.merge_energy(new_histos)
return new_histo
[docs] def rebin_time(self, method, *args, tstart=None, tstop=None):
"""Rebin the TimeEnergyBins object along the time axis given a binning
function and return a new TimeEnergyBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the TimeEnergyBins to be
binned, otherwise binning will begin at the time of the first
bin edge.
tstop (float, optional):
If set, defines the end time of the TimeEnergyBins to be
binned, otherwise binning will end at the time of the last
bin edge.
Returns:
(:class:`TimeEnergyBins`)
"""
# empty bins
empty = self.__class__(np.array([[]]).reshape(0,0), [], [], [], [], [])
# set the start and stop of the rebinning segment
trange = self.time_range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_time_bins()
new_histos = []
for bin in bins:
trange = bin.time_range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = empty
post = empty
if (tstop < trange[0]) or (tstart > trange[1]):
histo = empty
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart,
which='low'))
histo = bin.slice_time(self.closest_time_edge(tstart,
which='low'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice_time(trange[0],
self.closest_time_edge(tstop,
which='high'))
if tstop < trange[1]:
post = bin.slice_time(self.closest_time_edge(tstop,
which='high'),
trange[1])
elif (tstart > trange[0]) and (tstop < trange[1]):
pre = bin.slice_time(trange[0],
self.closest_time_edge(tstart, which='low'))
histo = bin.slice_time(
self.closest_time_edge(tstart, which='low'),
self.closest_time_edge(tstop, which='high'))
post = bin.slice_time(self.closest_time_edge(tstop, which='high'),
trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the
# rebinned rates
if histo.num_times > 0:
edges = np.append(histo.tstart, histo.tstop[-1])
new_counts = []
new_uncert = []
for i in range(bin.num_chans):
new_cts, uncert, new_exposure, new_edges = method(
histo.counts[:, i],
histo.count_uncertainty[:, i],
histo.exposure,
edges, *args)
new_counts.append(new_cts)
new_uncert.append(uncert)
new_counts = np.array(new_counts).T
new_uncert = np.array(new_uncert).T
new_histo = TimeEnergyBins(new_counts, new_edges[:-1],
new_edges[1:], new_exposure,
bin.emin, bin.emax,
count_uncerts=new_uncert)
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i.num_times > 0]
if len(histos_to_merge) > 0:
new_histos.append(TimeEnergyBins.merge_time(histos_to_merge))
new_histo = TimeEnergyBins.merge_time(new_histos)
return new_histo
[docs] def slice_energy(self, emin, emax):
"""Perform a slice over an energy range and return a new TimeEnergyBins
object. Note that emin and emax values that fall inside a bin will
result in that bin being included.
Args:
emin (float): The start of the slice
emax (float): The end of the slice
Returns:
(:class:`TimeEnergyBins`)
"""
mask = self._slice_energy_mask(emin, emax)
obj = TimeEnergyBins(self.counts[:, mask], self.tstart, self.tstop,
self.exposure, self.emin[mask], self.emax[mask],
count_uncerts=self.count_uncertainty[:, mask],
quality=self.quality)
return obj
[docs] def slice_time(self, tstart, tstop):
"""Perform a slice over a time range and return a new TimeEnergyBins
object. Note that tstart and tstop values that fall inside a bin will
result in that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
(:class:`TimeEnergyBins`)
"""
mask = self._slice_time_mask(tstart, tstop)
cls = type(self)
obj = cls(self.counts[mask, :], self.tstart[mask], self.tstop[mask],
self.exposure[mask], self.emin, self.emax,
count_uncerts=self.count_uncertainty[mask, :],
quality=self.quality[mask])
return obj
[docs] @classmethod
def from_rates(cls, rates, rate_uncerts, tstart, tstop, exposure, emin,
emax, **kwargs):
"""Create an TimeEnergyBins object from count rates and uncertainties.
Args:
rates (np.array): The count rates
rate_uncerts (np.array): The count rate uncertainties
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
emin (np.array): The low-value edges of the energy bins
emax (np.array): The high-value edges of the energy bins
quality (np.array, optional): The spectrum quality flag
precalc_good_segments (bool, optional): If True, calculates the
good time and energy
segments on initialization.
Returns:
(:class:`TimeEnergyBins`)
"""
counts = rates * exposure[:, np.newaxis]
count_uncerts = rate_uncerts * exposure[:, np.newaxis]
return cls(counts, tstart, tstop, exposure, emin, emax,
count_uncerts=count_uncerts, **kwargs)
[docs] @classmethod
def merge_energy(cls, histos, **kwargs):
"""Merge multiple TimeEnergyBins together along the energy axis.
Args:
histos (list of :class:`TimeEnergyBins`):
A list containing the TimeEnergyBins to be merged
Returns:
(:class:`TimeEnergyBins`)
"""
num = len(histos)
# sort by channel edge
emins = np.concatenate([[histo.emin[0]] for histo in histos])
idx = np.argsort(emins)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
quality = histos[idx[0]].quality
emin = histos[idx[0]].emin
emax = histos[idx[0]].emax
for i in range(1, num):
bin_starts = histos[idx[i]].emin
# make sure there is no overlap
mask = (bin_starts >= emax[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.hstack((counts, histos[idx[i]].counts[:, mask]))
uncert = np.hstack((uncert, histos[idx[i]].count_uncertainty[:, mask]))
emin = np.concatenate((emin, histos[idx[i]].emin[mask]))
emax = np.concatenate((emax, histos[idx[i]].emax[mask]))
# new TimeEnergyBins object
merged_bins = cls(counts, tstart, tstop, exposure, emin, emax,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
[docs] @classmethod
def merge_time(cls, histos, **kwargs):
"""Merge multiple TimeEnergyBins together along the time axis.
Args:
histos (list of :class:`TimeEnergyBins`):
A list containing the TimeEnergyBins to be merged
Returns:
(:class:`TimeEnergyBins`)
"""
num = len(histos)
# sort by start time
tstarts = np.concatenate([[histo.tstart[0]] for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
uncert = histos[idx[0]].count_uncertainty
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
emin = histos[idx[0]].emin
emax = histos[idx[0]].emax
quality = histos[idx[0]].quality
for i in range(1, num):
bin_starts = histos[idx[i]].tstart
# make sure there is no overlap
mask = (bin_starts >= tstop[-1])
if (~mask).sum() > 0:
raise ValueError('Overlapping bins cannot be merged. Only' \
'non-overlapping bins can be merged.')
counts = np.vstack((counts, histos[idx[i]].counts[mask, :]))
uncert = np.vstack((uncert, histos[idx[i]].count_uncertainty[mask, :]))
tstart = np.concatenate((tstart, histos[idx[i]].tstart[mask]))
tstop = np.concatenate((tstop, histos[idx[i]].tstop[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
quality = np.concatenate((quality, histos[idx[i]].quality[mask]))
# new TimeEnergyBins object
merged_bins = cls(counts, tstart, tstop, exposure, emin, emax,
count_uncerts=uncert, quality=quality, **kwargs)
return merged_bins
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _calculate_good_segments(self, lo_edges, hi_edges):
"""Calculates the ranges of data that are contiguous segments
Args:
lo_edges (np.array): The lower bin edges
hi_edges (np.array): The upper bin edges
Returns:
([(float, float), ...])
"""
mask = (lo_edges[1:] != hi_edges[:-1])
if mask.sum() == 0:
return [(lo_edges[0], hi_edges[-1])]
edges = np.concatenate(([lo_edges[0]], hi_edges[:-1][mask],
lo_edges[1:][mask], [hi_edges[-1]]))
edges.sort()
return edges.reshape(-1, 2).tolist()
def _closest_edge(self, edges, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
(float)
"""
idx = np.argmin(np.abs(val - edges))
if which == 'low':
if (edges[idx] > val) and (idx - 1) >= 0:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
def _slice_energy_mask(self, emin, emax):
emin_snap = self.closest_energy_edge(emin, which='low')
emax_snap = self.closest_energy_edge(emax, which='high')
mask = (self.emin < emax_snap) & (self.emax > emin_snap)
return mask
def _slice_time_mask(self, tstart, tstop):
tstart_snap = self.closest_time_edge(tstart, which='low')
tstop_snap = self.closest_time_edge(tstop, which='high')
mask = (self.tstart < tstop_snap) & (self.tstop > tstart_snap)
return mask
def __repr__(self):
s = '<{0}: {1} time bins;\n'.format(self.__class__.__name__,
self.num_times)
s += ' time range {0};\n'.format(self.time_range)
if self._good_time_segments is not None:
s += ' {0} time segments;\n'.format(len(self._good_time_segments))
s += ' {0} energy bins;\n'.format(self.num_chans)
s += ' energy range {0}'.format(self.energy_range)
if self._good_energy_segments is not None:
s += ';\n {0} energy segments'.format(len(self._good_energy_segments))
return s+'>'
[docs]class ResponseMatrix():
"""A class defining a 2D detector response matrix, with an (input) photon
axis and (output) channel axis.
Parameters:
matrix (np.array): The 2D matrix, of shape
(num_photon_bins, num_channels)
emin (np.array): The low edges of the (input) photon bins
emax (np.array): The high edges of the (input) photon bins
chanlo (np.array): The low edges of the (output) energy channels
chanhi (np.array): The high edges of the (output) energy channels
"""
def __init__(self, matrix, emin, emax, chanlo, chanhi):
try:
iter(matrix)
self._matrix = np.asarray(matrix)
except:
raise TypeError('matrix must be an iterable')
if self._matrix.ndim != 2:
raise TypeError('matrix must be a 2-dimensional array')
try:
iter(emin)
self._emin = np.asarray(emin).flatten()
except:
raise TypeError('emin must be an iterable')
try:
iter(emax)
self._emax = np.asarray(emax).flatten()
except:
raise TypeError('emax must be an iterable')
try:
iter(chanlo)
self._chanlo = np.asarray(chanlo).flatten()
except:
raise TypeError('chanlo must be an iterable')
try:
iter(chanhi)
self._chanhi = np.asarray(chanhi).flatten()
except:
raise TypeError('chanhi must be an iterable')
if (self._emin.size != self._emax.size):
raise ValueError('emin and emax must be the same length')
if (self._chanlo.size != self._chanhi.size):
raise ValueError('chanlo and chanhi must be the same length')
if (self._matrix.shape[0] != self._emin.size) or \
(self._matrix.shape[1] != self._chanlo.size):
raise ValueError('matrix axis 0 must have same length as emin and '\
'emax. matrix axis 1 must have same length as '\
'chanlo and chanhi.')
@property
def channel_centroids(self):
"""(np.array): The geometric mean of the energy channels"""
return np.sqrt(self._chanlo * self._chanhi)
@property
def channel_widths(self):
"""(np.array): The energy channel widths"""
return self._chanhi - self._chanlo
@property
def ebounds(self):
"""(:class:`Ebounds`): The energy bounds"""
return Ebounds.from_bounds(self._chanlo, self._chanhi)
@property
def matrix(self):
"""(np.array): The raw matrix"""
return self._matrix
@property
def num_chans(self):
"""(int): The number of energy channels"""
return self._matrix.shape[1]
@property
def num_ebins(self):
"""(int): The number of photon bins"""
return self._matrix.shape[0]
@property
def photon_bins(self):
"""(:class:`Ebounds`): The photon bins"""
return Ebounds.from_bounds(self._emin, self._emax)
@property
def photon_bin_centroids(self):
"""(np.array): The geometric mean of the photon bins"""
return np.sqrt(self._emin * self._emax)
@property
def photon_bin_widths(self):
"""(np.array): The photon bin widths"""
return self._emax - self._emin
[docs] def channel_effective_area(self):
"""Returns the effective area as a function of recorded channel energy
(integrated over incident photon bins).
Returns:
(:class:`Bins`)
"""
bins = Bins(self._matrix.sum(axis=0), self._chanlo, self._chanhi)
return bins
[docs] def effective_area(self, energy, interp_kind='linear'):
"""Calculate the effective area at a given energy or an array of
energies. This function interpolates the DRM (in log10(cm^2)) to
provide the effective area for any energy within the photon energy
bounds of the DRM.
Args:
energy (float or np.array): The photon energy or energies at which
to calculate the effective area.
interp_kind (str, optional): The kind of interpolation to be
passed to scipy.interp1d. Default is
'linear'.
Returns:
(np.array)
"""
try:
energy = np.asarray(energy, dtype=float)
except:
raise TypeError('energy must be a positive float')
if np.any(energy < self.photon_bins.range[0]) or \
np.any(energy > self.photon_bins.range[1]):
raise ValueError('energy must be within the photon energy bounds ' \
'of the DRM')
effarea = self.photon_effective_area()
diff_effarea = effarea.counts
# create the DRM interpolator
x = np.append(self._emin, self._emax[-1])
y = np.append(diff_effarea, diff_effarea[-1])
nz_mask = (y > 0)
effarea_interpolator = interp1d(x[nz_mask], np.log10(y[nz_mask]),
kind=interp_kind,
fill_value='extrapolate')
# do the interpolation
effarea = 10.0**effarea_interpolator(energy)
return effarea
[docs] def fold_spectrum(self, function, params, channel_mask=None):
"""Fold a photon spectrum through a DRM to get a count spectrum
Args:
function (<function>):
A photon spectrum function. The function must accept a list of
function parameters as its first argument and an array of photon
energies as its second argument. The function must return the
evaluation of the photon model in units of ph/s-cm^2-keV.
params (list of float): A list of parameter values to be passed to
the photon spectrum function
channel_mask (np.array, optional):
A boolean mask where True indicates the channel is to be used
for folding and False indicates the channel is to not be used
for folding. If omitted, all channels are used.
Returns:
(np.array)
"""
drm = self._matrix
if channel_mask is not None:
drm = drm[:, channel_mask]
# evaluate photon model
photon_model = function(params, self.photon_bin_centroids)
# fold photon model through DRM
counts = np.dot(drm.T, photon_model * self.photon_bin_widths)
return counts
[docs] def photon_effective_area(self):
"""Returns the effective area as a function of incident photon energy
(integrated over recorded energy channels).
Returns:
(:class:`Bins`)
"""
bins = Bins(self._matrix.sum(axis=1), self._emin, self._emax)
return bins
[docs] def rebin(self, factor=None, edge_indices=None):
"""Rebins the channel energy axis of a DRM and returns a new response
object. Rebinning can only be used to downgrade the channel resolution
and is constrained to the channel edges of the current DRM.
Rebinning can be performed by either defining an integral factor of the
number of current energy channels to combine (e.g. factor=4 for 128
energy channels would result in 32 energy channels), or by defining an
index array into the current channel edges that will define the new set
of edges.
Args:
factor (int, optional): The rebinning factor. Must set either this
or `edge_indices`
edge_indices (np.array, optional): The index array that represents
which energy edges should remain
in the rebinned DRM.
Returns:
(:class:`ResponseMatrix`)
"""
if (factor is None) and (edge_indices is None):
raise ValueError('Either factor or edge_indices must be set')
elif factor is not None:
try:
factor = int(factor)
except:
raise TypeError('factor must be a positive integer')
if factor < 1:
raise ValueError('factor must be a positive integer')
if (self.num_chans % factor) > 0:
raise ValueError('factor must be factor of numchans: '\
'{}'.format(self.num_chans))
chanlo = self._chanlo.reshape(-1, factor)[:,0]
chanhi = self._chanhi.reshape(-1, factor)[:,-1]
elif edge_indices is not None:
try:
edge_indices = np.asarray(edge_indices)
except:
raise TypeError('new_edges must be a list, tuple or array')
edge_indices = np.unique(edge_indices.astype(int))
if (edge_indices[0] < 0) or (edge_indices[-1] > self.num_chans):
raise ValueError('edge_indices outside valid range of ' \
'0-{}'.format(self.num_chans))
old_edges = np.append(self._chanlo, self._chanhi[-1])
new_edges = old_edges[edge_indices]
chanlo = new_edges[:-1]
chanhi = new_edges[1:]
# the new effective area matrix will just be summed over the channel
# axis for the bins we are combining
new_matrix = np.zeros((self.num_ebins, chanlo.size))
sidx = (self._chanlo[:,np.newaxis] == chanlo[np.newaxis,:]).nonzero()[0]
eidx = (self._chanhi[:,np.newaxis] == chanhi[np.newaxis,:]).nonzero()[0]
for i in range(chanlo.size):
new_matrix[:,i] = self._matrix[:,sidx[i]:eidx[i]+1].sum(axis=1)
obj = type(self)(new_matrix, self._emin, self._emax, chanlo, chanhi)
return obj
[docs] def resample(self, num_photon_bins=None, photon_bin_edges=None,
num_interp_points=20, interp_kind='linear'):
"""Resamples the incident photon axis of a DRM and returns a new
ResponseMatrix object. Resampling can be used to downgrade the photon
energy resolution, upgrade the resolution, and/or redefine the edges of
the incident photon bins. By definition, the resampling can only be
performed within the photon energy range of the current object.
The process for resampling is to create a high-resolution grid for each
new photon bin, interpolate the differential effective area onto that
grid (interpolation is performed on log10(effective area)), and then
integrate the differential effective area over the grid points in each
bin. The final effective area is then scaled by the ratio of the
initial number of photon bins to the requested number of photon bins.
Args:
num_photon_bins (int, optional): The number of photon bins in the
new DRM. The bin edges will be
generated logarithmically. Only set
this or `photon_bin_edges`.
photon_bin_edges (np.array, optional): The array of photon bin edges.
Only set this or
`num_photon_bins`
num_interp_points (int, optional): The number of interpolation points
used to integrate over for each
new photon bin. Default is 20.
interp_kind (str, optional): The kind of interpolation to be
passed to scipy.interp1d. Default is
'linear'.
Returns:
(:class:`ResponseMatrix`)
"""
if (num_photon_bins is None) and (photon_bin_edges is None):
raise ValueError('Either num_photon_bins or photon_bin_edges must' \
' be set')
elif num_photon_bins is not None:
try:
num_photon_bins = int(num_photon_bins)
assert num_photon_bins > 0
except:
raise TypeError('num_photon_bins must be a positive integer')
photon_bin_edges = np.geomspace(self._emin[0], self._emax[-1],
num_photon_bins+1)
elif photon_bin_edges is not None:
try:
photon_bin_edges = np.asarray(photon_bin_edges)
except:
raise TypeError('photon_bin_edges must be an iterable')
badmask = (photon_bin_edges < self._emin[0]) | \
(photon_bin_edges > self._emax[-1])
if badmask.sum() > 0:
raise ValueError('photon_bin_edges are beyond valid range of '\
'{0:.2f}-{1:.2f}'.format(self._emin[0],
self._emax[-1]))
photon_bin_edges = np.sort(photon_bin_edges)
num_photon_bins = photon_bin_edges.size-1
# differential effective area
diff_effarea = self._matrix/self.photon_bin_widths[:,np.newaxis]
# create an interpolation array over each new bin
photon_bounds = np.vstack((photon_bin_edges[:-1], photon_bin_edges[1:]))
interp_arr = np.geomspace(*photon_bounds, num_interp_points+2, axis=0)
# create the DRM interpolator
x = np.append(self._emin, self._emax[-1])
y = np.vstack((diff_effarea, diff_effarea[-1,:]))
y[y == 0] = 1e-10
effarea_interpolator = interp1d(x, np.log10(y), axis=0,
kind=interp_kind,
fill_value='extrapolate')
# interpolate the DRM over the photon interpolation array
diff_effarea_interp = 10.**effarea_interpolator(interp_arr)
badmask = np.isnan(diff_effarea_interp)
diff_effarea_interp[badmask] = 0.0
# integrate the DRM over the photon interpolation array
diff_effarea_new = trapezoid(diff_effarea_interp,
interp_arr[:,:,np.newaxis], axis=0)
# scale by ratio of photon bins
drm = diff_effarea_new / (self.num_ebins/num_photon_bins)
# create response object
obj = type(self)(drm, photon_bin_edges[:-1], photon_bin_edges[1:],
self._chanlo, self._chanhi)
return obj
def __repr__(self):
return '<ResponseMatrix: {0} energy bins; {1} ' \
'channels>'.format(self.num_ebins, self.num_chans)
[docs]class Parameter():
"""A fit parameter class
Parameters:
value (float): The central fit value
uncert (float or 2-tuple): The 1-sigma uncertainty. If a 2-tuple, then
is of the form (low, high)
name (str, optional): The name of the parameter
units (str, optional): The units of the parameter
support (2-tuple, optional): The valid support of the parameter
"""
def __init__(self, value, uncert, name='', units=None,
support=(-np.inf, np.inf)):
self._value = float(value)
if isinstance(uncert, (tuple, list)):
if len(uncert) == 2:
pass
elif len(uncert) == 1:
uncert = (uncert[0], uncert[0])
else:
raise ValueError('uncertainty must be a 1- or 2-tuple')
elif isinstance(uncert, float):
uncert = (uncert, uncert)
else:
raise TypeError('uncertainty must be a float or 1- or 2-tuple')
self._uncert = uncert
self._units = units
self._name = name
self._support = support
@property
def name(self):
"""(str): The name of the parameter"""
return self._name
@property
def support(self):
"""(2-tuple): The valid support of the parameter"""
return self._support
@property
def uncertainty(self):
"""(float, float): The 1-sigma uncertainty"""
return self._uncert
@property
def units(self):
"""(str): The units of the parameter"""
return self._units
@property
def value(self):
"""(float): The central fit value"""
return self._value
[docs] def one_sigma_range(self):
"""Return the 1 sigma range of the parameter fit
Returns:
(tuple): 2-tuple (low, high)
"""
return (self.value - self.uncertainty[0],
self.value + self.uncertainty[1])
[docs] def to_fits_value(self):
"""Return as a tuple to be used for a FITS file
Returns:
(tuple): 3-value tuple (value, +uncertainty, -uncertainty)
"""
return (self.value, *self.uncertainty[::-1])
[docs] def valid_value(self):
"""Check if the parameter value is within the allowed parameter range
Returns:
(bool)
"""
if (self.value >= self.support[0]) and \
(self.value <= self.support[1]):
return True
else:
return False
def _str_format(self):
if (self.value > 0.005) and (self.uncertainty[0] > 0.005):
value = '{0:.2f}'.format(self.value)
uncertainty = tuple(
['{0:.2f}'.format(u) for u in self.uncertainty])
else:
value = '{0:.2e}'.format(self.value)
val_coeff, val_exp = value.split('e')
val_exp = int(val_exp)
uncertainty = ['{0:.2e}'.format(u) for u in self.uncertainty]
uncert_coeff = []
uncert_exp = []
for uncert in uncertainty:
uncert_coeff.append(uncert.split('e')[0])
uncert_exp.append(int(uncert.split('e')[1]))
return (value, uncertainty)
def __repr__(self):
return '<{0}: {1}>'.format(self.__class__.__name__, self.name)
def __str__(self):
value, uncertainty = self._str_format()
if uncertainty[0] == uncertainty[1]:
s = '+/- {0}'.format(uncertainty[0])
else:
s = '+{1}/-{0}'.format(uncertainty[0], uncertainty[1])
if self.units is None:
return '{0}: {1} {2}'.format(self.name, value, s)
else:
return '{0}: {1} {2} {3}'.format(self.name, value, s, self.units)