Source code for gdt.missions.fermi.gbm.trigdat

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import os
import astropy.io.fits as fits
import astropy.units as u
import astropy.coordinates.representation as r
import numpy as np
import warnings

from ..frame import FermiFrame
from ..time import *
from .detectors import GbmDetectors
from .headers import TrigdatHeaders, PhaiiTriggerHeaders
from .phaii import GbmPhaii
from gdt.core.coords.quaternion import Quaternion
from gdt.core.data_primitives import TimeEnergyBins, TimeRange, Gti
from gdt.core.file import FitsFileContextManager

__all__ = ['Trigdat', 'MaxRates', 'BackRates', 'FswLocation']

# Map the classification numbers to the string names
classifications = {0: 'ERROR', 1: 'UNRELOC', 2: 'LOCLPAR', 3: 'BELOWHZ',
                   4: 'GRB', 5: 'SGR', 6: 'TRANSNT', 7: 'DISTPAR',
                   8: 'SFL', 9: 'CYGX1', 10: 'SGR1806', 11: 'GROJ422',
                   19: 'TGF', 20: 'UNCERT', 21: 'GALBIN'}
# localization spectra
spectrum = ['hard', 'normal', 'soft']


[docs]class Trigdat(FitsFileContextManager): """Class for the GBM Trigger Data """ def __init__(self): super().__init__() self._data = None self._trigrates = None self._maxrates = None self._backrates = None self._fsw_locations = None self._emin = np.array([3.4, 10.0, 22.0, 44.0, 95.0, 300., 500., 800.]) self._emax = np.array([10., 22.0, 44.0, 95.0, 300., 500., 800., 2000.]) self._time_range = None self._detectors = [det.name for det in GbmDetectors] self._trigtime = None @property def backrates(self): """(:class:`BackRates`): A BackRates object containing the info from the on-board background estimate""" return self._backrates @property def fsw_locations(self): """(list of :class:`FswLocation`): A list of flight-software-determined locations for the trigger""" return self._fsw_locations @property def gti(self): """(:class:`~gdt.core.data_primitives.Gti`): The Good Time Intervals""" return self._gti @property def maxrates(self): """(list of :class:`MaxRates`): A list of MaxRates objects, each containing maxrates info""" return self._maxrates @property def num_maxrates(self): """(int): The number of MaxRates issued by the flight software""" return len(self._maxrates) @property def poshist(self): """(:class:`~.poshist.GbmPosHist`): The position/attitude history""" return self._poshist @property def time_range(self): """(:class:`~gdt.core.data_primitives.TimeRange`): The time range of the data""" return self._time_range @property def triggered_detectors(self): """(list of str): The detectors that were triggered""" try: detmask = self.headers['PRIMARY']['DET_MASK'] detmask = np.array(list(detmask)).astype(int) == 1 except: return None if detmask.size == 14: return (np.array(self._detectors)[detmask]).tolist() elif detmask.size == 12: return (np.array(self._detectors[:-2])[detmask]).tolist() else: return None @property def trigrates(self): """(:class:`MaxRates`): The trigger information and rates""" return self._trigrates @property def trigtime(self): """(float): The trigger time""" return self._trigtime
[docs] @classmethod def open(cls, file_path, **kwargs): """Open and read a Trigdat file Args: file_path (str): The file path of the trigdat file Returns: (:class:`Trigdat`) """ obj = super().open(file_path, **kwargs) # get the headers hdrs = [hdu.header for hdu in obj.hdulist] obj._headers = TrigdatHeaders.from_headers(hdrs) obj._trigtime = obj._headers[0]['TRIGTIME'] # store trigrate, maxrates, backrates, and fsw location trigrate_data = obj.hdulist['TRIGRATE'].data if trigrate_data.size > 0: obj._trigrates = MaxRates.from_recarray(trigrate_data[0]) obj._maxrates = [MaxRates.from_recarray(maxrate) for maxrate in obj.hdulist['MAXRATES'].data] try: obj._backrates = BackRates.from_recarray(obj.hdulist['BCKRATES'].data[0]) except: warnings.warn('No BCKRATES data in this file.', RuntimeWarning) obj._fsw_locations = [FswLocation.from_recarray(ob_calc) \ for ob_calc in obj.hdulist['OB_CALC'].data] # store the data obj._data = obj.hdulist['EVNTRATE'].data obj._data.sort(order='TIME') # incorrectly shaped in the file, so we have to fix that here obj._rates = obj._data['RATE'].reshape(-1, 14, 8) # store the position history idx, dt = obj._time_indices(1024) eic = obj._data['EIC'][idx] quat = obj._data['SCATTITD'][idx] times = obj._data['TIME'][idx] obj._poshist = FermiFrame( obsgeoloc=r.CartesianRepresentation(x=eic[:,0], y=eic[:,1], z=eic[:,2], unit=u.km), quaternion=Quaternion(quat), obstime=Time(times, format='fermi'), detectors=GbmDetectors) obj._time_range = TimeRange(obj._data['ENDTIME'][0] - dt[0], obj._data['ENDTIME'][-1]) obj._gti = Gti.from_list(obj._gti_from_times(obj._data['TIME'], obj._data['ENDTIME'])) return obj
[docs] @classmethod def from_data(cls, phaii_list, poshist, trigtime, time_range=None, trigrate=None, maxrates=None, backrates=None, fswlocations=None, filename=None, headers=None): """Create a Trigdat object from a list of GbmPhaii objects and a FermiFrame object. Note: The list of PHAIIs does not need to include every detector, as missing detectors will be assumed to have zero rates. However, do not include more than one file from the same detector in the list. The PHAII list can be in any order. Args: phaii_list (list of :class:`~.phaii.GbmPhaii`): A list of GbmPhaii objects poshist (:class:`~gdt.core.coords.FermiFrame`): The spacecraft frame trigtime (float): The trigger time time_range (tuple, optional): The time range of the Trigdat. If not specified, uses the time range of the GbmPhaiis trigrate (:class:`MaxRates`, optional): The trigger rates object maxrates (list of :class:`MaxRates`, optional): The maxrates objects backrates (:class:`BackRates`, optional): The background rates object fswlocations (list of :class:`FswLocation`, optional): The flight software location objects Returns: (:class:`Trigdat`) """ # make sure phaii_list contains the correct objects and desired time # is contained within all files for phaii in phaii_list: if not isinstance(phaii, GbmPhaii): raise TypeError('phaii_list must be a list of GbmPhaii objects') phaii_time_range = TimeRange(*phaii.time_range) # make sure the poshist is a PosHist object and desired time is # contained within the file if not isinstance(poshist, FermiFrame): raise TypeError('poshist must be a FermiFrame object') poshist_time_range = TimeRange(poshist.obstime[0].fermi, poshist.obstime[-1].fermi) if not poshist_time_range.contains(trigtime): raise ValueError('{} does not contain the specified ' \ 'trigtime'.format(poshist)) # slice the files by time, if requested trange = (time_range[0]-trigtime, time_range[1]-trigtime) if time_range is not None: phaii_list = [phaii.slice_time(trange) for phaii in phaii_list] obj = cls() obj._poshist = poshist obj._trigtime = trigtime # create the data record array num_chans = phaii_list[0].num_chans num_dets = len(obj._detectors) num_bins = phaii_list[0].data.num_times dtypes = [('TIME', '>f8'), ('ENDTIME', '>f8'), ('SCATTITD', '>f4', (4,)), ('EIC', '>f4', (3,)), ('RATE', '>f4', (num_chans, num_dets))] obj._data = np.recarray((num_bins,), dtype=dtypes) tstart = phaii_list[0].data.tstart tstop = phaii_list[0].data.tstop tcent = phaii_list[0].data.time_centroids # interpolate poshist to match the bin edges as is standard fermi_times = Time(tstart+trigtime, format='fermi') obj._poshist = poshist.at(fermi_times) # to preserve the detector ordering assumed by the trigdat # datatype, we must pull the phaii file from the list that # corresponds with the right detector in our sequence. if the # detector isn't represented in the phaii_list, then the rates will # be zeros. det_mask = np.zeros(14, dtype=int) for i in range(num_bins): rates = np.zeros((14,8)) for j in range(len(obj._detectors)): try: det_idx = [phaii.detector for \ phaii in phaii_list].index(obj._detectors[j]) det_mask[j] = 1 except: continue rates[j,:] = phaii_list[det_idx].data.counts[i,:] * \ (1.024/phaii_list[det_idx].data.time_widths[i,np.newaxis]) quat = np.append(obj._poshist[i].quaternion.xyz, obj._poshist[i].quaternion.w) eic = obj._poshist.obsgeoloc[0].xyz.to('km').value obj._data[i] = (tstart[i]+trigtime, tstop[i]+trigtime, quat, eic, rates.T) obj._time_range = TimeRange(obj._data['TIME'][0], obj._data['ENDTIME'][-1]) obj._gti = Gti.from_list(obj._gti_from_times(obj._data['TIME'], obj._data['ENDTIME'])) # assign/create trigrates, maxrates, backrates, and fsw_locations if trigrate is not None: if not isinstance(trigrate, MaxRates): raise TypeError('trigrate must be a MaxRates object') obj._trigrates = trigrate else: obj._trigrates = MaxRates.create() if maxrates is not None: try: iter(maxrates) except: raise TypeError('maxrates must be a list of MaxRate objects') if any([not isinstance(m, MaxRates) for m in maxrates]): raise TypeError('maxrates must be a list of MaxRate objects') obj._maxrates = maxrates else: obj._maxrates = [MaxRates.create()] if backrates is not None: if not isinstance(backrates, BackRates): raise TypeError('backrates must be a BackRats object') obj._backrates = backrates else: obj._backrates = BackRates.create() if fswlocations is not None: try: iter(fswlocations) except: raise TypeError('fsw_locations must be a list of FswLocation ' \ 'objects') if any([not isinstance(f, FswLocation) for f in fswlocations]): raise TypeError('fsw_locations must be a list of FswLocation ' \ 'objects') obj._fsw_locations = fswlocations else: obj._fsw_locations = [FswLocation.create()] if filename is not None: obj._filename = str(filename) if headers is not None: if not isinstance(headers, TrigdatHeaders): raise ValueError('headers must be a TrigdatHeaders object') obj._headers = headers else: headers = TrigdatHeaders() pos = obj._poshist.scpos.interpolate(obj.trigtime) headers[0]['FILENAME'] = obj._filename for hdu in headers: try: hdu['TSTART'] = obj._data['TIME'][0] hdu['TSTOP'] = obj._data['ENDTIME'][-1] except: pass try: hdu['TRIGTIME'] = obj._trigtime except: pass try: hdu['GEO_LONG'] = pos.longitude.value hdu['GEO_LAT'] = pos.latitude.value except: pass try: hdu['DETTYPE'] = 'BOTH' except: pass obj._headers = headers obj._headers[0]['DET_MASK'] = ''.join(det_mask.astype(str)) return obj
[docs] def to_phaii(self, detector, timescale=1024): """Convert the data for a detector to a :class:`~.phaii.GbmPhaii` object. Note: A standard Trigdat will contain data on 8192, 1024, 256, and 62 ms timescales. A non-standard Trigdat can be created from other data types (see :meth:`Trigdat.from_data`) and therefore the timescales will likely be different and arbitrary. For that reason, the ``timescale`` keyword will be ignored for non-standard Trigdat. Args: detector (str): The detector to convert timescale (int, optional): The minimum timescale in ms of the data to return. Available options are 1024, 256, and 64. This is ignored if the trigdat has non-standard timescales. Returns: (:class:`~.phaii.GbmPhaii`) """ # check for valid detectors and timescale detector = detector.lower() if detector not in self._detectors: raise ValueError('Illegal detector name') # grab the correct detector rates (stored as 14 dets x 8 channels) det_idx = self._detectors.index(detector) if (timescale != 1024) and (timescale != 256) and (timescale != 64): raise ValueError('Illegal Trigdat resolution. Available resolutions: \ 1024, 256, 64') # return the requested timescales try: time_idx, dt = self._time_indices(timescale) except: warnings.warn('\nTrigdat.to_ctime(): ' \ 'Exporting from non-standard file. ' \ 'Timescale is ignored.') time_idx = np.arange(self._rates.shape[0]) dt = self._data['ENDTIME']-self._data['TIME'] # calculate counts and exposure counts = np.round(self._rates[time_idx, det_idx, :] * \ (dt[:, np.newaxis] / 1.024)) exposure = self._calc_exposure(counts, dt) # the 'TIME' and 'ENDTIME' edges are not aligned before being written to # file, so we must fix this to prevent TimeEnergyBins from thinking # every single bin is a single discontiguous segment tstop = self._data['ENDTIME'][time_idx] - self.trigtime tstart = self._fix_tstart(tstop, dt) # create the Time-Energy histogram bins = TimeEnergyBins(counts, tstart, tstop, exposure, self._emin, self._emax) headers = self._set_phaii_headers(detector, bins.num_chans) obj = GbmPhaii.from_data(bins, gti=self.gti, trigger_time=self.trigtime, headers=headers) return obj
[docs] def sum_detectors(self, detectors, timescale=1024): """Sum the data from a list of detectors and convert to a GbmPhaii object. The exposures from different detectors are averaged. Args: detectors (list of str): The detectors to sum timescale (int, optional): The minimum timescale in ms of the data to return. Available options are 1024, 256, and 64. Returns: (:class:`~.phaii.GbmPhaii`) """ # check for valid detectors and timescale for det in detectors: det = det.lower() if det not in self._detectors: raise ValueError('Illegal detector name') if (timescale != 1024) and (timescale != 256) and (timescale != 64): raise ValueError('Illegal Trigdat resolution. Available resolutions: \ 1024, 256, 64') try: time_idx, dt = self._time_indices(timescale) except: warnings.warn('\nTrigdat.to_ctime(): ' \ 'Exporting from non-standard file. ' \ 'Timescale is ignored.') time_idx = np.arange(self._rates.shape[0]) dt = self._data['ENDTIME']-self._data['TIME'] counts = None exposure = None for det in detectors: # grab the correct detector rates (stored as 14 dets x 8 channels) det_idx = self._detectors.index(det) # calculate counts c = np.round(self._rates[time_idx, det_idx, :] * \ (dt[:, np.newaxis] / 1.024)) if counts is None: counts = c else: counts += c # calculate exposure e = self._calc_exposure(c, dt) if exposure is None: exposure = e else: exposure += e exposure /= float(len(detectors)) # the 'TIME' and 'ENDTIME' edges are not aligned before being written to # file, so we must fix this to prevent TimeEnergyBins from thinking # every single bin is a single discontiguous segment tstop = self._data['ENDTIME'][time_idx] - self.trigtime tstart = self._fix_tstart(tstop, dt) # create the Time-Energy histogram bins = TimeEnergyBins(counts, tstart, tstop, exposure, self._emin, self._emax) det_str = '+'.join(detectors) headers = self._set_phaii_headers(det_str, bins.num_chans) obj = GbmPhaii.from_data(bins, gti=self.gti, trigger_time=self.trigtime, headers=headers) return obj
def _build_hdulist(self): # create FITS and primary header hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self.headers['PRIMARY']) for key, val in self.headers['PRIMARY'].items(): primary_hdu.header[key] = val hdulist.append(primary_hdu) # TRIGRATE extension trigrate_hdu = fits.BinTableHDU(data=self.trigrates.to_recarray(), name='TRIGRATE', header=self.headers['TRIGRATE']) for key, val in self.headers['TRIGRATE'].items(): trigrate_hdu.header[key] = val trigrate_hdu.header.comments['TTYPE1'] = 'Beginning of accumulation, ' \ 'calculated value' trigrate_hdu.header.comments['TTYPE2'] = 'End of accumulation, same as'\ ' PCKTTIME' trigrate_hdu.header.comments['TTYPE3'] = 'Spacecraft attitude ' \ 'quaternions' trigrate_hdu.header.comments['TTYPE4'] = 'Spacecraft position: Earth ' \ 'X, Y, & Z' trigrate_hdu.header.comments['TTYPE5'] = 'Rates-14 detectors, 8 channels' hdulist.append(trigrate_hdu) # BCKRATES extension backrates_hdu = fits.BinTableHDU(data=self.backrates.to_recarray(), name='BCKRATES', header=self.headers['BCKRATES']) for key, val in self.headers['BCKRATES'].items(): backrates_hdu.header[key] = val backrates_hdu.header.comments['TTYPE1'] = 'Start time of the ' \ 'background accumulation' backrates_hdu.header.comments['TTYPE2'] = 'Creation time of the ' \ 'background model' backrates_hdu.header.comments['TTYPE3'] = 'Quality Flag for the ' \ 'background model' backrates_hdu.header.comments['TTYPE4'] = 'Rates-14 detectors, 8 channels' hdulist.append(backrates_hdu) # OB_CALC extension obcalc_hdu = fits.BinTableHDU(data=np.array([loc.to_recarray() for \ loc in self.fsw_locations]), name='OB_CALC', header=self.headers['OB_CALC']) for key, val in self.headers['OB_CALC'].items(): obcalc_hdu.header[key] = val obcalc_hdu.header.comments['TTYPE1'] = 'End time of location rates ' \ 'accumulation' obcalc_hdu.header.comments['TTYPE2'] = 'Calculated Right Ascension of '\ 'source' obcalc_hdu.header.comments['TTYPE3'] = 'Calculated Declination of source' obcalc_hdu.header.comments['TTYPE4'] = 'Error radius of calculated' \ 'location' obcalc_hdu.header.comments['TTYPE5'] = 'Location algorithm used in ' \ 'calculation' obcalc_hdu.header.comments['TTYPE6'] = 'Two highest probable event ' \ 'classes' obcalc_hdu.header.comments['TTYPE7'] = 'Reliabilities of probable ' \ 'event classes' obcalc_hdu.header.comments['TTYPE8'] = 'Standardized count intensity' obcalc_hdu.header.comments['TTYPE9'] = 'Standard hardness ratio ' \ '(channels 3 / 4)' obcalc_hdu.header.comments['TTYPE10'] = 'Fluence counts in 3 + 4 ' \ 'energy band' obcalc_hdu.header.comments['TTYPE11'] = 'Significance of localization '\ 'in sigma' obcalc_hdu.header.comments['TTYPE12'] = 'Localization rates-12 NaI ' \ 'detectors' obcalc_hdu.header.comments['TTYPE13'] = 'Location data timescale' obcalc_hdu.header.comments['TTYPE14'] = 'Source azimuth with respect ' \ 'to Spacecraft' obcalc_hdu.header.comments['TTYPE15'] = 'Source zenith with respect to'\ ' Spacecraft' hdulist.append(obcalc_hdu) # MAXRATES extension maxrates_hdu = fits.BinTableHDU(data=np.array([mr.to_recarray() for \ mr in self.maxrates]), name='MAXRATES', header=self.headers['MAXRATES']) for key, val in self.headers['MAXRATES'].items(): maxrates_hdu.header[key] = val maxrates_hdu.header.comments['TTYPE1'] = 'Beginning of accumulation, ' \ 'calculated value' maxrates_hdu.header.comments['TTYPE2'] = 'End of accumulation, same as'\ ' PCKTTIME' maxrates_hdu.header.comments['TTYPE3'] = 'Spacecraft attitude ' \ 'quaternions' maxrates_hdu.header.comments['TTYPE4'] = 'Spacecraft position: Earth ' \ 'X, Y, & Z' maxrates_hdu.header.comments['TTYPE5'] = 'Rates-14 detectors, 8 channels' hdulist.append(maxrates_hdu) # EVNTRATE extension evntrate_hdu = fits.BinTableHDU(data=self._data, name='EVNTRATE', header=self.headers['EVNTRATE']) for key, val in self.headers['EVNTRATE'].items(): evntrate_hdu.header[key] = val evntrate_hdu.header.comments['TTYPE1'] = 'Beginning of accumulation, ' \ 'calculated value' evntrate_hdu.header.comments['TTYPE2'] = 'End of accumulation, same as'\ ' PCKTTIME' evntrate_hdu.header.comments['TTYPE3'] = 'Spacecraft attitude ' \ 'quaternions' evntrate_hdu.header.comments['TTYPE4'] = 'Spacecraft position: Earth ' \ 'X, Y, & Z' evntrate_hdu.header.comments['TTYPE5'] = 'Rates-14 detectors, 8 channels' hdulist.append(evntrate_hdu) return hdulist def _fix_tstart(self, tstop, dt): # this ensures that edge differences < 1 ms get fixed tstart = tstop - dt mask = (np.abs(tstart[1:] - tstop[:-1]) < 0.001) tstart[1:][mask] = tstop[:-1][mask] return tstart def _calc_exposure(self, counts, dt): """Calculate the exposure Args: counts (np.array): The observed counts in each bin dt (np.array): The time bin widths Returns: (np.array) """ deadtime = np.copy(counts) deadtime[:, :-1] *= 2.6e-6 # 2.6 microsec for each count deadtime[:, -1] *= 1e-5 # 10 microsec for each count in overflow total_deadtime = np.sum(deadtime, axis=1) exposure = (1.0 - total_deadtime) * dt return exposure def _time_indices(self, time_res): """Indices into the Trigdat arrays corresponding to the desired time resolution(s) Args: time_res (int): The time resolution in ms of the data Returns: (np.array, np.array): Indices into the trigdat arrays and the \ bin widths in seconds """ # bin widths dt = np.round((self._data['ENDTIME'] - self._data['TIME']) * 1000) # background bins back_idx = np.where(dt == 8192)[0] # 1 s scale bins - this is the minimum amount returned idx = np.where(dt == 1024)[0] cnt = len(idx) # reconcile 8 s and 1 s data idx = self._reconcile_timescales(back_idx, idx) # reconcile 8 s + 1 s and 256 ms data if time_res <= 256: tidx = np.where(dt == 256)[0] idx = self._reconcile_timescales(idx, tidx) # reconcile 8 s + 1 s + 256 ms and 64 ms data if time_res == 64: tidx = np.where(dt == 64)[0] idx = self._reconcile_timescales(idx, tidx) # return reconciled indices return idx, np.reshape(dt[idx] / 1000.0, len(idx)) def _reconcile_timescales(self, idx1, idx2): """Reconcile indices representing different timescales and glue them together to form a complete (mostly) continuous set of indices Args: idx1 (np.array): Indices of the "bracketing" timescale idx2 (np.array): Indices of the "inserted" timescale Returns: np.array: Indices of idx2 spliced into idx1 """ # if there is no idx1, then we only have to return idx2 and v.v. if idx1.size == 0: return idx2 if idx2.size == 0: return idx1 # bin edges for both selections start_times1 = self._data['TIME'][idx1] end_times1 = self._data['ENDTIME'][idx1] start_times2 = self._data['TIME'][idx2] end_times2 = self._data['ENDTIME'][idx2] # find where bracketing timescale ends and inserted timescale begins mask = end_times1 >= start_times2[0] if mask.sum(): start_idx = (np.where(mask))[0][0] idx = np.concatenate((idx1[0:start_idx], idx2)) else: idx = np.concatenate((idx1, idx2)) # find where inserted timescale ends and bracketing timescale begins again mask = start_times1 >= end_times2[-1] if mask.sum(): end_idx = (np.where(mask))[0][0] idx = np.concatenate((idx, idx1[end_idx:])) return idx def _gti_from_times(self, tstarts, tstops): """Estimate the GTI from the bin start and stop times. This may return multiple GTIs if several background packets are missing Args: tstarts (np.array): The start times of the bins tstops (np.array): The end times of the bins Returns: [(float, float), ...]: A list of time ranges """ tstart = tstarts[0] tstop = tstops[-1] dt = tstarts[1:] - tstops[:-1] idx = np.where(np.abs(dt) > 10.0)[0] if idx.sum() > 0: gti = [(tstart, tstops[idx[0] - 1]), (tstarts[idx[0]], tstop)] else: gti = [(tstart, tstop)] return np.array(gti) def _set_phaii_headers(self, det, num_chans): headers = PhaiiTriggerHeaders() try: detname = GbmDetectors.from_str(det).full_name except: detname = det headers[0]['DETNAM'] = detname headers[0]['DATE-OBS'] = self.headers[0]['DATE-OBS'] headers[0]['DATE-END'] = self.headers[0]['DATE-END'] headers[0]['TSTART'] = self.headers[0]['TSTART'] headers[0]['TSTOP'] = self.headers[0]['TSTOP'] headers[0]['TRIGTIME'] = self.trigtime headers[0]['OBJECT'] = self.headers[0]['OBJECT'] headers[0]['RA_OBJ'] = self.headers[0]['RA_OBJ'] headers[0]['DEC_OBJ'] = self.headers[0]['DEC_OBJ'] headers[0]['ERR_RAD'] = self.headers[0]['ERR_RAD'] headers[1]['DETNAM'] = detname headers[1]['DATE-OBS'] = self.headers[0]['DATE-OBS'] headers[1]['DATE-END'] = self.headers[0]['DATE-END'] headers[1]['TSTART'] = self.headers[0]['TSTART'] headers[1]['TSTOP'] = self.headers[0]['TSTOP'] headers[1]['TRIGTIME'] = self.trigtime headers[1]['OBJECT'] = self.headers[0]['OBJECT'] headers[1]['RA_OBJ'] = self.headers[0]['RA_OBJ'] headers[1]['DEC_OBJ'] = self.headers[0]['DEC_OBJ'] headers[1]['ERR_RAD'] = self.headers[0]['ERR_RAD'] headers[1]['DETCHANS'] = num_chans headers[2]['DETNAM'] = detname headers[2]['DATE-OBS'] = self.headers[0]['DATE-OBS'] headers[2]['DATE-END'] = self.headers[0]['DATE-END'] headers[2]['TSTART'] = self.headers[0]['TSTART'] headers[2]['TSTOP'] = self.headers[0]['TSTOP'] headers[2]['TRIGTIME'] = self.trigtime headers[2]['OBJECT'] = self.headers[0]['OBJECT'] headers[2]['RA_OBJ'] = self.headers[0]['RA_OBJ'] headers[2]['DEC_OBJ'] = self.headers[0]['DEC_OBJ'] headers[2]['ERR_RAD'] = self.headers[0]['ERR_RAD'] headers[2]['DETCHANS'] = num_chans headers[3]['DETNAM'] = detname headers[3]['DATE-OBS'] = self.headers[0]['DATE-OBS'] headers[3]['DATE-END'] = self.headers[0]['DATE-END'] headers[3]['TSTART'] = self.headers[0]['TSTART'] headers[3]['TSTOP'] = self.headers[0]['TSTOP'] headers[3]['TRIGTIME'] = self.trigtime headers[3]['OBJECT'] = self.headers[0]['OBJECT'] headers[3]['RA_OBJ'] = self.headers[0]['RA_OBJ'] headers[3]['DEC_OBJ'] = self.headers[0]['DEC_OBJ'] headers[3]['ERR_RAD'] = self.headers[0]['ERR_RAD'] return headers def __repr__(self): s = '<Trigdat: {};\n'.format(self.filename) s += ' trigtime={:.2f};'.format(self.trigtime) s += ' triggered detectors={}>'.format(self.triggered_detectors) return s
[docs]class MaxRates: """Class for the MAXRATES and TRIGRATE data in Trigdat. """ _dtypes = [('TIME', '>f8'), ('ENDTIME', '>f8'), ('SCATTITD', '>f4', (4,)), ('EIC', '>f4', (3,)), ('TRIGRATE', '>f4', (8,14))] def __init__(self): self._detectors = [det.name for det in GbmDetectors] self._time_range = (None, None) self._quats = None self._eic = None self._rates = None @property def all_rates(self): """(np.array): An array (:attr:`num_chans`, :attr:`num_dets`) of the maxrates""" return self._rates @property def eic(self): """(np.array): The position of Fermi in Earth inertial coordinates""" return self._eic @property def num_chans(self): """(int): The number of energy channels""" if self._rates is not None: return self._rates.shape[0] else: return 0 @property def num_dets(self): """(int): The number of detectors""" if self._rates is not None: return self._rates.shape[1] else: return 0 @property def quaternion(self): """ (np.array): The quaternions at the maxrates time""" return self._quats @property def time_range(self): """(float, float): The time range of the maxrates""" return self._time_range @property def timescale(self): """(float): The timescale of the maxrates""" if self.time_range[0] is not None and self.time_range[1] is not None: return round((self.time_range[1] - self.time_range[0]) * 1000.0) else: return None
[docs] @classmethod def create(cls, tstart=None, tstop=None, quaternion=None, eic=None, rates=None): """Create a MaxRates object from keywords. Args: tstart (float, optional): The start time of the MaxRates interval tstop (float, optional): The stop time of the MaxRates interval quaternion (np.array, optional): The attitude quaternion eic (np.array, optional): The position in earth inertial coordinates rates (np.array): The (num channels, num detectors) rates array Returns: (:class:`MaxRates`) """ obj = cls() obj._time_range = (tstart, tstop) if isinstance(quaternion, (list, tuple)): quaternion = np.array(quaternion) if quaternion is not None: if quaternion.size != 4: raise ValueError('quaternion must have 4 elements') obj._quats = quaternion if isinstance(eic, (list, tuple)): eic = np.array(eic) if eic is not None: if eic.size != 3: raise ValueError('eic must have 3 elements') obj._eic = eic if isinstance(rates, (list, tuple)): rates = np.array(rates) obj._rates = rates return obj
[docs] @classmethod def from_recarray(cls, rec_array): """Create from a FITS or numpy record array. Args: rec_array (np.recarray): The FITS TRIGRATE or MAXRATES record array from the trigdat file Returns: (:class:`MaxRates`) """ obj = cls() obj._time_range = (rec_array['TIME'], rec_array['ENDTIME']) obj._quats = rec_array['SCATTITD'] obj._eic = rec_array['EIC'] try: obj._rates = rec_array['TRIGRATE'] except: obj._rates = rec_array['MAXRATES'] return obj
[docs] def get_detector(self, det): """Retrieve the rates for a detector Args: det (str): The detector Returns: (np.array) """ if self._rates is None: return None mask = (np.array(self._detectors) == det) return self._rates[:, mask].squeeze()
[docs] def to_recarray(self): """Convert contents of the object to a numpy record array. Returns: (np.recarray) """ dtypes = self._dtypes dtypes[-1] = (*dtypes[-1][:-1], (self.num_chans, self.num_dets)) record = np.array([(*self._time_range, self._quats, self._eic, self._rates)], dtype=dtypes) return record
def __repr__(self): return '<MaxRates: {0} ms>'.format(self.timescale)
[docs]class BackRates: """Class for the background rates data in Trigdat. """ _dtypes = [('TIME', '>f8'), ('ENDTIME', '>f8'), ('QUALITY', 'u1', (2,)), ('BCKRATES', '>f4', (8,14))] def __init__(self): self._detectors = [det.name for det in GbmDetectors] self._time_range = (None, None) self._quality = np.array((0, 0)) self._rates = None @property def all_rates(self): """(np.array): An array (:attr:`num_chans`, :attr:`num_dets`) of the background rates""" return self._rates @property def num_chans(self): """(int): The number of energy channels""" if self._rates is not None: return self._rates.shape[0] else: return 0 @property def num_dets(self): """(int): The number of detectors""" if self._rates is not None: return self._rates.shape[1] else: return 0 @property def quality(self): """(int, int): The quality flags for the background""" return self._quality @property def time_range(self): """(float, float): The time range of the background rates""" return self._time_range
[docs] @classmethod def create(cls, tstart=None, tstop=None, quality=None, rates=None): """Create a BackRates object from keywords. Args: tstart (float, optional): The start time of the MaxRates interval tstop (float, optional): The stop time of the MaxRates interval quality (int, int): Quality flags rates (np.array): The (num channels, num detectors) rates array Returns: (:class:`BackRates`) """ obj = cls() obj._time_range = (tstart, tstop) if quality is not None: if not isinstance(quality, (list, tuple)): raise TypeError('quality must be a 2-tuple integer') if len(quality) != 2: raise ValueError('quality must be a 2-tuple integer') obj._quality = np.asarray(quality) if isinstance(rates, (list, tuple)): rates = np.array(rates) obj._rates = rates return obj
[docs] @classmethod def from_recarray(cls, rec_array): """Create from a FITS or numpy record array. Args: rec_array (np.recarray): The FITS BCKRATES record array from the trigdat file Returns: (:class:`BackRates`) """ obj = cls() obj._time_range = (rec_array['TIME'], rec_array['ENDTIME']) obj._quality = rec_array['QUALITY'] obj._rates = rec_array['BCKRATES'] return obj
[docs] def get_detector(self, det): """Retrieve the background rates for a detector Args: det (str): The detector Returns: np.array: An array of size (:attr:`num_chans`,) of background rates for the detector """ if self._rates is None: return None mask = (np.array(self._detectors) == det) return self._rates[:, mask].squeeze()
[docs] def to_recarray(self): """Convert contents of the object to a numpy record array Returns: (np.recarray) """ dtypes = self._dtypes dtypes[-1] = (*dtypes[-1][:-1], (self.num_chans, self.num_dets)) record = np.array([(*self._time_range, self._quality, self._rates)], dtype=dtypes) return record
def __repr__(self): return '<BackRates: time range={0}>'.format(self.time_range)
[docs]class FswLocation: """Class for the Flight Software localization information. """ _dtypes = [('TIME', '>f8'), ('RA', '>f4'), ('DEC', '>f4'), ('STATERR', '>f4'), ('LOCALG', '>i2'), ('EVTCLASS', '>i2', (2,)), ('RELIABLT', '>f4', (2,)), ('INTNSITY', '>f4'), ('HDRATIO', '>f4'), ('FLUENCE', '>f4'), ('SIGMA', '>f4'), ('LOCRATES', '>i2', (12,)), ('TRIG_TS', '>f4'), ('TR_SCAZ', '>f4'), ('TR_SCZEN', '>f4')] def __init__(self): self._time = None self._location = (None, None, None) self._algorithm = None self._class1 = (20, 1.0) self._class2 = (20, 0.0) self._intensity = None self._hardness = None self._fluence = None self._sigma = None self._rates = None self._timescale = None self._azzen = (None, None) @property def fluence(self): """(float): The fluence of the localization interval""" return self._fluence @property def hardness_ratio(self): """(float): The hardness ratio for the localization interval""" return self._hardness @property def intensity(self): """(float): The brightness of the signal""" return self._intensity @property def location(self): """(float, float, float): The RA, Dec, and statistical error of the onboard localization""" return self._location @property def location_sc(self): """(float, float): The localization in spacecraft coordinates: Azimuth, Zenith""" return self._azzen @property def next_classification(self): """(str, float): The next most likely classification of the trigger and the probability""" return self._class2 @property def rates(self): """(np.array): The rates in each NaI detector used for localization""" return self._rates @property def significance(self): """(float): The S/N ratio of the localization interval""" return self._sigma @property def spectrum(self): """(str): The spectrum used in the localization""" return self._algorithm @property def time(self): """(float): Time at which the localization was calculated""" return self._time @property def timescale(self): """(float): The localization interval timescale""" return self._timescale @property def top_classification(self): """(str, float): The most likely classification of the trigger and the probability""" return self._class1
[docs] @classmethod def create(cls, time=None, radec=(None, None), err=None, algorithm='normal', class1='UNCERT', class1_prob=1.0, class2='UNCERT', class2_prob=0.0, intensity=None, hardness=None, fluence=None, sigma=None, rates=None, timescale=None, azzen=(None, None)): """Create a FswLocation object from keywords Args: time (float, optional): The time of the localization radec ((float, float), optional): RA and Dec err (float, optional): The localization error algorithm (str, optional): The algorithm spectrum; one of 'hard', 'normal', or 'soft' class1 (str, optional): The primary classification class1_prob (float, optional): The primary classification probability class2 (str, optional): The secondary classification class2_prob (float, optional): The secondary classification probability intensity: (float, optional): Intensity of the signal in the localization interval hardness: (float, optional): The hardness ratio in the localization interval fluence: (float, optional): The fluence in the localization interval sigma: (float, optional): The S/N ratio of the localization interval rates: (np.array, optional): A 12-element array containing the rate in each NaI used for localization timescale (float, optional): The localization timescale azzen ((float, float), optional): Spacecraft azimuth and zenith of the localization Returns: (:class:`FswLocation`) """ obj = cls() obj._time = time if not isinstance(radec, (list, tuple)): raise TypeError('radec must be a 2-tuple float') if len(radec) != 2: raise ValueError('radec must be a 2-tuple float') obj._location = (*radec, err) if algorithm not in spectrum: raise ValueError('algorithm is not valid') obj._algorithm = algorithm if class1 not in classifications.values(): raise ValueError('class1 is not a valid class.') if class1_prob < 0.0 or class1_prob > 1.0: raise ValueError('class1_prob must be in range [0,1]') obj._class1 = (class1, class1_prob) if class2 not in classifications.values(): raise ValueError('class2 is not a valid class.') if class2_prob < 0.0 or class2_prob > 1.0 or \ class1_prob+class2_prob > 1.0: raise ValueError('class2_prob must be in range [0,1] and '\ 'class1_prob + class2_prob must be <= 1') obj._class2 = (class2, class2_prob) obj._intensity = intensity obj._hardness = hardness obj._fluence = fluence obj._sigma = sigma obj._timescale = timescale if rates is None: rates = np.zeros(12) else: rates = np.asarray(rates) if rates.size != 12: raise ValueError('rates must be a 12-element array') obj._rates = rates if not isinstance(azzen, (list, tuple)): raise TypeError('azzen must be a 2-tuple float') if len(azzen) != 2: raise ValueError('azzen must be a 2-tuple float') obj._azzen = azzen return obj
[docs] @classmethod def from_recarray(cls, rec_array): """Create from a FITS or numpy record array. Args: rec_array (np.recarray): The FITS OB_CALC record array from the trigdat file Returns: (:class:`FswLocation`) """ obj = cls() obj._time = rec_array['TIME'] obj._location = (rec_array['RA'], rec_array['DEC'], rec_array['STATERR']) obj._algorithm = spectrum[rec_array['LOCALG'] - 1] obj._class1 = (classifications[rec_array['EVTCLASS'][0]], rec_array['RELIABLT'][0]) obj._class2 = (classifications[rec_array['EVTCLASS'][1]], rec_array['RELIABLT'][1]) obj._intensity = rec_array['INTNSITY'] obj._hardness = rec_array['HDRATIO'] obj._fluence = rec_array['FLUENCE'] obj._sigma = rec_array['SIGMA'] obj._rates = rec_array['LOCRATES'] try: obj._timescale = rec_array['TRIG_TS'] except KeyError: pass try: obj._azzen = (rec_array['TR_SCAZ'], rec_array['TR_SCZEN']) except KeyError: pass return obj
[docs] def to_recarray(self): """Convert contents of the object to a numpy record array Returns: (np.recarray) """ dtypes = self._dtypes algorithm = spectrum.index(self.spectrum) class_nums = {v: k for k, v in classifications.items()} record = np.array([(self.time, *self.location, algorithm, (class_nums[self._class1[0]], class_nums[self._class2[0]]), (self._class1[1], self._class2[1]), self.intensity, self.hardness_ratio, self.fluence, self.significance, self.rates, self.timescale, *self.location_sc)], dtype=dtypes) return record
def __repr__(self): s = '<FswLocation: {0}: {1:.1f}%;\n'.format(self.top_classification[0], self.top_classification[1]*100.) try: s += ' RA={0:.1f}, Dec={1:.1f}, err={2:.1f}>'.format(*self.location) except: s += ' RA=None, Dec=None, err=None>' return s