import numpy as np
from scipy import stats
import tensorflow as tf
import tensorflow_probability as tfp
import flamedisx as fd
export, __all__ = fd.exporter()
o = tf.newaxis
DEFAULT_WORK_PER_QUANTUM = 13.7e-3
[docs]@export
class MakeERQuanta(fd.Block):
dimensions = ('quanta_produced', 'energy')
extra_dimensions = (('quanta_produced_noStep', False),)
depends_on = ((('energy',), 'rate_vs_energy'),)
model_functions = ('work',)
work = DEFAULT_WORK_PER_QUANTUM
[docs] def _compute(self,
data_tensor, ptensor,
# Domain
quanta_produced,
# Dependency domain and value
energy, rate_vs_energy,
# Extra domains for internal use
quanta_produced_noStep, energy_noStep):
# We will use the tensors without stepping througout, then reduce
# to the correct dimensions for the stepped domains via an averaging
# Assume the intial number of quanta is always the same for each energy
work = self.gimme('work', data_tensor=data_tensor, ptensor=ptensor)
quanta_produced_real = tf.cast(
tf.floor(energy_noStep / work[:, o, o]),
dtype=fd.float_type())
# (n_events, |nq|, |ne|) tensor giving p(nq | e)
result = tf.cast(
tf.equal(quanta_produced_noStep,
quanta_produced_real), dtype=fd.float_type())
# Padding needed to correctly average over the unstepped quanta
# domain to the stepped quanta domain: sum slices in equal chunks with
# the necessary padding on either side, then average the two
padding = int(
tf.floor(
tf.shape(quanta_produced_noStep)[1] /
(tf.shape(quanta_produced)[1]-1))
) - tf.shape(quanta_produced_noStep)[1] % \
(tf.shape(quanta_produced)[1]-1)
# Do the padding
result_pad_left = tf.pad(result, [[0, 0], [padding, 0], [0, 0]])
result_pad_right = tf.pad(result, [[0, 0], [0, padding], [0, 0]])
# Chunks to reshape into, to allow for summation of slices via a
# reduce_sum
chunks = int(tf.shape(result_pad_left)[1] / tf.shape(quanta_produced)[1])
steps = self.source._fetch('quanta_produced_steps',
data_tensor=data_tensor)
# Average slices padding from the left, diving again by the step size
# as this will be multiplied by later (once more than it needs to be)
result_temp_left = tf.reshape(
result_pad_left,
[tf.shape(result_pad_left)[0],
int(tf.shape(result_pad_left)[1] / chunks),
chunks,
tf.shape(result_pad_left)[2]])
result_left = tf.reduce_sum(result_temp_left, axis=2) \
/ (steps[:, o, o] * steps[:, o, o])
# Average slices padding from the right, diving again by the step size
# as this will be multiplied by later (once more than it needs to be)
result_temp_right = tf.reshape(
result_pad_right,
[tf.shape(result_pad_right)[0],
int(tf.shape(result_pad_right)[1] / chunks),
chunks,
tf.shape(result_pad_right)[2]])
result_right = tf.reduce_sum(result_temp_right, axis=2) \
/ (steps[:, o, o] * steps[:, o, o])
# Return the average of the two
return (result_left + result_right) / 2
[docs] def _simulate(self, d):
work = self.gimme_numpy('work')
d['quanta_produced'] = np.floor(d['energy'].values
/ work).astype(np.int)
[docs] def _annotate(self, d):
d['quanta_produced_noStep_min'] = (
d['electrons_produced_min']
+ d['photons_produced_min'])
annotate_ces(self, d)
[docs] def _domain_dict_bonus(self, d):
return domain_dict_bonus(self, d)
[docs] def _calculate_dimsizes_special(self):
return calculate_dimsizes_special(self)
[docs]@export
class MakeNRQuanta(fd.Block):
dimensions = ('quanta_produced', 'energy')
extra_dimensions = (('quanta_produced_noStep', False),)
depends_on = ((('energy',), 'rate_vs_energy'),)
special_model_functions = ('lindhard_l',)
model_functions = ('work',) + special_model_functions
work = DEFAULT_WORK_PER_QUANTUM
[docs] @staticmethod
def lindhard_l(e, lindhard_k=tf.constant(0.138, dtype=fd.float_type())):
"""Return Lindhard quenching factor at energy e in keV"""
eps = e * tf.constant(11.5 * 54.**(-7./3.), dtype=fd.float_type()) # Xenon: Z = 54
n0 = tf.constant(3., dtype=fd.float_type())
n1 = tf.constant(0.7, dtype=fd.float_type())
n2 = tf.constant(1.0, dtype=fd.float_type())
p0 = tf.constant(0.15, dtype=fd.float_type())
p1 = tf.constant(0.6, dtype=fd.float_type())
g = n0 * tf.pow(eps, p0) + n1 * tf.pow(eps, p1) + eps
res = lindhard_k * g/(n2 + lindhard_k * g)
return res
[docs] def _compute(self,
data_tensor, ptensor,
# Domain
quanta_produced,
# Dependency domain and value
energy, rate_vs_energy,
# Extra domains for internal use
quanta_produced_noStep, energy_noStep):
# We will use the tensors without stepping througout, then reduce
# to the correct dimensions for the stepped domains via an averaging
work = self.gimme('work', data_tensor=data_tensor, ptensor=ptensor)
mean_q_produced = (
energy_noStep
* self.gimme('lindhard_l', bonus_arg=energy_noStep,
data_tensor=data_tensor, ptensor=ptensor)
/ work[:, o, o])
# (n_events, |nq|, |ne|) tensor giving p(nq | e)
result = tfp.distributions.Poisson(mean_q_produced).prob(
quanta_produced_noStep)
# Padding needed to correctly average over the unstepped quanta
# domain to the stepped quanta domain: sum slices in equal chunks with
# the necessary padding on either side, then average the two
padding = int(
tf.floor(
tf.shape(quanta_produced_noStep)[1] /
(tf.shape(quanta_produced)[1]-1))
) - tf.shape(quanta_produced_noStep)[1] % \
(tf.shape(quanta_produced)[1]-1)
# Do the padding
result_pad_left = tf.pad(result, [[0, 0], [padding, 0], [0, 0]])
result_pad_right = tf.pad(result, [[0, 0], [0, padding], [0, 0]])
# Chunks to reshape into, to allow for summation of slices via a
# reduce_sum
chunks = int(tf.shape(result_pad_left)[1] / tf.shape(quanta_produced)[1])
steps = self.source._fetch('quanta_produced_steps',
data_tensor=data_tensor)
# Average slices padding from the left, diving again by the step size
# as this will be multiplied by later (once more than it needs to be)
result_temp_left = tf.reshape(
result_pad_left,
[tf.shape(result_pad_left)[0],
int(tf.shape(result_pad_left)[1] / chunks),
chunks,
tf.shape(result_pad_left)[2]])
result_left = tf.reduce_sum(result_temp_left, axis=2) \
/ (steps[:, o, o] * steps[:, o, o])
# Average slices padding from the right, diving again by the step size
# as this will be multiplied by later (once more than it needs to be)
result_temp_right = tf.reshape(
result_pad_right,
[tf.shape(result_pad_right)[0],
int(tf.shape(result_pad_right)[1] / chunks),
chunks,
tf.shape(result_pad_right)[2]])
result_right = tf.reduce_sum(result_temp_right, axis=2) \
/ (steps[:, o, o] * steps[:, o, o])
# Return the average of the two
return (result_left + result_right) / 2
[docs] def _simulate(self, d):
# If you forget the .values here, you may get a Python core dump...
energies = d['energy'].values
work = self.gimme_numpy('work')
lindhard_l = self.gimme_numpy('lindhard_l', bonus_arg=energies)
d['quanta_produced'] = stats.poisson.rvs(energies * lindhard_l / work)
[docs] def _annotate(self, d):
d['quanta_produced_noStep_min'] = (
d['electrons_produced_min']
+ d['photons_produced_min'])
annotate_ces(self, d)
[docs] def _domain_dict_bonus(self, d):
return domain_dict_bonus(self, d)
[docs] def _calculate_dimsizes_special(self):
return calculate_dimsizes_special(self)
def annotate_ces(self, d):
# No bounds need to be estimated; we will consider the entire
# energy spectrum for each event.
# Nonetheless, it's useful to reconstruct the 'visible' energy
# via the combined energy scale (CES
work = self.gimme_numpy('work')
d['e_charge_vis'] = work * d['electrons_produced_mle']
d['e_light_vis'] = work * d['photons_produced_mle']
d['e_vis'] = d['e_charge_vis'] + d['e_light_vis']
for bound in ('min', 'max'):
d['quanta_produced_' + bound] = (
d['electrons_produced_' + bound]
+ d['photons_produced_' + bound])
def domain_dict_bonus(self, d):
# Calculate cross_domains from quanta_produced_noStep and energy
mi = self.source._fetch('quanta_produced_noStep_min', data_tensor=d)[:, o]
quanta_produced_noStep_domain = mi + tf.range(tf.reduce_max(
self.source._fetch('quanta_produced_noStep_dimsizes', data_tensor=d)))
energy_domain = self.source.domain('energy', d)
quanta_produced_noStep = tf.repeat(
quanta_produced_noStep_domain[:, :, o],
tf.shape(energy_domain)[1],
axis=2)
energy_noStep = tf.repeat(
energy_domain[:, o, :],
tf.shape(quanta_produced_noStep_domain)[1],
axis=1)
# Return as domain_dict
return dict({'quanta_produced_noStep': quanta_produced_noStep,
'energy_noStep': energy_noStep})
def calculate_dimsizes_special(self):
d = self.source.data
# Want electrons and photons to have the same stepping: choose the minimum
# of the two for each event
quanta_steps = (d['electrons_produced_steps'] <=
d['photons_produced_steps']) * \
d['electrons_produced_steps'] \
+ (d['photons_produced_steps'] <
d['electrons_produced_steps']) * d['photons_produced_steps']
batch_size = self.source.batch_size
n_batches = self.source.n_batches
# Need the electrons/photons steps to be the same within a batch for the
# averaging procedure in _compute to work correctly
for i in range(n_batches):
quanta_steps[i * batch_size: (i + 1) * batch_size + 1] = \
max(quanta_steps[i * batch_size: (i + 1) * batch_size + 1])
d['electrons_produced_steps'] = quanta_steps
d['photons_produced_steps'] = quanta_steps
d['quanta_produced_steps'] = quanta_steps
d['quanta_produced_noStep_steps'] = 1
# Ensure that we still cover the full electrons_produced bounds, even if
# the stepping has changed
electrons_produced_dimsizes = np.ceil(
(d['electrons_produced_max'].to_numpy()
- d['electrons_produced_min'].to_numpy()) / quanta_steps) + 1
self.source.dimsizes['electrons_produced'] = electrons_produced_dimsizes
# Ensure that we still cover the full photons_produced bounds, even if
# the stepping has changed
photons_produced_dimsizes = np.ceil(
(d['photons_produced_max'].to_numpy()
- d['photons_produced_min'].to_numpy()) / quanta_steps) + 1
self.source.dimsizes['photons_produced'] = photons_produced_dimsizes
# Correct dimsize for quanta_produced, to cover the full range that the sum
# of electrons_produced + photons_produced can take
quanta_produced_dimsizes = electrons_produced_dimsizes \
+ photons_produced_dimsizes - 1
# Need the quanta_produced dimsizes to be the same within a batch for the
# averaging procedure in _compute to work correctly
for i in range(n_batches):
quanta_produced_dimsizes[i * batch_size: (i + 1) * batch_size + 1] = \
max(quanta_produced_dimsizes[i * batch_size:
(i + 1) * batch_size + 1])
self.source.dimsizes['quanta_produced'] = quanta_produced_dimsizes
# Correct dimsizes for quanta_produced_noStep, to cover the full range of
# quanta_produced with integer steps
self.source.dimsizes['quanta_produced_noStep'] = quanta_produced_dimsizes \
+ (quanta_steps - 1) * (quanta_produced_dimsizes - 1)