Source code for gdt.missions.fermi.gbm.scat

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
import astropy.io.fits as fits
from gdt.core.data_primitives import Parameter
from gdt.core.file import FitsFileContextManager
from gdt.core.spectra.parameters import *
from .headers import ScatHeaders

__all__ = ['Scat', 'GbmDetectorData', 'GbmModelFit']


[docs]class GbmModelFit(ModelFit): """A container for the info from a model fit, with values used in the GBM SCAT files. """ def __init__(self): super().__init__() self._photon_flux_50_300 = None self._energy_fluence_50_300 = None self._duration_fluence = None @property def duration_fluence(self): """(:class:`~gdt.spectra.parameters.EnergyFluence`): The energy fluence over the duration energy range, nominally 50-300 keV.""" return self._duration_fluence @duration_fluence.setter def duration_fluence(self, val): if not isinstance(val, Parameter): raise TypeError('duration_fluence must be of Parameter type') self._duration_fluence = val @property def energy_fluence_50_300(self): """(:class:`~gdt.spectra.parameters.EnergyFluence`): The energy fluence over 50-300 keV""" return self._energy_fluence_50_300 @energy_fluence_50_300.setter def energy_fluence_50_300(self, val): if not isinstance(val, Parameter): raise TypeError('energy_fluence_50_300 must be of Parameter type') self._energy_fluence_50_300 = val @property def photon_flux_50_300(self): """(:class:`~gdt.spectra.parameters.PhotonFlux`): The photon flux over 50-300 keV""" return self._photon_flux_50_300 @photon_flux_50_300.setter def photon_flux_50_300(self, val): if not isinstance(val, Parameter): raise TypeError('photon_flux_50_300 must be of Parameter type') self._photon_flux_50_300 = val
[docs] def to_fits_row(self): """Return the contained data as a FITS table row Returns: (astropy.io.fits.BinTableHDU): The FITS table """ numparams = len(self.parameters) cols = [] cols.append( fits.Column(name='TIMEBIN', format='2D', array=[self.time_range])) i = 0 for param in self.parameters: col = fits.Column(name='PARAM{0}'.format(i), format='3E', array=[param.to_fits_value()]) cols.append(col) i += 1 cols.append(fits.Column(name='PHTFLUX', format='3E', array=[self.photon_flux.to_fits_value()])) cols.append(fits.Column(name='PHTFLNC', format='3E', array=[self.photon_fluence.to_fits_value()])) cols.append(fits.Column(name='NRGFLUX', format='3E', array=[self.energy_flux.to_fits_value()])) cols.append(fits.Column(name='NRGFLNC', format='3E', array=[self.energy_fluence.to_fits_value()])) cols.append(fits.Column(name='PHTFLUXB', format='3E', array=[ self.photon_flux_50_300.to_fits_value()])) cols.append(fits.Column(name='DURFLNC', format='3E', array=[self.duration_fluence.to_fits_value()])) cols.append(fits.Column(name='NRGFLNCB', format='3E', array=[ self.energy_fluence_50_300.to_fits_value()])) cols.append(fits.Column(name='REDCHSQ', format='2E', array=[[self.stat_value / self.dof] * 2])) cols.append(fits.Column(name='CHSQDOF', format='1I', array=[self.dof])) cols.append(fits.Column(name='COVARMAT', format='{0}E'.format(int(numparams * numparams)), dim='({0},{0})'.format(numparams), array=[self.covariance])) hdu = fits.BinTableHDU.from_columns(cols, name='FIT PARAMS') return hdu
[docs] @classmethod def from_fits_row(cls, fits_row, model_name, param_names=None, flux_range=(10.0, 1000.0), dur_range=(50.0, 300.0)): """Read a FITS row and return a :class:`GbmModelFit` object Args: fits_row (np.recarray): The FITS row model_name (str): The model name param_names (list, optional): The list of parameter names flux_range (tuple, optional): The flux energy range (low, high) dur_range (tuple, optional): The duration energy range (low, high) Returns: (:class:`GbmModelFit`) """ time_range = tuple(fits_row['TIMEBIN']) nparams = sum([1 for name in fits_row.array.dtype.names if 'PARAM' in name]) if param_names is None: param_names = [''] * nparams params = [] for i in range(nparams): param = fits_row['PARAM' + str(i)] params.append(Parameter(param[0], tuple(param[1:]), name=param_names[i])) pflux = PhotonFlux(fits_row['PHTFLUX'][0], tuple(fits_row['PHTFLUX'][1:]), flux_range) pflnc = PhotonFluence(fits_row['PHTFLNC'][0], tuple(fits_row['PHTFLNC'][1:]), flux_range) eflux = EnergyFlux(fits_row['NRGFLUX'][0], tuple(fits_row['NRGFLUX'][1:]), flux_range) eflnc = EnergyFluence(fits_row['NRGFLNC'][0], tuple(fits_row['NRGFLNC'][1:]), flux_range) pfluxb = PhotonFlux(fits_row['PHTFLUXB'][0], tuple(fits_row['PHTFLUXB'][1:]), (50.0, 300.0)) eflncb = EnergyFluence(fits_row['NRGFLNCB'][0], tuple(fits_row['NRGFLNCB'][1:]), (50.0, 300.0)) durflnc = PhotonFluence(fits_row['DURFLNC'][0], tuple(fits_row['DURFLNC'][1:]), dur_range) dof = fits_row['CHSQDOF'] # scat provides the [fit stat, chisq], while bcat is only the fit stat try: stat_val = fits_row['REDCHSQ'][1] * dof except: stat_val = fits_row['REDCHSQ'] * dof num_params = len(params) covar = fits_row['COVARMAT'].reshape(num_params, num_params) obj = cls.from_data(model_name, time_range, parameters=params, photon_flux=pflux, photon_fluence=pflnc, energy_flux=eflux, energy_fluence=eflnc, flux_energy_range=flux_range, stat_value=stat_val, dof=dof, covariance=covar, photon_flux_50_300=pfluxb, energy_fluence_50_300=eflncb, duration_fluence=durflnc) return obj
[docs]class GbmDetectorData(DetectorData): """A container for the detector info, with values used in the GBM SCAT files. """
[docs] def to_fits_row(self): """Return the contained data as a FITS table row Returns: (astropy.io.fits.BinTableHDU) """ numchans = len(self.energy_edges) e_dim = str(numchans) + 'E' p_dim = str(numchans - 1) + 'E' p_unit = 'Photon cm^-2 s^-1 keV^-1' fit_int = '{0}: {1} s, '.format(self.time_range[0], self.time_range[1]) fit_int += '{0}: {1} keV, '.format(self.energy_range[0], self.energy_range[1]) fit_int += 'channels {0}: {1}'.format(self.channel_range[0], self.channel_range[1]) col1 = fits.Column(name='INSTRUME', format='20A', array=[self.instrument]) col2 = fits.Column(name='DETNAM', format='20A', array=[self.detector]) col3 = fits.Column(name='DATATYPE', format='20A', array=[self.datatype]) col4 = fits.Column(name='DETSTAT', format='20A', array=['INCLUDED' if self.active else 'OMITTED']) col5 = fits.Column(name='DATAFILE', format='60A', array=[self.filename]) col6 = fits.Column(name='RSPFILE', format='60A', array=[self.response]) col7 = fits.Column(name='FIT_INT', format='60A', array=[fit_int]) col8 = fits.Column(name='CHANNUM', format='1J', array=[numchans - 1]) col9 = fits.Column(name='FITCHAN', format='{}J'.format(numchans), array=[self.channel_mask]) col10 = fits.Column(name='E_EDGES', format=e_dim, unit='keV', array=[self.energy_edges]) col11 = fits.Column(name='PHTCNTS', format=p_dim, unit=p_unit, array=[self.photon_counts]) col12 = fits.Column(name='PHTMODL', format=p_dim, unit=p_unit, array=[self.photon_model]) col13 = fits.Column(name='PHTERRS', format=p_dim, unit=p_unit, array=[self.photon_errors]) hdu = fits.BinTableHDU.from_columns( [col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13], name='DETECTOR DATA') return hdu
[docs] @classmethod def from_fits_row(cls, fits_row): """Read a FITS row and return a GbmDetectorData object. Returns: (:class:`GbmDetectorData`) """ instrument = fits_row['INSTRUME'] det = fits_row['DETNAM'] datatype = fits_row['DATATYPE'] if fits_row['DETSTAT'] == 'INCLUDED': active = True else: active = False filename = fits_row['DATAFILE'] rspfile = fits_row['RSPFILE'] fit_ints = fits_row['FIT_INT'].split() time_range = (float(fit_ints[0][:-1]), float(fit_ints[1])) energy_range = (float(fit_ints[3][:-1]), float(fit_ints[4])) channel_range = (int(fit_ints[7][:-1]), int(fit_ints[8])) numchans = fits_row['CHANNUM'] channel_mask = np.zeros(numchans, dtype=bool) channel_mask[fits_row['FITCHAN'][0]:fits_row['FITCHAN'][1] + 1] = True energy_edges = fits_row['E_EDGES'] photon_counts = fits_row['PHTCNTS'] photon_model = fits_row['PHTMODL'] photon_errors = fits_row['PHTERRS'] obj = cls.from_data(instrument, det, datatype, numchans, filename=filename, active=active, response=rspfile, time_range=time_range, energy_range=energy_range, channel_range=channel_range, channel_mask=channel_mask, energy_edges=energy_edges, photon_counts=photon_counts, photon_model=photon_model, photon_errors=photon_errors) return obj
[docs]class Scat(FitsFileContextManager): """A container class for the spectral fit data in an SCAT (Spectroscopy CATalog) file. """ def __init__(self): super().__init__() self._detectors = [] self._model_fits = [] @property def detectors(self): """(list): The :class:`GbmDetectorData` objects used in the analysis""" return self._detectors @property def model_fits(self): """(list): The :class:`GbmModelFit` objects, one for each model fit""" return self._model_fits @property def num_detectors(self): """(int): The number of detectors in the SCAT file""" return len(self.detectors) @property def num_fits(self): """(int): The number of model fits""" return len(self.model_fits)
[docs] def add_detector_data(self, detector_data): """Add a new detector to the Scat Args: detector_data (:class:`GbmDetectorData`): The detector data """ if not isinstance(detector_data, GbmDetectorData): raise TypeError("Can only add GbmDetectorData objects") self._detectors.append(detector_data)
[docs] def add_model_fit(self, model_fit): """Add a new model fit to the Scat Args: model_fit (:class:`GbmModelFit`): The model fit data """ if not isinstance(model_fit, GbmModelFit): raise TypeError("Can only add GbmModelFit objects") self._model_fits.append(model_fit)
[docs] @classmethod def from_data(cls, detector_list, model_fits, headers=None, filename=None): """Create a Scat object from data. Args: detector_list (list of :class:`GbmDetectorData`): The detector info used in the fit model_fits (list of :class:`GbmModelFit`): The model fits headers (:class:`~.headers.ScatHeaders`, optional): The file headers filename (str, optional): The filename Returns: (:class:`Scat`) """ obj = cls() obj._filename = filename try: iter(detector_list) except: raise TypeError('detector_list must be a list of DetectorData objects') for det in detector_list: obj.add_detector_data(det) try: iter(model_fits) except: raise TypeError('model_fits must be a list of ModelFit objects') for model_fit in model_fits: obj.add_model_fit(model_fit) if not isinstance(headers, ScatHeaders): raise TypeError('headers must be a ScatHeaders object') obj._headers = headers return obj
[docs] @classmethod def open(cls, file_path, **kwargs): """Open a SCAT FITS file and create a Scat object. Args: file_path (str): The file path of the FITS file Returns: (:class:`Scat`) """ with super().open(file_path, **kwargs) as obj: hdrs = [hdu.header for hdu in obj.hdulist] headers = ScatHeaders.from_headers(hdrs) # read the detector data HDU det_list = [] for row in obj.hdulist['DETECTOR DATA'].data: det_list.append(GbmDetectorData.from_fits_row(row)) # read the fit params HDU model_fits = cls._from_fitparam_hdu(obj.hdulist['FIT PARAMS']) obj = cls.from_data(det_list, model_fits, headers=headers, filename=obj.filename) return obj
@staticmethod def _from_fitparam_hdu(fit_hdu): """format the data from the fitparam HDU so the GbmModelFit list can be created.""" fit_data = fit_hdu.data fit_hdr = fit_hdu.header nparams = fit_hdr['N_PARAM'] # find unique models in the event there are multiple components models = [fit_hdr.comments['TTYPE' + str(2 + i)].split(':')[0] for i in range(nparams)] model = '+'.join(list(set(models))) # the parameter names param_names = [ fit_hdr.comments['TTYPE' + str(2 + i)].split(':')[1].strip() for i in range(nparams)] # populate each model fit model_fits = [] for row in fit_data: modelfit = GbmModelFit.from_fits_row(row, model, param_names=param_names) modelfit.stat_name = fit_hdr['STATISTC'] model_fits.append(modelfit) return model_fits def _build_hdulist(self): # create FITS and primary header hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self.headers['PRIMARY']) for key, val in self.headers['PRIMARY'].items(): primary_hdu.header[key] = val hdulist.append(primary_hdu) # the detector data extension det_hdu = self._det_table() hdulist.append(det_hdu) # the model fit extension model_hdu = self._model_table() hdulist.append(model_hdu) return hdulist def _det_table(self): det_data = np.concatenate([det.to_fits_row().data \ for det in self.detectors]) adet = self.detectors[0].to_fits_row() cols = [] for column in adet.columns: col = fits.Column(name=column.name, format=column.format, unit=column.unit, array=det_data[column.name]) cols.append(col) det_hdu = fits.BinTableHDU.from_columns(cols, header=self.headers[1]) for key, val in self.headers[1].items(): det_hdu.header[key] = val det_hdu.header.comments['TTYPE1'] = 'Instrument name for this detector' det_hdu.header.comments['TTYPE2'] = \ 'Detector number; if one of several available' det_hdu.header.comments['TTYPE3'] = 'Data type used for this analysis' det_hdu.header.comments['TTYPE4'] = \ 'Was this detector INCLUDED or OMITTED' det_hdu.header.comments['TTYPE5'] = 'Data file name for this dataset' det_hdu.header.comments['TTYPE6'] = \ 'Response file name for this dataset' det_hdu.header.comments['TTYPE7'] = 'Fit intervals' det_hdu.header.comments['TTYPE8'] = \ 'Total number of energy channels for this detector' det_hdu.header.comments['TTYPE9'] = \ 'Channels selected in fitting this detector' det_hdu.header.comments['TTYPE10'] = \ 'Energy edges for each selected detector' det_hdu.header.comments['TTYPE11'] = 'Array of photon counts data' det_hdu.header.comments['TTYPE12'] = 'Array of photon model data' det_hdu.header.comments['TTYPE13'] = \ 'Array of errors in photon counts data' det_hdu.header['NUMFITS'] = (self.num_fits, 'Number of spectral fits in the data') return det_hdu def _model_table(self): model_data = np.concatenate([model.to_fits_row().data \ for model in self.model_fits]) amodel = self.model_fits[0].to_fits_row() cols = [] for column in amodel.columns: col = fits.Column(name=column.name, format=column.format, unit=column.unit, array=model_data[column.name]) cols.append(col) model_hdu = fits.BinTableHDU.from_columns(cols, header=self.headers[2]) for key, val in self.headers[2].items(): model_hdu.header[key] = val e_range = self.model_fits[0].flux_energy_range model_name = self.model_fits[0].name param_names = self.model_fits[0].parameter_list() numparams = len(param_names) statistic = self.model_fits[0].stat_name g_range = '({0}-{1} keV)'.format(e_range[0], e_range[1]) b_range = '(50-300 keV)' model_hdu.header.comments['TTYPE1'] = \ 'Start and stop times relative to trigger' for i in range(numparams): colname = 'TTYPE{0}'.format(i + 2) model_hdu.header.comments[colname] = '{0}: {1}'.format(model_name, param_names[i]) ttypes = ['TTYPE' + str(numparams + 2 + i) for i in range(10)] model_hdu.header.comments[ttypes[0]] = \ 'Photon Flux (ph/s-cm^2) std energy ' + g_range model_hdu.header.comments[ttypes[1]] = \ 'Photon Fluence (ph/cm^2) std energy ' + g_range model_hdu.header.comments[ttypes[2]] = \ 'Energy Flux (erg/s-cm^2) std energy ' + g_range model_hdu.header.comments[ttypes[3]] = \ 'Energy Fluence (erg/cm^2) std energy ' + g_range model_hdu.header.comments[ttypes[4]] = \ 'Photon Flux (ph/s-cm^2) BATSE energy ' + b_range model_hdu.header.comments[ttypes[5]] = \ 'Photon Fluence (ph/cm^2) BATSE energy ' + b_range model_hdu.header.comments[ttypes[6]] = \ 'Energy Fluence (erg/cm^2) BATSE energy ' + b_range model_hdu.header.comments[ttypes[7]] = \ 'Reduced Chi^2 (1) and fitting statistic (2)' model_hdu.header.comments[ttypes[8]] = 'Degrees of Freedom' model_hdu.header.comments[ttypes[9]] = \ 'Covariance matrix for the fir (N_PARAM^2)' model_hdu.header['N_PARAM'] = (numparams, 'Total number of fit parameters (PARAMn)') model_hdu.header['FLU_LOW'] = (e_range[0], 'Lower limit of flux/fluence integration (keV)') model_hdu.header['FLU_HIGH'] = (e_range[1], 'Upper limit of flux/fluence integration (keV)') model_hdu.header['STATISTC'] = (statistic, 'Indicates merit function used for fitting') model_hdu.header['NUMFITS'] = (self.num_fits, 'Number of spectral fits in the data') return model_hdu def __repr__(self): s = '<{0}: {1};\n'.format(self.__class__.__name__, self.filename) s += ' {0} detectors; {1} fits>'.format(self.num_detectors, self.num_fits) return s