# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Developed by: Joshua Wood
# National Aeronautics and Space Administration (NASA)
# Marshall Space Flight Center
# Astrophysics Branch (ST-12)
#
# Developed by: Jacob Smith
# University of Alabama in Huntsville
# Center for Space Plasma and Aeronomic Research
#
# This software is not subject to EAR.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
import matplotlib.lines
import matplotlib.pyplot as plt
import matplotlib.patches
from astropy.utils import isiterable
from rich.progress import track
from astropy import units as u
from .plot.lightcurve import Lightcurve
from .binning.unbinned import bin_by_time
from .binning.binned import rebin_by_edge_index, combine_by_factor
__all__ = ['TriggerAlgorithm', 'Trigger', 'SlidingWindowMethod']
[docs]class TriggerAlgorithm:
"""Class for holding information related to a trigger algorithm
Parameters:
timescale (int): The duration in integer milliseconds of the window
over which we count events contributing to a trigger
offset (int): The time offset in integer milliseconds between the
start time of the trigger window and the beginning of
instrument data
channels (list|tuple): The energy channel range given as
(channel_start, channel_stop) for counts contributing
to the trigger window
threshold (float): The signficance above background for including
a detector in the trigger
"""
def __init__(self, timescale: int, offset: int, channels: list | tuple, threshold: float):
# apply sanity checks
if not isinstance(timescale, int):
raise ValueError("Timescale must be an integer number of milliseconds")
if not isinstance(offset, int):
raise ValueError("Offset must be an integer number of milliseconds")
if channels[1] < channels[0]:
raise ValueError("channels[1] must be >= channels[0]")
self._timescale = timescale
self._offset = offset
self._channels = list(channels)
self._threshold = threshold
@property
def timescale(self):
"""(int): The trigger window duration in integer milliseconds"""
return self._timescale
@property
def offset(self):
"""(int): The offset of the first trigger window relative to data start time in integer milliseconds"""
return self._offset
@property
def channels(self):
"""(list): The energy channel range contributing to the trigger"""
return self._channels
@property
def threshold(self):
"""(float): The significance level above background for including a detector in the trigger"""
return self._threshold
[docs] def timescale_in_unit(self, unit):
"""Method for retrieving the trigger timescale with an astropy unit
Args:
unit (astropy.units): The desired unit for returning the trigger timescale
(e.g. astropy.units.second)
Returns:
(astropy.units)
"""
return (self._timescale * u.ms).to(unit)
[docs] def offset_in_unit(self, unit):
"""Method for retrieving the trigger offset with an astropy unit
Args:
unit (astropy.units): The desired unit for returning the trigger offset
(e.g. astropy.units.second)
Returns:
(astropy.units)
"""
return (self._offset * u.ms).to(unit)
def __repr__(self):
return "TriggerAlgorithm(timescale {:4d} ms, offset {:4d} ms, channels {}, threshold {:.2f} sigma)".format(
self._timescale, self._offset, self._channels, self._threshold)
[docs]class Trigger:
"""Class for storing trigger information
Parameters:
alg_num (int): Trigger algorithm number
alg (TriggerAlgorithm): Class defining the algorithm properties
time (float): Trigger time in seconds
sig (np.array): Significance over background in each detector
triggered_det (np.array): Array with values of True for detectors
contributing to the trigger, False otherwise
detector_names (list): List of detector names associated with
entries in triggered_det
"""
def __init__(self, alg_num: int, alg: TriggerAlgorithm, time: float,
sig: np.array, triggered_det: np.array, detector_names: list):
self.alg_num = alg_num
self.alg = alg
self.time = time
self.sig = sig
self.triggered_det = triggered_det
self.detector_names = detector_names
@property
def triggered_det_num(self):
"""(list): List of detector numbers contributing to this trigger"""
return [i for i in range(self.triggered_det.size) if self.triggered_det[i]]
@property
def triggered_det_names(self):
"""(list): List of detector names contributing to this trigger"""
return [self.detector_names[i] for i in self.triggered_det_num]
def __repr__(self):
"""(str): String representation of trigger information"""
return 'Trigger <algorithm num {}, timescale {} ms, time {} s, triggered det {}>'.format(
self.alg_num, self.alg.timescale, self.time, self.triggered_det_names)
[docs]class SlidingWindowMethod:
"""Class for applying a sliding window trigger which identifies
transients by looking for windows where n detectors to exceed
a given significance threshold above background.
Parameters:
algorithms (dict): Dictionary defining the set of trigger
algorithms applied in the trigger method
background_window (int): Length of time in integer milliseconds
used to compute background counts
background_offset (int): Time offset in integer milliseconds between
the end of the trigger and background windows
resolution (int): The time resolution in integer milliseconds
of binned PHAII data used for the trigger
method. All other timescales (windows,
offsets) should be multiples of this value
channel_edges (list): List of channel edges used to define energy
bins for PHAII data
det_names (list): Names of required detector numbers for
the trigger method
n (int): Minimum number of detectors needed for trigger
verbose (float): Show progress bars when True
"""
def __init__(self, algorithms: dict, background_window: int,
background_offset: int, resolution: int,
channel_edges: list, det_names: list,
n: int = 1, verbose: bool = True):
self.n = n
self.verbose = verbose
self.resolution = resolution
self.algorithms = algorithms
self.channel_edges = channel_edges
self.det_names = det_names
if background_window % self.resolution != 0 or background_window <= 0:
raise ValueError("Background window must be an integer multiple of resolution in ms")
if background_offset % self.resolution != 0:
raise ValueError("Background offset must be an integer multiple of resolution in ms")
self.background_window = background_window
self.background_offset = background_offset
self.background_window_bins = self.background_window // self.resolution
self.background_offset_bins = self.background_offset // self.resolution
self.phaiis = None
@property
def algorithms(self):
"""(dict): Dictionary defining the set of trigger algorithms"""
return self._algorithms
@algorithms.setter
def algorithms(self, algorithms: dict):
"""Method to set the algorithms used when applying the trigger method.
Note: This performs sanity checks to ensure the algorithms are passed
as a dictionary and are matched to the minimum resolution used to bin
data files.
Args:
algorithms (dict): Dictionary defining the set of trigger
algorithms applied in the trigger method.
"""
if not isinstance(algorithms, dict):
raise ValueError("algorithms should be a dictionary object with the format int:TriggerAlgorithm()")
for alg_num, alg in algorithms.items():
if alg.timescale % self.resolution != 0:
raise ValueError("Trigger algorithm timescale must be an integer multiple of resolution in ms")
if alg.offset % self.resolution != 0:
raise ValueError("Trigger algorithm offset must be an integer multiple of resolution in ms")
self._algorithms = algorithms
[docs] def apply_trigger(self, holdoff: float = None, debug: bool = False):
"""Applies the set of trigger algorithms to prepared data
Args:
holdoff (float, optional): Trigger holdoff time in seconds.
Set to 300 sec to replicate GCN
trigger notices.
debug (bool, optional): Show debugging information when True
Returns:
(list of Trigger)
"""
if self.phaiis is None:
raise ValueError("Need to run prepare_data() method on TTE data before applying the trigger")
# pre-compute algorithm values in terms of the phaii bin resolution
bins = {}
for alg_num, alg in self.algorithms.items():
bins[alg_num] = {"window": alg.timescale // self.resolution,
"offset": alg.offset // self.resolution}
# loop over time bins and apply all trigger algorithms whose
# time windows end at the current time bin. This allows us to
# to compute the background counts once for all algorithms
# that need to be evaluated.
triggers = []
description = "Applying algorithms"
nbins = self.phaiis[0].data.size[0] # total number of time bins
for i in track(range(nbins), description=description, disable=not self.verbose):
# i represents the index of the last bin in the trigger window.
# the stop index when summing window bins should be this +1.
window_stop_index = i + 1
# calculate background window indices for this time, which starts at
# self.background_offset before the end of the trigger window
background_stop_index = window_stop_index - self.background_offset_bins
background_start_index = background_stop_index - self.background_window_bins
# skip cases where we can't fully form the background window
if background_start_index < 0:
continue
# compute background counts and exposure
# note: we'll update background counts during the algorithm loop if the energy channels change
background_channels = list(self.algorithms.values())[0].channels
background_counts, background_exposure = [], []
for phaii in self.phaiis:
background_counts.append(phaii.data.counts[background_start_index:background_stop_index,
background_channels[0]:background_channels[1]+1].sum())
background_exposure.append(phaii.data.exposure[background_start_index:background_stop_index].sum())
# loop to check trigger window counts against background for each algorithm
for alg_num, alg in self.algorithms.items():
window_bins = bins[alg_num]['window']
offset_bins = bins[alg_num]['offset']
window_start_index = window_stop_index - window_bins
# skip times with out-of-phase or incomplete trigger window
if ((window_start_index - offset_bins) % window_bins) != 0 or i < window_bins:
continue
# update background channels if they differ from current alg
if alg.channels != background_channels:
background_channels = alg.channels
background_counts = [phaii.data.counts[background_start_index:background_stop_index,
background_channels[0]:background_channels[1]+1].sum()
for phaii in self.phaiis]
# optional debugging info
if debug:
t0 = self.phaiis[0].data.tstart[0]
tstart = self.phaiis[0].data.tstart[window_start_index]
tstop = self.phaiis[0].data.tstop[window_stop_index - 1]
bkg_tstart = self.phaiis[0].data.tstart[background_start_index]
bkg_tstop = self.phaiis[0].data.tstop[background_stop_index - 1]
print(f"Evaluating Alg #{alg_num}: Timescale {alg.timescale*0.001:.3f} s, Offset {alg.offset*0.001:.3f} s")
print(f" Window ({tstart-t0:.3f} s, {tstop-t0:.3f}) s, Delta {tstop-tstart:.3f} s")
print(f" Background Window ({bkg_tstart-t0:.3f}, {bkg_tstop-t0:.3f}) s, Background Delta {bkg_tstop-bkg_tstart:.3f} s, Offset {tstop - bkg_tstop:.3f} s")
# sum phaii bins to determine counts and exposure within trigger window
counts, exposure = [], []
for phaii in self.phaiis:
counts.append(phaii.data.counts[window_start_index:window_stop_index,
alg.channels[0]:alg.channels[1]+1].sum())
exposure.append(phaii.data.exposure[window_start_index:window_stop_index].sum())
# compute significance under a simple Gaussian approximation.
# this ignores more rigorous statistical formulations like
# Li & Ma ApJ 272 (1983). This is Ok since the thresholds
# are empirically tuned to limit the false positive rate.
scaled_background = np.array(exposure) / np.array(background_exposure) * np.array(background_counts)
excess = np.array(counts) - scaled_background
sig = excess / np.sqrt(scaled_background)
triggered_det = sig > alg.threshold
if triggered_det.sum() >= self.n:
trigtime = self.phaiis[0].data.tstop[window_stop_index - 1]
triggers.append(Trigger(alg_num, alg, trigtime, sig, triggered_det, self.det_names))
if debug:
print("Found ", str(triggers[-1]))
if holdoff:
triggers = self.apply_holdoff(triggers, holdoff)
return triggers
[docs] def apply_holdoff(self, triggers: list, holdoff: float):
"""Apply a holdoff which ignores triggers occurring
within the holdoff timescale of a prior trigger.
Args:
triggers (list): List of Trigger objects
holdoff (float): Trigger holdoff time in seconds
Returns:
(list)
"""
# ensure we're sorted by trigger time
triggers.sort(key=lambda x: x.time)
# loop to select triggers outside the holdoff timescale of prior triggers
selected_triggers = []
for trigger in triggers:
if len(selected_triggers):
# replace existing trigger with the more significant one
if trigger.time == selected_triggers[-1].time and \
selected_triggers[-1].sig.max() < trigger.sig.max():
selected_triggers[-1] = trigger
# append new trigger
if trigger.time > selected_triggers[-1].time + holdoff:
selected_triggers.append(trigger)
else:
# always append the first trigger
selected_triggers.append(trigger)
return selected_triggers
[docs] def prepare_data(self, ttes: list, time_range: list = None):
"""Method to prepare TTE data into a PHAII format for use with
trigger algorithms.
Args:
ttes (list): List of Tte objects
time_range (list): List with [tstart, tstop]
"""
if len(ttes) != len(self.det_names):
raise ValueError("Mismatch between detector names and number of TTE files")
# ensure time range is within the range of provided data
if time_range is None:
time_range = list(ttes[0].time_range)
for tte in ttes:
if tte.time_range[0] > time_range[0]:
time_range[0] = tte.time_range[0]
if tte.time_range[1] < time_range[1]:
time_range[1] = tte.time_range[1]
self.phaiis = []
description = "Binning TTE into {} energy channels".format(len(self.channel_edges)-1)
for tte in track(ttes, description=description, disable=not self.verbose):
phaii = tte.to_phaii(bin_by_time, self.resolution * 0.001, time_ref=0, time_range=time_range)
phaii = phaii.rebin_energy(rebin_by_edge_index, np.array(self.channel_edges))
self.phaiis.append(phaii)
[docs] def waterfall_plot(self, triggers: list, **kwargs):
"""Create a waterfall plot showing all trigger windows as a function
of algorithm number versus time.
Args:
triggers (list): List of triggers
kwargs (optional): Optional arguments passed to rectangular
patches drawn for each trigger
Returns:
(list)
"""
# default plotting options
if not len(kwargs):
kwargs = {'facecolor':'C0', 'edgecolor':'b', 'linewidth':3}
rects = []
for trigger in triggers:
width = trigger.alg.timescale_in_unit(u.second).value
height = 1
xy = (trigger.time - width, trigger.alg_num - 0.5)
rects.append(plt.Rectangle(xy, width, height, **kwargs))
plt.gca().add_patch(rects[-1])
plt.xlabel("Time [s]")
plt.ylabel("Algorithm Number")
plt.gca().autoscale_view()
return rects
[docs] def lightcurve_plot(self, trigger: Trigger, time_range: tuple = None, detectors: list = None,
figsize: tuple = None, resolution: int = None, legend: bool = True):
"""Plot detector light curves with trigger overlay.
Args:
trigger (Trigger): The trigger to overlay
time_range (tuple): Start and end of time range to plot in seconds
detectors (list): List of detector numbers to plot
figsize (tuple): Dimensions of the figure in inches
resolution (int): Lightcurve resolution in integer milliseconds
legend (bool): Show legend when True
Returns:
(tuple)
"""
if self.phaiis is None:
raise ValueError("Need to run prepare_data() method on TTE data before applying the trigger")
# default margins
top = 0.9
bottom = 0.1
if detectors is None:
detectors = np.arange(trigger.triggered_det.size)
elif len(detectors) == 0:
raise ValueError("Detector size must be greater than zero")
if figsize is None:
topsize = 0.5
bottomsize = 1.0 if legend else 0.5
figsize = [12, 2 * len(detectors) + topsize + bottomsize]
# fix margins to prevent excessive white space
top = 1 - topsize / figsize[1]
bottom = bottomsize / figsize[1]
if resolution is None:
resolution = trigger.alg.timescale
if resolution < self.resolution:
raise ValueError(f"Lightcurve resolution cannot be less than PHAII resolution ({self.resolution} ms)")
if time_range is None:
time_range = (self.phaiis[0].data.tstart[0], self.phaiis[0].data.tstop[-1])
fig, axes = plt.subplots(len(detectors), 1, sharex=True, sharey=False,
figsize=figsize)
fig.subplots_adjust(hspace=0.06, wspace=0.15, top=top, bottom=bottom)
try:
axes[0]
except TypeError:
axes = [axes]
[ax.tick_params(axis='both', which='both', direction='in') for ax in axes]
axes[0].set_title(trigger)
bin_factor = resolution // self.resolution
channel_range = trigger.alg.channels
# convert resolution to seconds for future calculations
resolution *= 0.001
lcplots = []
for i, det_num in enumerate(detectors):
phaii = self.phaiis[det_num]
# match lightcurve start/stop to data range
tstart = max(time_range[0], phaii.data.tstart[0])
tstop = min(time_range[1], phaii.data.tstop[-1])
# ensure start/stop is a multiple of resolution from the trigger time
tstart = trigger.time + int((tstart - trigger.time)/resolution) * resolution
tstop = trigger.time + int((tstop - trigger.time)/resolution) * resolution
# match range to nearest bins to prevent float rounding
# issues when rebinning lightcurves by factor
tstart = phaii.data.tstart[np.fabs(tstart - phaii.data.tstart).argmin()]
tstop = phaii.data.tstop[np.fabs(tstop - phaii.data.tstop).argmin()]
# create lightcurve with the appropriate resolution and range
lc = phaii.to_lightcurve(channel_range=channel_range, time_range=(tstart, tstop))
lc = lc.rebin(combine_by_factor, bin_factor, tstart=tstart, tstop=tstop)
lcplots.append(Lightcurve(data=lc, ax=axes[i]))
# trigger window bounds and exposure
trigger_stop = trigger.time
trigger_start = trigger_stop - trigger.alg.timescale * 0.001
start_bin = np.fabs(trigger_start - phaii.data.tstart).argmin()
stop_bin = np.fabs(trigger_stop - phaii.data.tstop).argmin()
exposure = phaii.data.exposure[start_bin:stop_bin+1].sum()
# background window bounds and exposure / counts
background_stop = trigger.time - self.background_offset * 0.001
background_start = background_stop - self.background_window * 0.001
start_bin = np.fabs(background_start - phaii.data.tstart).argmin()
stop_bin = np.fabs(background_stop - phaii.data.tstop).argmin()
background_counts = phaii.data.counts[start_bin:stop_bin+1, channel_range[0]:channel_range[1]+1].sum()
background_exposure = phaii.data.exposure[start_bin:stop_bin+1].sum()
# trigger threshold
scaled_background = background_counts * exposure / background_exposure
threshold = scaled_background + trigger.alg.threshold * np.sqrt(scaled_background)
# plot the background fit and window
bg, = axes[i].plot([background_start, background_stop], 2 * [background_counts / background_exposure], color='r')
bg_span = axes[i].axvspan(background_start, background_stop, color='r', alpha=0.1)
# plot the trigger threshold and window
trig_thresh, = axes[i].plot([trigger_start - 0.5 * trigger.alg.timescale * 0.001,
trigger_stop + 0.5 * trigger.alg.timescale * 0.001],
2 * [threshold / exposure], color='b')
trig_win = axes[i].axvspan(trigger_start, trigger_stop, color='b', alpha=0.2)
if not trigger.triggered_det[det_num]:
axes[i].set_facecolor('#BEC0BF')
if i != len(detectors) - 1:
axes[i].set_xlabel("")
axes[i].minorticks_on()
# detector label
x = 0.01
y = 0.87
label = self.det_names[i] + int(trigger.triggered_det[det_num]) * " (triggered)"
axes[i].annotate(label, xycoords='axes fraction', xy=(x, y), zorder=1000)
# energy range label
emin = phaii.ebounds[channel_range[0]].emin
emax = phaii.ebounds[channel_range[1]].emax
axes[i].annotate('{0:3.0f}-{1:3.0f} keV'.format(emin, emax), xy=(1-x,y),
xycoords='axes fraction', horizontalalignment='right',
zorder=1000)
if legend:
handles = [
matplotlib.patches.Rectangle((0,0), 1, 1, color='r', alpha=0.1),
matplotlib.lines.Line2D([0, 1], [0, 1], color='r'),
matplotlib.patches.Rectangle((0,0), 1, 1, color='b', alpha=0.2),
matplotlib.lines.Line2D([0, 1], [0, 1], color='b')]
labels = [
"Background Window",
"Background Fit",
"Trigger Window",
"Trigger Threshold"]
legend_ax = fig.add_axes((0, 0, 1, bottom))
legend_ax.set_axis_off()
legend_ax.legend(handles, labels,
loc=(0.15, 0.0), ncols=4, frameon=False, fontsize=12)
return fig, axes, lcplots