Source code for gdt.core.pha

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
import astropy.io.fits as fits

from .data_primitives import Ebounds, Gti, EnergyBins
from .file import FitsFileContextManager
from .headers import Header, FileHeaders
from gdt.core.background.primitives import BackgroundSpectrum

__all__ = ['Pha', 'Bak']

# header information

_chantype_card = ('CHANTYPE', 'PHA', 'No corrections have been applied')
_detchans_card = ('DETCHANS', 0, 'Total number of channels in each rate')
_extname_card = ('EXTNAME', '', 'name of this binary table extension')
_filter_card = ('FILTER', '', 'The instrument filter in use (if any)')
_hduclass_card = ('HDUCLASS', 'OGIP', 'Conforms to OGIP standard indicated in HDUCLAS1')
_hduvers_card = ('HDUVERS', '1.2.1', 'Version of HDUCLAS1 format in use')
_trigtime_card = ('TRIGTIME', 0.0, 'Trigger time')
_telescope_card = ('TELESCOP', '', 'Name of mission/satellite')
_instrument_card = ('INSTRUME', '', 'Specific instrument used for observation')

class PrimaryHeader(Header):
    name = 'PRIMARY'
    keywords = [Header.creator(), 
                ('DATE', '', 'file creation date (YYYY-MM-DDThh:mm:ss UT)'),
                ('TSTART', 0.0, 'Observation start time'),
                ('TSTOP', 0.0, 'Observation stop time'),
                _trigtime_card,]

class EboundsHeader(Header):
    name = 'EBOUNDS'
    keywords = [_extname_card, _hduclass_card,
                ('HDUCLAS1', 'RESPONSE', 'These are typically found in RMF ' \
                                         'files'),
                ('HDUCLAS2', 'EBOUNDS', 'From CAL/GEN/92-002'), 
                _hduvers_card, _chantype_card, _filter_card, _detchans_card,
                ('CH2E_VER', '', 'Channel to energy conversion scheme used'),
                ('GAIN_COR', 0.0, 'Gain correction factor applied to energy ' \
                                  'edges')]

class PhaSpectrumHeader(Header):
    name = 'SPECTRUM'
    keywords = [_extname_card, _filter_card, _telescope_card, _instrument_card,
            ('AREASCAL', 1., 'No special scaling of effective area by channel'),
            ('BACKFILE', '', 'Name of corresponding background file (if any)'),
            ('BACKSCAL', 1., 'background file scaling factor'),
            ('CORRFILE', '', 'Name of corresponding correction file (if any)'),
            ('CORRSCAL', 1., 'Correction scaling file'),
            ('RESPFILE', '', 'Name of corresponding RMF file (if any)'),
            ('ANCRFILE', '', 'Name of corresponding ARF file (if any)'),
            ('SYS_ERR', 0., 'Systematic errors'),
            ('POISERR', True, 'Assume Poisson Errors'),
            ('GROUPING', 0, 'No special grouping has been applied'),
            _hduclass_card,
            ('HDUCLAS1', 'SPECTRUM', 'PHA dataset (OGIP memo OGIP-92-007)'),
            ('HDUCLAS2', 'TOTAL', 'Indicates gross data (source + background)'),
            ('HDUCLAS3', 'COUNT', 'Indicates data stored as counts'),
            ('HDUCLAS4', 'TYPEI', 'Indicates PHA Type I file format'),
            _hduvers_card, _chantype_card, _detchans_card, 
            ('EXPOSURE', 0.0, 'Accumulation time - deadtime')]

class BakSpectrumHeader(Header):
    name = 'SPECTRUM'
    keywords = [_extname_card, _filter_card, _telescope_card, _instrument_card,
            ('AREASCAL', 1., 'No special scaling of effective area by channel'),
            ('BACKFILE', '', 'Name of corresponding background file (if any)'),
            ('BACKSCAL', 1., 'background file scaling factor'),
            ('CORRFILE', '', 'Name of corresponding correction file (if any)'),
            ('CORRSCAL', 1., 'Correction scaling file'),
            ('RESPFILE', '', 'Name of corresponding RMF file (if any)'),
            ('ANCRFILE', '', 'Name of corresponding ARF file (if any)'),
            ('SYS_ERR', 0., 'Systematic errors'),
            ('POISERR', True, 'Assume Poisson Errors'),
            ('GROUPING', 0, 'No special grouping has been applied'),
            _hduclass_card,
            ('HDUCLAS1', 'SPECTRUM', 'PHA dataset (OGIP memo OGIP-92-007)'),
            ('HDUCLAS2', 'BKG', 'Background PHA Spectrum'),
            ('HDUCLAS3', 'RATE', 'PHA data stored as rates'),
            ('HDUCLAS4', 'TYPEI', 'Indicates PHA Type I file format'),
            _hduvers_card, _chantype_card, _detchans_card, 
            ('EXPOSURE', 0.0, 'Accumulation time - deadtime')]

class GtiHeader(Header):
    name = 'GTI'
    keywords = [_extname_card, _hduclass_card,
                ('HDUCLAS1', 'GTI', 'Indicates good time intervals'),
                _hduvers_card]

class PhaHeaders(FileHeaders):
    _header_templates = [PrimaryHeader(), PhaSpectrumHeader(), EboundsHeader(), 
                         GtiHeader()]

class BakHeaders(FileHeaders):
    _header_templates = [PrimaryHeader(), BakSpectrumHeader(), EboundsHeader(),
                         GtiHeader()]


[docs]class Pha(FitsFileContextManager): """PHA class for count spectra. """ def __init__(self): super().__init__() self._ebounds = None self._data = None self._gti = None self._trigtime = None self._channel_mask = None @property def channel_mask(self): """(np.array): A Boolean array representing the valid channels""" if self._channel_mask is None: return np.ones(self.num_chans, dtype=bool) else: return self._channel_mask @property def data(self): """(:class:`~.data_primitives.EnergyBins`): The PHA data""" return self._data @property def ebounds(self): """(:class:`~.data_primitives.Ebounds`): The energy bounds""" return self._ebounds @property def energy_range(self): """(float, float): The energy range of the spectrum""" if self._data is not None: lo = self._data.lo_edges[self._channel_mask][0] hi = self._data.hi_edges[self._channel_mask][-1] return (lo, hi) @property def exposure(self): """(float): The exposure of the PHA data""" return self._data.exposure[0] @property def gti(self): """(:class:`~.data_primitives.Gti`): The good time intervals""" return self._gti @property def num_chans(self): """(int): The number of energy channels""" return self._data.size @property def tcent(self): """(float): The center time of the data""" return sum(self.time_range) / 2.0 @property def time_range(self): """(float, float): The time range of the spectrum""" if self._gti is not None: return self._gti.range @property def trigtime(self): """(float): The trigger time of the data, if available""" return self._trigtime @property def valid_channels(self): """(np.array): The channel indices that are valid""" return np.arange(self.num_chans, dtype=int)[self.channel_mask]
[docs] def rebin_energy(self, method, *args, energy_range=(None, None), **kwargs): """Rebin the PHA in energy given a rebinning method. Args: method (<function>): The rebinning function *args: Arguments to be passed to the rebinning function energy_range ((float, float), optional): The starting and ending energy to rebin. If omitted, uses the full range of data. Setting start or end to ``None`` will use the data from the beginning or end of the data, respectively. Returns (:class:`Pha`) """ emin, emax = self._assert_range(energy_range) data = self.data.rebin(method, *args, emin=emin, emax=emax) headers = self._build_headers(self.trigtime, *self.time_range, data.size) pha = self.from_data(data, gti=self.gti, trigger_time=self.trigtime, headers=headers, **kwargs) return pha
[docs] def slice_energy(self, energy_ranges, **kwargs): """Slice the PHA by one or more energy ranges. Produces a new PHA object. Args: energy_ranges ([(float, float), ...]): The energy ranges to slice the data to. Returns: (:class:`Pha`) """ energy_ranges = self._assert_range_list(energy_ranges) data = [self.data.slice(*self._assert_range(energy_range)) \ for energy_range in energy_ranges] data = EnergyBins.merge(data) headers = self._build_headers(self.trigtime, *self.time_range, data.size) pha = self.from_data(data, gti=self.gti, trigger_time=self.trigtime, headers=headers, **kwargs) return pha
[docs] @classmethod def from_data(cls, data, gti=None, trigger_time=None, filename=None, headers=None, channel_mask=None, header_type=PhaHeaders, valid_channels=None, **kwargs): """Create a PHA object from an :class:`~.data_primitives.EnergyBins` object. Args: data (:class:`~.data_primitives.EnergyBins`): The PHA count spectrum data gti (:class:`~.data_primitives.Gti`), optional): The good time intervals of the pectrum data. If omitted, then assumes the range (0, exposure). trigger_time (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. Default is zero. headers (:class:`~.headers.FileHeaders`): The file headers channel_mask (np.array(dtype=bool)): A boolean array representing the valid channels. If omitted, assumes all non-zero count channels are valid. header_type (:class:`~.headers.FileHeaders`): Default file header class. Only used if ``headers`` is not defined valid_channels (np.array(dtype=int)): An integer array indicating which channels are valid. This is overriden if `channel_mask` is set. Returns: (:class:`Pha`) """ obj = cls() obj._filename = filename # set data and ebounds if not isinstance(data, EnergyBins): raise TypeError('data must be of type EnergyBins') obj._data = data obj._ebounds = Ebounds.from_bounds(data.lo_edges, data.hi_edges) # set GTI if gti is not None: if not isinstance(gti, Gti): raise TypeError('gti must be of type Gti') else: gti = Gti.from_list([(0.0, data.exposure[0])]) obj._gti = gti # update times to be relative to ...if trigtime is set if trigger_time is not None: if trigger_time < 0.0: raise ValueError('trigger_time must be non-negative') obj._trigtime = trigger_time # set headers if headers is not None: if not isinstance(headers, FileHeaders): raise TypeError('headers must be of type FileHeaders') obj._headers = headers else: obj._headers = header_type() tstart, tstop = obj._gti.range obj._headers['PRIMARY']['TSTART'] = tstart obj._headers['PRIMARY']['TSTOP'] = tstop obj._headers['EBOUNDS']['DETCHANS'] = data.size obj._headers['SPECTRUM']['DETCHANS'] = data.size obj._headers['PRIMARY']['TRIGTIME'] = trigger_time obj._headers['SPECTRUM']['EXPOSURE'] = obj._data.exposure[0] if obj._headers != None: for key in obj._headers['SPECTRUM'].keys(): if obj._headers['SPECTRUM'][key] =='': if key in kwargs.keys(): obj._headers['SPECTRUM'][key] = kwargs[key] # set the channel mask if channel_mask is None: channel_mask = np.zeros(data.size, dtype=bool) # if no channel mask is given, check to see if there is a list of # valid channels if valid_channels is not None: if not isinstance(valid_channels, (np.ndarray, list, tuple, set)): raise TypeError('valid_channels must be an iterable') try: valid_channels = np.asarray(valid_channels).flatten().astype(int) except ValueError: raise ValueError('valid_channels must contain integer values') channel_mask[valid_channels] = True else: # otherwise assume zero-count channels are bad channel_mask[data.counts > 0] = True if not isinstance(channel_mask, (np.ndarray, list, tuple, set)): raise TypeError('channel_mask must be an iterable') try: channel_mask = np.asarray(channel_mask).flatten().astype(bool) except ValueError: raise ValueError('channel_mask must contain booleans') if channel_mask.size != obj._data.size: raise ValueError('channel_mask must be the same size as the ' \ 'number of data bins') obj._channel_mask = channel_mask return obj
[docs] @classmethod def open(cls, file_path, **kwargs): """Open a PHA FITS file and return the PHA object If this class is inherited, this method may be over-written if a non-standard file is being parsed, or if there is extra header information/data that needs to be stored. Args: file_path (str): The file path of the FITS file Returns: (:class:`Pha`) """ obj = super().open(file_path, **kwargs) trigtime = None spec_ext = obj.hdu_index_from_name('SPECTRUM') eb_ext = obj.hdu_index_from_name('EBOUNDS') gti_ext = obj.hdu_index_from_name('GTI') if gti_ext is None: gti_ext = obj.hdu_index_from_name('STDGTI') # in the event the SPECTRUM and EBOUNDS extensions are swapped if (eb_ext == 1) and (spec_ext == 2): PhaHeaders._header_templates = [PrimaryHeader(), EboundsHeader(), PhaSpectrumHeader(), GtiHeader()] else: PhaHeaders._header_templates = [PrimaryHeader(), PhaSpectrumHeader(), EboundsHeader(), GtiHeader()] # get the headers hdrs = [hdu.header for hdu in obj.hdulist] if (hdrs[spec_ext]['HDUCLAS2'] == 'TOTAL') \ or (hdrs[spec_ext]['HDUCLAS2'] == 'NET'): headers = PhaHeaders.from_headers(hdrs) elif hdrs[spec_ext]['HDUCLAS2'] == 'BKG': headers = BakHeaders.from_headers(hdrs) if 'TRIGTIME' in hdrs[0].keys(): trigtime = float(headers['PRIMARY']['TRIGTIME']) # get the data. exposure = headers[spec_ext]['EXPOSURE'] # we need to handle the case where the errors are not Poisson uncerts = None if 'STAT_ERR' in obj.get_column_names(spec_ext): uncerts = obj.column(spec_ext, 'STAT_ERR') # and the case where there are systematic errors frac_sys_err = None if 'SYS_ERR' in obj.get_column_names(spec_ext): frac_sys_err = obj.column(spec_ext, 'SYS_ERR') # data can either be counts... if headers[spec_ext]['HDUCLAS3'] == 'COUNT': counts = obj.column(spec_ext, 'COUNTS') if uncerts is None: uncerts = np.sqrt(counts) if frac_sys_err is not None: uncerts = np.sqrt(uncerts ** 2 + (counts * frac_sys_err) ** 2 ) data = EnergyBins(counts, obj.column(eb_ext, 'E_MIN'), obj.column(eb_ext, 'E_MAX'), exposure, count_uncerts=uncerts) # ...or rates... elif headers[spec_ext]['HDUCLAS3'] == 'RATE': rates = obj.column(spec_ext, 'RATE') if uncerts is None: uncerts = np.sqrt(rates * exposure) / exposure if frac_sys_err is not None: uncerts = np.sqrt(uncerts ** 2 + (rates * frac_sys_err) ** 2 ) data = EnergyBins.from_rates(rates, uncerts, obj.column(eb_ext, 'E_MIN'), obj.column(eb_ext, 'E_MAX'), exposure) # ...otherwise it is not a standard PHA file else: raise ValueError('Unable to determine PHA type. Looking for' \ '"COUNT" or "RATE" value for HDUCLAS3') # Quality flag indicates which channels are valid. # Valid: Quality = 0 # Invalid: Quality = 1 if 'QUALITY' in obj.get_column_names(spec_ext): channel_mask = (obj.column(spec_ext, 'QUALITY') == 0) else: channel_mask = None # GTI gti_start = obj.column(gti_ext, 'START') gti_stop = obj.column(gti_ext, 'STOP') if trigtime is not None: gti_start -= trigtime gti_stop -= trigtime gti = Gti.from_bounds(gti_start, gti_stop) obj._hdulist.close() # create object obj = cls.from_data(data, gti=gti, trigger_time=trigtime, filename=obj.filename, headers=headers, channel_mask=channel_mask) return obj
[docs] def write(self, directory, filename=None, rates=False, poisson_errs=False, **kwargs): self._hdulist = self._build_hdulist(rates=rates, poisson_errs=poisson_errs) super().write(directory, filename=filename, **kwargs)
def _assert_range(self, valrange): if valrange[0] is None and valrange[1] is None: return valrange assert valrange[0] <= valrange[1], \ 'Range must be in increasing order: (lo, hi)' return valrange def _assert_range_list(self, range_list): try: iter(range_list[0]) except: range_list = [tuple(range_list)] range_list = [self._assert_range(r) for r in range_list] return range_list
[docs] def _build_hdulist(self, rates=False, poisson_errs=False): """This builds the HDU list for the FITS file. If this class is inherited, this method may be over-written if a non-standard file is being written, or if there is extra header information/data that needs to be written. This method should construct each HDU (PRIMARY, SPECTRUM, EBOUNDS, GTI, etc.) containing the respective header and data. The HDUs should then be inserted into a HDUList and that list returned Args: rates (bool): Set to True to write out the data as rates instead of counts. Default is to write counts. poisson_errs (bool): Set to True to write with Poisson errors. Default is to *not* assume Poisson errors and create a STAT_ERRS column. Returns: (:class:`astropy.io.fits.HDUList`) """ # create FITS and primary header hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self.headers['PRIMARY']) for key, val in self.headers['PRIMARY'].items(): primary_hdu.header[key] = val hdulist.append(primary_hdu) # the spectrum extension spectrum_hdu = self._spectrum_table(rates, poisson_errs) hdulist.append(spectrum_hdu) # the ebounds extension ebounds_hdu = self._ebounds_table() hdulist.append(ebounds_hdu) # the GTI extension gti_hdu = self._gti_table() hdulist.append(gti_hdu) return hdulist
[docs] def _build_headers(self, trigtime, tstart, tstop, num_chans): """This builds the headers for the FITS file. If this class is inherited, this method may be over-written if there is extra header information that needs to be written. This method should construct the headers from the minimum required arguments and additional keywords and return a :class:`~.headers.FileHeaders` object. Args: trigtime (float or None): The trigger time. Set to None if no trigger time. tstart (float): The start time tstop (float): The stop time num_chans (int): Number of detector energy channels Returns: (:class:`~.headers.FileHeaders`) """ headers = self.headers.copy() headers['PRIMARY']['TRIGTIME'] = trigtime headers['PRIMARY']['TSTART'] = tstart headers['PRIMARY']['TSTOP'] = tstop headers['EBOUNDS']['DETCHANS'] = num_chans headers['SPECTRUM']['DETCHANS'] = num_chans return headers
def _ebounds_table(self): chan_col = fits.Column(name='CHANNEL', format='1I', array=np.arange(self.num_chans, dtype=int)) emin_col = fits.Column(name='E_MIN', format='1E', unit='keV', array=self.ebounds.low_edges()) emax_col = fits.Column(name='E_MAX', format='1E', unit='keV', array=self.ebounds.high_edges()) hdu = fits.BinTableHDU.from_columns([chan_col, emin_col, emax_col], header=self.headers['EBOUNDS']) for key, val in self.headers['EBOUNDS'].items(): hdu.header[key] = val return hdu def _spectrum_table(self, use_rates, is_poisson): chan_col = fits.Column(name='CHANNEL', format='1I', array=np.arange(self.num_chans, dtype=int)) if use_rates: data_col = fits.Column(name='RATE', format='E', unit='count/s', array=self.data.rates) else: data_col = fits.Column(name='COUNTS', format='J', bzero=32768, bscale=1, unit='count', array=self.data.counts) columns = [chan_col, data_col] if not is_poisson: if use_rates: staterr_col = fits.Column(name='STAT_ERR', format='E', unit='count/s', array=self.data.rate_uncertainty) else: staterr_col = fits.Column(name='STAT_ERR', format='E', unit='count', array=self.data.count_uncertainty) columns.append(staterr_col) qual_col = fits.Column(name='QUALITY', format='1I', array=(~self.channel_mask).astype(int)) columns.append(qual_col) hdu = fits.BinTableHDU.from_columns(columns, header=self.headers['SPECTRUM']) for key, val in self.headers['SPECTRUM'].items(): hdu.header[key] = val hdu.header['POISERR'] = is_poisson if not use_rates: hdu.header['HDUCLAS3'] = 'COUNT' hdu.header.comments['TZERO2'] = 'offset for unsigned integers' hdu.header.comments['TSCAL2'] = 'data are not scaled' else: hdu.header['HDUCLAS3'] = 'RATE' return hdu def _gti_table(self): tstart = np.array(self.gti.low_edges()) tstop = np.array(self.gti.high_edges()) if self.trigtime is not None: tstart += self.trigtime tstop += self.trigtime start_col = fits.Column(name='START', format='1D', unit='s', bzero=self.trigtime, array=tstart) stop_col = fits.Column(name='STOP', format='1D', unit='s', bzero=self.trigtime, array=tstop) hdu = fits.BinTableHDU.from_columns([start_col, stop_col], header=self.headers['GTI']) for key, val in self.headers['GTI'].items(): hdu.header[key] = val if self.trigtime is not None: hdu.header.comments['TZERO1'] = 'Offset, equal to TRIGTIME' hdu.header.comments['TZERO2'] = 'Offset, equal to TRIGTIME' return hdu def __repr__(self): s = '<{0}: '.format(self.__class__.__name__) if self.filename is not None: s += '{};'.format(self.filename) if self.trigtime is not None: s += '\n trigger time: {};'.format(self.trigtime) s += '\n time range {};'.format(self.time_range) s += '\n energy range {}>'.format(self.energy_range) return s
[docs]class Bak(Pha): """Class for a PHA background spectrum. """
[docs] @classmethod def from_data(cls, data, gti=None, trigger_time=None, filename=None, headers=None, channel_mask=None, **kwargs): """Create a BAK object from an :class:`~..background.background.BackgroundSpectrum` object. Args: data (:class:`~..background.background.BackgroundSpectrum`): The background spectrum data gti (:class:`~.data_primitives.Gti`), optional): The good time intervals of the pectrum data. If omitted, then assumes the range (0, exposure). trigger_time (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. Default is zero. headers (:class:`~.headers.FileHeaders`): The file headers channel_mask (np.array(dtype=bool)): A boolean array representing the valid channels. If omitted, assumes all non-zero count channels are valid. Returns: (:class:`Bak`) """ return super().from_data(data, gti=gti, trigger_time=trigger_time, filename=filename, headers=headers, channel_mask=channel_mask, header_type=BakHeaders, **kwargs)
[docs] def rebin_energy(self, method, *args, emin=None, emax=None): """Not Implemented""" raise NotImplementedError('Function not available for BAK objects')
[docs] def slice_energy(self, method, *args, emin=None, emax=None): """Not Implemented""" raise NotImplementedError('Function not available for BAK objects')
def _spectrum_table(self, use_rates, is_poisson): chan_col = fits.Column(name='CHANNEL', format='1I', array=np.arange(self.num_chans, dtype=int)) if use_rates: data_col = fits.Column(name='RATE', format='1D', unit='count/s', array=self.data.rates) staterr_col = fits.Column(name='STAT_ERR', format='E', unit='count/s', array=self.data.rate_uncertainty) else: data_col = fits.Column(name='COUNTS', format='J', bzero=32768, bscale=1, unit='count', array=self.data.counts) staterr_col = fits.Column(name='STAT_ERR', format='E', unit='count', array=self.data.count_uncertainty) hdu = fits.BinTableHDU.from_columns([chan_col, data_col, staterr_col], header=self.headers['SPECTRUM']) for key, val in self.headers['SPECTRUM'].items(): hdu.header[key] = val return hdu