Source code for flamedisx.lxe_blocks.energy_spectrum

from multihist import Histdd
import numpy as np
import pandas as pd
import tensorflow as tf
import wimprates as wr

import flamedisx as fd
export, __all__ = fd.exporter()
o = tf.newaxis


[docs]@export class EnergySpectrum(fd.FirstBlock): dimensions = ('energy',) extra_dimensions = () model_attributes = ( 'energies', 'fv_radius', 'fv_high', 'fv_low', 'drift_velocity', 't_start', 't_stop') #: Maximum radius at which events are allowed in cm #: Fiducial volume defaults to full (2t) XENON1T dimensions. fv_radius = 47.9 # cm #: Maximum z value (-depth) at which events are allowed in cm fv_high = 0. # cm #: Minimum z value (-depth) at which events are allowed in cm #: Fiducial volume defaults to full (2t) XENON1T dimensions. fv_low = -97.6 # cm #: Electron drift velocity in cm/ns drift_velocity = 1.335 * 1e-4 #: Earliest time at which events are allowed, datetime.datetime #: The default time boundaries are one year apart, starting and ending at #: Sept. 1, when the WIMP speed is average. #: The WIMP speed is also average at the halfway point. t_start = pd.to_datetime('2019-09-01T08:28:00') #: Last time at which events are allowed, datetime.datetime t_stop = pd.to_datetime('2020-09-01T08:28:00') #: Tensor listing energies this source can produce. #: Approximate the energy spectrum as a sequence of delta functions. energies = tf.cast(tf.linspace(0., 10., 1000), dtype=fd.float_type())
[docs] def domain(self, data_tensor): assert isinstance(self.energies, tf.Tensor) # see WIMPsource for why return {self.dimensions[0]: tf.repeat(fd.np_to_tf(self.energies)[o, :], self.source.batch_size, axis=0)}
[docs] def draw_positions(self, n_events, **params): """Return dictionary with x, y, z, r, theta, drift_time randomly drawn. """ data = dict() data['r'] = (np.random.rand(n_events) * self.fv_radius**2)**0.5 data['theta'] = np.random.uniform(0, 2*np.pi, size=n_events) data['z'] = np.random.uniform(self.fv_low, self.fv_high, size=n_events) data['x'], data['y'] = fd.pol_to_cart(data['r'], data['theta']) data['drift_time'] = - data['z'] / self.drift_velocity return data
[docs] def draw_time(self, n_events, **params): """Return n_events event_times drawn uniformaly between t_start and t_stop""" return np.random.uniform( self.t_start.value, self.t_stop.value, size=n_events)
[docs] def random_truth(self, n_events, fix_truth=None, **params): """Return pandas dataframe with event positions and times randomly drawn. """ data = self.draw_positions(n_events, **params) data['event_time'] = self.draw_time(n_events, **params) # The energy spectrum is represented as a 'comb' of delta functions # in the differential rate calculation. To get simulator-data agreement, # we have to do the same thing here. # TODO: can we fix both to be better? spectrum_numpy = fd.tf_to_np(self.rates_vs_energy) assert len(spectrum_numpy) == len(self.energies), \ "Energies and spectrum have different length" data['energy'] = np.random.choice( fd.tf_to_np(self.energies), size=n_events, p=spectrum_numpy / spectrum_numpy.sum(), replace=True) assert np.all(data['energy'] >= 0), "Generated negative energies??" # For a constant-shape spectrum, fixing truth values is easy: # we just overwrite the simulated values. # Fixing one does not constrain any of the others. self.source._overwrite_fixed_truths(data, fix_truth, n_events) data = pd.DataFrame(data) return data
[docs] def validate_fix_truth(self, d): """Clean fix_truth, ensure all needed variables are present Compute derived variables. """ if d is None: return dict() elif isinstance(d, pd.DataFrame): # This is useful, since it allows you to fix_truth with an # observed event. # When passing in an event as DataFrame we select and set # only these columns: cols = ['x', 'y', 'z', 'r', 'theta', 'event_time', 'drift_time'] # Assume fix_truth is a one-line dataframe with at least # cols columns return d[cols].iloc[0].to_dict() else: assert isinstance(d, dict), \ "fix_truth needs to be a DataFrame, dict, or None" if 'z' in d: # Position is fixed. Ensure both Cartesian and polar coordinates # are available, and compute drift_time from z. if 'x' in d and 'y' in d: d['r'], d['theta'] = fd.cart_to_pol(d['x'], d['y']) elif 'r' in d and 'theta' in d: d['x'], d['y'] = fd.pol_to_cart(d['r'], d['theta']) else: raise ValueError("When fixing position, give (x, y, z), " "or (r, theta, z).") d['drift_time'] = - d['z'] / self.drift_velocity elif 'event_time' not in d and 'energy' not in d: # Neither position, time, nor energy given raise ValueError(f"Dict should contain at least ['x', 'y', 'z'] " "and/or ['r', 'theta', 'z'] and/or 'event_time' " f"and/or 'energy', but it contains: {d.keys()}") return d
[docs] def _compute(self, data_tensor, ptensor, **kwargs): raise NotImplementedError
[docs] def mu_before_efficiencies(self, **params): raise NotImplementedError
[docs]@export class FixedShapeEnergySpectrum(EnergySpectrum): """For a source whose energy spectrum has the same shape throughout space and time. If you add a rate variation with space, you must override draw_positions and mu_before_efficiencies. If you add a rate variation with time, you must override draw_times. If you add a rate variation depending on both space and time, you must override all of random_truth! By default, this uses a flat 0 - 10 keV spectrum, sampled at 1000 points. """ model_attributes = ('rates_vs_energy',) + EnergySpectrum.model_attributes model_functions = ('energy_spectrum_rate_multiplier',) #: Tensor listing the number of events for each energy the souce produces #: Recall we approximate energy spectra by a sequence of delta functions. rates_vs_energy = tf.ones(1000, dtype=fd.float_type()) #: Model function describing a rate multiplier to the energy spectrum. #: You probably have to update random_truth when modifying this! energy_spectrum_rate_multiplier = 1.
[docs] def _compute(self, data_tensor, ptensor, *, energy): spectrum = tf.repeat(self.rates_vs_energy[o, :], self.source.batch_size, axis=0) rate_multiplier = self.gimme('energy_spectrum_rate_multiplier', data_tensor=data_tensor, ptensor=ptensor) return spectrum * rate_multiplier[:, o]
[docs] def mu_before_efficiencies(self, **params): return np.sum(fd.np_to_tf(self.rates_vs_energy))
[docs]@export class SpatialRateEnergySpectrum(FixedShapeEnergySpectrum): model_attributes = (('spatial_hist',) + FixedShapeEnergySpectrum.model_attributes) frozen_model_functions = ('energy_spectrum_rate_multiplier',) #: multihist.Histdd of events/bin produced by this source. #: Axes can be either (r, theta, z) or (x, y, z). #: Do not apply any normalization yourself, flamedisx will multiply by #: appropriate physical bin volume factors. spatial_hist: Histdd
[docs] def setup(self): assert isinstance(self.spatial_hist, Histdd) # Are we Cartesian, polar, or in trouble? axes = tuple(self.spatial_hist.axis_names) self.polar = (axes == ('r', 'theta', 'z')) self.bin_volumes = self.spatial_hist.bin_volumes() if self.polar: # Volume element in cylindrical coords = r * (dr dq dz) self.bin_volumes *= self.spatial_hist.bin_centers('r')[:, None, None] else: assert axes == ('x', 'y', 'z'), \ ("axis_names of spatial_rate_hist must be either " "or ['r', 'theta', 'z'] or ['x', 'y', 'z']") # Normalize the histogram self.spatial_hist.histogram = \ self.spatial_hist.histogram.astype(np.float) / self.spatial_hist.n # Local rate multiplier = PDF / uniform PDF # = ((normed_hist/bin_volumes) / (1/total_volume)) self.local_rate_multiplier = self.spatial_hist.similar_blank_hist() self.local_rate_multiplier.histogram = ( (self.spatial_hist.histogram / self.bin_volumes) * self.bin_volumes.sum())
[docs] def energy_spectrum_rate_multiplier(self, x, y, z): if self.polar: positions = list(fd.cart_to_pol(x, y)) + [z] else: positions = [x, y, z] return self.local_rate_multiplier.lookup(*positions)
[docs] def draw_positions(self, n_events, **params): """Return dictionary with x, y, z, r, theta, drift_time drawn from the spatial rate histogram. """ data = dict() positions = self.spatial_hist.get_random(size=n_events) for idx, col in enumerate(self.spatial_hist.axis_names): data[col] = positions[:, idx] if self.polar: data['x'], data['y'] = fd.pol_to_cart(data['r'], data['theta']) else: data['r'], data['theta'] = fd.cart_to_pol(data['x'], data['y']) data['drift_time'] = - data['z'] / self.drift_velocity return data
## # Variable spectra ##
[docs]@export class VariableEnergySpectrum(EnergySpectrum): """For a source for which the entire energy spectrum (not just the rate) depends on observables (e.g. reconstruction position or time). You must implement both energy_spectrum and random_truth. Note that you cannot draw homogeneous positions/times first, then energies, if your energy spectrum depends on positions/time. """ model_functions = ('energy_spectrum',)
[docs] def energy_spectrum(self, event_time): # Note this returns a 2d tensor! return tf.ones(len(event_time), len(self.energies), dtype=fd.float_type())
[docs] def _compute(self, data_tensor, ptensor, *, energy): return self.gimme('energy_spectrum', data_tensor=data_tensor, ptensor=ptensor)
[docs] def random_truth(self, n_events, fix_truth=None, **params): raise NotImplementedError
[docs] def mu_before_efficiencies(self, **params): raise NotImplementedError
[docs]@export class InvalidEventTimes(Exception): pass
[docs]@export class WIMPEnergySpectrum(VariableEnergySpectrum): model_attributes = ('pretend_wimps_dont_modulate', 'mw', 'sigma_nucleon', 'n_time_bins', 'energy_edges') + VariableEnergySpectrum.model_attributes #: If set to True, the energy spectrum at each time will be set to its #: average over the data taking period. pretend_wimps_dont_modulate = False #: WIMP Mass in GeV/c^2 mw = 1e3 #: WIMP-nucleon cross-section in cm^2 sigma_nucleon = 1e-45 #: Number of time bins to use for annual modulation computation n_time_bins = 24 #: Bin *edges* to use for energy histogram. Centers of the bins correspond #: to allowed energies. energy_edges = np.geomspace(0.7, 50, 100) frozen_model_functions = ('energy_spectrum',) array_columns = (('energy_spectrum', len(energy_edges) - 1),)
[docs] def setup(self): wimp_kwargs = dict(mw=self.mw, sigma_nucleon=self.sigma_nucleon, energy_edges=self.energy_edges) # BlockModelSource is kind enough to let us change these attributes # at this stage. e_centers = self.bin_centers(wimp_kwargs['energy_edges']) self.energies = fd.np_to_tf(e_centers) self.array_columns = (('energy_spectrum', len(self.energy_edges) - 1),) times = np.linspace(wr.j2000(self.t_start.value), wr.j2000(self.t_stop.value), self.n_time_bins + 1) time_centers = self.bin_centers(times) # Transform wimp_kwargs to arguments that can be passed to wimprates # which means transforming es from edges to centers del wimp_kwargs['energy_edges'] spectra = np.array([wr.rate_wimp_std(t=t, es=e_centers, **wimp_kwargs) * np.diff(self.energy_edges) for t in time_centers]) assert spectra.shape == (len(time_centers), len(e_centers)) self.energy_hist = Histdd.from_histogram( spectra, bin_edges=(times, self.energy_edges)) if self.pretend_wimps_dont_modulate: self.energy_hist.histogram = ( np.ones_like(self.energy_hist.histogram) * self.energy_hist.sum(axis=0).histogram.reshape(1, -1) / self.n_time_bins)
[docs] def energy_spectrum(self, event_time): ts = fd.tf_to_np(event_time) ts = wr.j2000(ts) ts = self.clip_j2000_times(ts) result = np.stack([self.energy_hist.slicesum(t).histogram for t in ts]) return fd.np_to_tf(result)
[docs] def clip_j2000_times(self, ts): """Return J2000 time(s) ts, clipped to the range of the energy-time histogram. If times are more than one day out, raises InvalidEventTimes. :param ts: J2000 timestamp, or array of timestamps """ tbins = self.energy_hist.bin_edges[0] if np.min(ts) < tbins[0] - 1 or np.max(ts) > tbins[-1] + 1: raise InvalidEventTimes( f"You passed J200 times in [{np.min(ts):.1f}, {np.max(ts):.1f}]" f"But this source expects [{tbins[0]:.1f} - {tbins[-1]:.1f}].") return np.clip(ts, tbins[0], tbins[-1])
[docs] def mu_before_efficiencies(self, **params): return self.energy_hist.n / self.n_time_bins
[docs] def random_truth(self, n_events, fix_truth=None, **params): """Draw n_events random energies and times from the energy/ time spectrum and add them to the data dict. """ data = self.draw_positions(n_events) if 'event_time' in fix_truth: # Convert to valid J200 timestamp (from whatever user specified) t = self.clip_j2000_times(wr.j2000(fix_truth['event_time'])) # Time is fixed, so the energy spectrum differs. # (if energy is also fixed, it will just be overridden later # and we're doing a bit of unnecessary work here) data['energy'] = \ self.energy_hist \ .slicesum(t, axis=0) \ .get_random(n_events) times = t elif 'energy' in fix_truth: # Energy is fixed, so the time distribution differs. e_edges = self.energy_hist.bin_edges[1] assert e_edges[0] <= fix_truth['energy'] < e_edges[-1], \ "fix_truth energy out of bounds" times = \ self.energy_hist \ .slicesum(fix_truth['energy'], axis=1) \ .get_random(n_events) else: times, data['energy'] = self.energy_hist.get_random(n_events).T data['event_time'] = fd.j2000_to_event_time(times) # Time has already been handled, do not overwrite it again # (if we do, we could crash if the user specified it as a datetime # object rather than a unix timestamp) fix_truth_notime = {k: v for k, v in fix_truth.items() if k != 'event_time'} self.source._overwrite_fixed_truths(data, fix_truth_notime, n_events) return pd.DataFrame(data)
[docs] @staticmethod def bin_centers(x): return 0.5 * (x[1:] + x[:-1])