Source code for gdt.core.data_primitives.events

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract Nos.: CA 80MSFC17M0022 / 80NSSC24M0035
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
import copy
from .bins import ChannelBins, EnergyBins, TimeChannelBins, TimeEnergyBins
from .intervals import Ebounds

__all__ = ['EventList']


[docs]class EventList(): """A primitive class defining an event list. Parameters: times (np.array): The array of event times channels (np.array): The corresponding array of associated energy channel ebounds (:class:`Ebounds`, optional): The energy bounds mapped to energy channels """ def __init__(self, times=None, channels=None, ebounds=None): if (times is not None) and (channels is not None): times = np.asarray(times).flatten() channels = np.asarray(channels).flatten() if times.size != channels.size: raise ValueError('times and channels arrays must be same length') else: times = np.array([], dtype=float) channels = np.array([], dtype=int) self._events = np.empty(times.size, dtype=[('TIME', times.dtype.type), ('PHA', channels.dtype.type)]) self._events['TIME'] = times self._events['PHA'] = channels if ebounds is not None: if not isinstance(ebounds, Ebounds): raise TypeError('ebounds must be of type Ebounds') self._ebounds = ebounds @property def channel_range(self): """(int, int): The range of the channels in the list""" if self.size > 0: return int(np.min(self.channels)), int(np.max(self.channels)) @property def channels(self): """(np.array): The PHA channel array""" return self._events['PHA'] @property def ebounds(self): """(:class:`Ebounds`): The energy bounds of the energy channels. This property can be set. """ return self._ebounds @ebounds.setter def ebounds(self, val): if not isinstance(val, Ebounds): raise TypeError('ebounds must be an Ebounds object') self._ebounds = val @property def emax(self): """(float): The maximum energy""" if self._ebounds is not None: return self._ebounds.range[1] @property def emin(self): """(float): The minimum energy""" if self._ebounds is not None: return self._ebounds.range[0] @property def energy_range(self): """(float, float): The energy range of the channels""" if self.size > 0: if self._ebounds is not None: chan_min, chan_max = self.channel_range return (self._ebounds[chan_min].emin, self._ebounds[chan_max].emax) @property def num_chans(self): """(int): The number of energy channels. Note that not all channels will necessarily have events, especially if a slice is made over energy.""" if self.ebounds is not None: return self._ebounds.num_intervals else: return (self._events['PHA'].max() - self._events['PHA'].min()) + 1 @property def size(self): """(int): The number of events in the list""" return self._events.size @property def time_range(self): """(float, float): The range of the times in the list""" if self.size > 0: return self.times.min(), self.times.max() @property def times(self): """(np.array): The event times""" return self._events['TIME']
[docs] def bin(self, method, *args, tstart=None, tstop=None, event_deadtime=2.6e-6, overflow_deadtime=1.0e-5, **kwargs): """Bin the EventList in time given a binning function and return a 2D time-energy channel histogram. If the ebounds energy calibration is set, returns a :class:`TimeEnergyBins` object, otherwise returns a :class:`TimeChannelBins` object. The binning function should take as input an array of times as well as a tstart and tstop keywords for partial list binning. Additional arguments and keyword arguments specific to the function are allowed. The function should return an array of time edges for the bins, such that, for `n` bins, there are `n` + 1 edges. Args: method (<function>): A binning function *args: Arguments to be passed to the binning function tstart (float, optional): If set, defines the start time of the EventList to be binned, otherwise binning will begin at the time of the first event. tstop (float, optional): If set, defines the end time of the EventList to be binned, otherwise binning will end at the time of the last event. event_deadtime (float, optional): The deadtime per event in seconds. Default is 2.6e-6. overflow_deadtime (float, optional): The deadtime per event in the overflow channel in seconds. Default is 1e-5. **kwargs: Options to be passed to the binning function Returns: (:class:`TimeEnergyBins` or :class:`TimeChannelBins`) """ if tstart is None: tstart = self.time_range[0] if tstop is None: tstop = self.time_range[1] # set the start and stop of the rebinning segment mask = (self.times >= tstart) & (self.times <= tstop) bin_times = self.times[mask] kwargs['tstart'] = tstart kwargs['tstop'] = tstop # get the time edges from the binning function and then do the 2d histo time_edges = method(bin_times, *args, **kwargs) if self.ebounds is not None: chan_list = np.arange(self.ebounds.num_intervals + 1) else: chan_list = np.arange(self.channel_range[1] + 2) counts = np.histogram2d(bin_times, self.channels[mask], [time_edges, chan_list])[0] # calculate exposure lo_edges = time_edges[:-1] hi_edges = time_edges[1:] overflow_counts = counts[:, -1] deadtime = counts.sum(axis=1) * event_deadtime + \ overflow_counts * (overflow_deadtime - event_deadtime) exposure = (hi_edges - lo_edges) - deadtime if self.ebounds is None: bins = TimeChannelBins(counts, lo_edges, hi_edges, exposure, chan_list[:-1]) else: bins = TimeEnergyBins(counts, lo_edges, hi_edges, exposure, self.ebounds.low_edges(), self.ebounds.high_edges()) return bins
[docs] def channel_slice(self, chanlo, chanhi): """Perform a slice in energy channels of the EventList and return a new EventList Args: chanlo (int): The start of the channel slice chanhi (int): The end of the channel slice Returns: (:class:`EventList`) """ mask = (self.channels >= chanlo) & (self.channels <= chanhi) events = self._events[mask] return EventList(times=events['TIME'], channels=events['PHA'], ebounds=copy.deepcopy(self._ebounds))
[docs] def count_spectrum(self, **kwargs): """Extract a count spectrum for the EventList or for a segment of the EventList. If the ebounds energy calibration is set, returns an :class:`EnergyBins` object, otherwise returns a :class:`ChannelBins` object. Args: time_ranges ([(float, float), ...], optional): The time range or time ranges over which to calculate the exposure. If omitted, calculates the total exposure of the data. event_deadtime (float, optional): The deadtime per event in seconds. Default is 0. overflow_deadtime (float, optional): The deadtime per event in the overflow channel in seconds. Default is 0. Returns: (:class:`EnergyBins` or :class:`ChannelBins`) """ if self.ebounds is not None: chan_list = np.arange(self.ebounds.num_intervals + 1) else: chan_list = np.arange(self.channel_range[1] + 2) counts = np.histogram(self.channels, bins=chan_list)[0] exposure = np.full(chan_list.size-1, self.get_exposure(time_ranges=None, **kwargs)) if self.ebounds is None: bins = ChannelBins.create(counts, chan_list[:-1], exposure) else: bins = EnergyBins(counts, self.ebounds.low_edges(), self.ebounds.high_edges(), exposure) return bins
[docs] def energy_slice(self, emin, emax): """Perform a slice in energy of the EventList and return a new EventList. Since energies are binned, an emin or emax falling inside of an energy channel bin will include that bin in the slice. Args: emin (float): The start of the energy slice emax (float): The end of the energy slice Returns: (:class:`EventList`) """ if self._ebounds is None: raise AttributeError('Ebounds has not been set, therefore the \ the EventList cannot be sliced by energy') emins = np.array([ebound.emin for ebound in self._ebounds.intervals]) emaxs = np.array([ebound.emax for ebound in self._ebounds.intervals]) mask = (emins < emax) & (emaxs > emin) chan_nums = np.arange(self.channel_range[0], self.channel_range[1]+1) chan_nums = chan_nums[mask] return self.channel_slice(chan_nums[0], chan_nums[-1])
[docs] def get_exposure(self, time_ranges=None, event_deadtime=0.0, overflow_deadtime=0.0): """Calculate the total exposure, in seconds, of a time range or time ranges of data. Args: time_ranges ([(float, float), ...], optional): The time range or time ranges over which to calculate the exposure. If omitted, calculates the total exposure of the data. event_deadtime (float, optional): The deadtime per event in seconds. Default is 0. overflow_deadtime (float, optional): The deadtime per event in the overflow channel in seconds. Default is 0. Returns: (float) """ if time_ranges is None: time_ranges = [self.time_range] try: iter(time_ranges[0]) except: time_ranges = [time_ranges] exposure = 0.0 for i in range(len(time_ranges)): tstart, tstop = self._assert_range(time_ranges[i]) tcent = (tstop + tstart) / 2.0 dt = (tstop - tstart) / 2.0 mask = (np.abs(self.times - tcent) <= dt) # mask = (self.time >= tstart) & (self.time < tstop) deadtime = mask.sum() * event_deadtime if self.ebounds is not None: of_chan = self.ebounds.num_intervals-1 else: of_chan = self.channel_range[1] deadtime += np.sum(self.channels[mask] == of_chan) * \ (overflow_deadtime - event_deadtime) exposure += (tstop - tstart) - deadtime return exposure
[docs] def rebin_energy(self, method, *args, **kwargs): """Rebin the energy channels using the specified binning algorithm. This does not change the number of events in the EventList, but changes their assignment to a channel and bins the energy bounds mapping to those channels (if applicable). A new EventList object is returned. Args: method (<function>): A binning function *args: Arguments to be passed to the binning function **kwargs: Deadtime arguments passed to get_exposure Returns: (:class:`EventList`) """ # exposure and counts; not really used other than for some specific # binning algorithms exposure = np.full(self.num_chans, self.get_exposure(**kwargs)) chans, counts = np.unique(self.channels, return_counts=True) if chans.size != self.num_chans: counts_fill = np.zeros(self.num_chans) counts_fill[chans] = counts counts = counts_fill edges = np.arange(self.num_chans + 1) # call the binning algorithm and get the new edges _, _, _, new_edges = method(counts, np.sqrt(counts), exposure, edges, *args) # re-assign the pha channels based on the new edges # and also rebin the ebounds new_channels = np.zeros_like(self.channels) new_ebounds = [] for i in range(len(new_edges) - 1): emin = new_edges[i] emax = new_edges[i + 1] mask = (self.channels >= emin) & (self.channels < emax) new_channels[mask] = i if self.ebounds is not None: new_ebounds.append((self.ebounds[emin].emin, self.ebounds[emax - 1].emax)) # create the new EventList object with the rebinned channels if self.ebounds is not None: new_ebounds = Ebounds.from_list(new_ebounds) else: new_ebounds = None obj = EventList(times=np.copy(self.times), channels=new_channels, ebounds=new_ebounds) return obj
[docs] def sort_time(self): """In-place sort by time. """ idx = np.argsort(self._events['TIME']) self._events = self._events[:][idx]
[docs] def sort_channels(self): """In-place sort by channel number. """ idx = np.argsort(self._events['PHA']) self._events = self._events[:][idx]
[docs] def time_slice(self, tstart, tstop): """Perform a slice in time of the EventList and return a new EventList Args: tstart (float): The start of the time slice tstop (float): The end of the time slice Returns: (:class:`EventList`) """ mask = (self.times >= tstart) & (self.times <= tstop) events = self._events[mask] return EventList(times=events['TIME'], channels=events['PHA'], ebounds=copy.deepcopy(self._ebounds))
[docs] @classmethod def merge(cls, eventlists, sort=False, force_unique=True): """Merge multiple EventLists together in time and optionally sort. Args: eventlist (list of :class:`EventList`): A list containing the EventLists to be merged sort (bool, optional): If True, sorts by time after the merge. Default is False. force_unique (bool, optional): If True, force all events to be unique via brute force sorting. If False, the EventLists will only be checked and masked for overlapping time ranges. Events can potentially be lost if the merged EventLists contain overlapping times (but not necessarily duplicate events), however this method is much faster. Default is True. Returns: (:class:`EventList`) """ # put in time order idx = np.argsort([eventlist.times.min() for eventlist in eventlists]) eventlists = [eventlists[i] for i in idx] # mark: TODO add check for ebounds consistency ebounds_idx = 0 for i in range(len(eventlists)): if eventlists[i].ebounds is None: continue else: ebounds_idx = i break new_times = np.array([]) new_chans = np.array([]) for eventlist in eventlists: # skip empty EventLists if eventlist.size == 0: continue # if not forcing to be unique, just make sure there is no time overlap if (not force_unique) and (new_times.size > 0): mask = (eventlist.times > new_times.max()) temp_times = eventlist.times[mask] temp_chans = eventlist.channels[mask] else: temp_times = eventlist.times temp_chans = eventlist.channels new_times = np.concatenate((new_times, temp_times)) new_chans = np.concatenate((new_chans, temp_chans)) # force unique: make sure that we only keep unique events (slower) if force_unique: new_times, uidx = np.unique(new_times, return_index=True) new_chans = new_chans[uidx] obj = cls(times=new_times, channels=new_chans, ebounds=copy.deepcopy(eventlists[ebounds_idx].ebounds)) # do a sort if sort and not force_unique: obj.sort_time() return obj
def _assert_range(self, valrange): assert valrange[0] <= valrange[1], \ 'Range must be in increasing order: (lo, hi)' return valrange def __repr__(self): s = '<EventList: {0} events;\n'.format(self.size) s += ' time range {0};\n channel range {1}>'.format(self.time_range, self.channel_range) return s