Source code for gdt.core.background.unbinned

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import numpy as np
from scipy.interpolate import interp1d

__all__ = ['NaivePoisson']

[docs]class NaivePoisson(): """A class to estimate the background of unbinned data using the naive Poisson maximum likelihood. This method is approximately equivalent to sliding a window of fixed length through unbinned data and calculating the Poisson maximum likelihood for the rate. The rate estimate is applied to the center of the sliding window, therefore, the amount of data equivalent to half of the sliding window at the beginning and half of the window at the end of the data is a constant. Note: This naive approach assumes there is either no *strong* signal in the \ data, or the presence of a *weaker* signal has a duration much less \ than the window width of the sliding window. Parameters: counts (list of np.array): A list of length ``num_chans``, and each element of the list is an array of event times in that channel. """ def __init__(self, times): self._times = times self._numchans = len(times) #self._min_dt = min_dt self._window_width = None self._actual_widths = None self._rates = None self._rates_interp = None self._width_interp = None
[docs] def fit(self, window_width=100.0, fast=True): """Fit the data via Naive Poisson Maximum Likelihood. Args: window_width (float): The width of the sliding window in seconds. Note: If the range of the data is shorter than ``window_width``, the ``window_width`` will automatically be shortened to the range of the data. fast (bool): If True, then will use the fast approximation of the algorithm that allows the ``window_width`` to change throughout the data (the number of counts in the window is constant). If False, uses the exact algorithm with a fixed window, but is much slower. """ self._window_width = window_width self._actual_widths = [] self._rates = [] for i in range(self._numchans): if fast: r, u, w = self._fit_one_fast(i) else: r, u, w = self._fit_one_exact(i) self._rates.append(r) self._actual_widths.append(w) # note: uncertainty 'u' from the fit is unused since # we compute our own uncertainty during interpolate() self._rates_interp = [] self._width_interp = [] for i in range(self._numchans): if self._rates[i].size == 0: self._rates_interp.append(None) self._width_interp.append(None) continue if self._times[i].size-1 == self._rates[i].size: idx = 0 else: idx = 1 self._rates_interp.append( interp1d(self._times[i][idx:-1], self._rates[i], fill_value='extrapolate')) self._width_interp.append( interp1d(self._times[i][idx:-1], self._actual_widths[i], fill_value='extrapolate'))
[docs] def interpolate(self, tstart, tstop): """Interpolate the background at the given times Args: tstart (np.array): The start times of the bins to interpolate tstop (np.array): The end times of the bins to interpolate Returns: (np.array, np.array): The interpolated model value and model uncertainty in each bin """ times = (tstart + tstop) / 2.0 rates = [] uncert = [] for i in range(self._numchans): if self._rates_interp[i] is None: rates.append(np.zeros_like(times)) uncert.append(np.zeros_like(times)) continue r = self._rates_interp[i](times) r[r < 0.0] = 0.0 widths = self._width_interp[i](times) mask = (widths > 0.0) u = np.zeros_like(widths) u[mask] = np.sqrt(r[mask]/widths[mask]) uncert.append(u) rates.append(r) rates = np.array(rates).T uncert = np.array(uncert).T return rates, uncert
def _fit_one_exact(self, channel): """Fit a single channel of event data. This is the exact (not approximate) algorithm that uses a fixed window duration throughout the data, except where the window must be necessarily truncated at the ends of the data. This function is much slower than the approximate version. Args: channel (int): The energy channel Returns: (np.array, np.array, np.array): The background rates, uncertainty, and actual window width """ window_width = self._window_width events = self._times[channel] num_events = len(events) if num_events == 0: return (np.array([]), np.array([]), np.array([])) # get the different parts of the array # pre: before the full sliding window width # mid: during the full sliding window width # post: after the full sliding window width pre_mask = (events < events[0] + window_width / 2.0) post_mask = (events > events[-1] - window_width / 2.0) mid_mask = (~pre_mask & ~post_mask) # do the sliding window idx = np.sum(pre_mask) mid_bins = [ np.sum(np.abs(events - events[i + idx]) <= window_width / 2.0) \ for i in range(np.sum(mid_mask))] mid_rates = np.array(mid_bins) / window_width mid_uncert = np.sqrt(mid_rates / window_width) # now considering the pre- and post-full-window-width data: # assume a constant rate at either end, but shorten the window # appropriately pre_events = events[pre_mask] pre_dt = (pre_events - events[0]) pre_rates = np.full(np.sum(pre_mask) - 1, mid_rates[0]) pre_uncert = np.sqrt(pre_rates / pre_dt[1:]) idx = num_events - np.sum(post_mask) post_events = events[post_mask] post_dt = events[-1] - post_events post_rates = np.full(np.sum(post_mask) - 1, mid_rates[-1]) post_uncert = np.sqrt(post_rates / post_dt[:-1]) # put it all together brates = np.hstack((pre_rates, mid_rates, post_rates)) uncert = np.hstack((pre_uncert, mid_uncert, post_uncert)) dt = np.hstack((pre_dt[1:], np.full(np.sum(mid_mask), window_width), post_dt[:-1])) return brates, uncert, dt def _fit_one_fast(self, channel): """Fit a single channel of event data. This performs an approximation to the NaivePoisson algorithm to considerably speed up the computation. Instead of assuming a fixed window duration throughout the data, the window is initialized by determining the number of counts in the first window, and fixing the window width by the total number of counts in the data. This allows the window duration to change, and is similar to smoothing the data. For slowly varying data, this is a good approximation. Args: channel (int): The energy channel Returns: (np.array, np.array, np.array): The background rates, uncertainty, \ and actual window width """ window_width = self._window_width events = self._times[channel] num_events = len(events) if num_events == 0: return (np.array([]), np.array([]), np.array([])) # get extent of window at the start of the data end_idx = np.sum(events <= events[0] + window_width) # from the middle of the sliding window mid_idx = int(np.floor(end_idx / 2.0)) # number of 'bins' containing a count (= number of counts in window) full_bins = end_idx # number of 'bins' containing a count not in the window num_back_bins = num_events - end_idx array_back = np.arange(num_back_bins) # actual window widths and the rate dts = (events[end_idx + array_back] - events[array_back]) back_rates = full_bins / dts # Now we want to estimate the rate at the ends. This means that the # sliding window will truncate at either end until it's the width of # one event. The main concern is to calculate the width of the # truncated window correctly so that the uncertainty in the rate will # properly increase as the window width decreases pre_full_bins = np.arange(mid_idx) + 1 pre_dts = events[2 * (pre_full_bins[1:] - 1)] - events[0] # pre_back_rates = np.full(mid_idx-1, back_rates[0]) pre_back_rates = (2.0 * pre_full_bins[1:]) / pre_dts post_full_bins = pre_full_bins[::-1] post_dts = events[-1] - events[-1 - 2 * (post_full_bins[:-1] - 1)] post_back_rates = (2.0 * post_full_bins[:-1]) / post_dts # post_back_rates = np.full(mid_idx-1, back_rates[-1]) # put all of it together brates = np.hstack((pre_back_rates, back_rates, post_back_rates)) dts = np.hstack((pre_dts, dts, post_dts)) # this is if we ended up with an odd number of events if num_events - 2 > brates.shape[0]: brates = np.append(brates, brates[-1]) dts = np.append(dts, dts[-1]) # now the uncertainty uncert = np.sqrt(brates / dts) return (brates, uncert, dts)