import typing as ty
import numpy as np
import tensorflow as tf
import pandas as pd
import flamedisx as fd
# Obtain the @export decorator and __all__ list from flamedisx's exporter,
# so classes below can register themselves as the module's public API.
export, __all__ = fd.exporter()
# Shorthand for tf.newaxis, used in tensor slicing (e.g. steps[:, o]).
o = tf.newaxis
@export
class Block:
    """One part of a BlockSource model.

    For example, P(electrons_detected | electrons_produced).
    """

    #: Source the block belongs to
    source: fd.Source = None

    #: Names of dimensions of the block's compute result
    dimensions: ty.Tuple[str]

    #: Additional dimensions used in the block computation.
    #: Label True if they represent an internally contracted hidden variable;
    #: these will be added to inner_dimensions so domain tensors are calculated
    #: automatically.
    #: Label False otherwise; these will be added to bonus_dimensions. Thus,
    #: any additional domain tensors utilising them will need calculating via
    #: the block overriding _domain_dict_bonus()
    extra_dimensions: ty.Tuple[ty.Tuple[str, bool]]

    #: Blocks whose result this block expects as an extra keyword
    #: argument to compute. Specify as ((block_dims, argument_name), ...),
    #: where block_dims is the dimensions-tuple of the block, and argument_name
    #: the expected name of the compute keyword argument.
    depends_on: ty.Tuple[ty.Tuple[ty.Tuple[str], str]] = tuple()

    #: Names of model functions defined in this block
    model_functions: ty.Tuple[str] = tuple()

    #: Names of model functions that take an additional first argument
    #: ('bonus arg') defined in this block; must be a subset of model_functions
    special_model_functions: ty.Tuple[str] = tuple()

    #: Names of columns this block expects to be array-valued
    array_columns: ty.Tuple[str] = tuple()

    #: Frozen model functions defined in this block
    frozen_model_functions: ty.Tuple[str] = tuple()

    #: Additional attributes this Block will furnish the source with.
    #: These can be overriden by Source attributes, just like model functions.
    model_attributes: ty.Tuple[str] = tuple()

    def __init__(self, source):
        self.source = source
        assert len(self.dimensions) in (1, 2), \
            "Blocks must output 1 or 2 dimensions"
        # Currently only support 1 extra_dimension per block
        assert len(self.extra_dimensions) <= 1, \
            f"{self} has >1 extra dimension!"

    # Redirect all model attribute queries to the linked source,
    # as soon as one exists, and has the relevant attribute.
    # We need __getattribute__, not __getattr__, to catch access
    # to attributes that exist.
    def __getattribute__(self, name):
        # Can't use self.something here without recursion...
        linked_attributes = (
            super().__getattribute__('model_functions')
            + super().__getattribute__('model_attributes'))
        source = super().__getattribute__('source')
        if name in linked_attributes and hasattr(source, name):
            return getattr(source, name)
        return super().__getattribute__(name)

    def __setattr__(self, name, value):
        # Mirror writes to model functions/attributes onto the source,
        # so block and source never disagree once linked.
        if (name in (self.model_functions + self.model_attributes)
                and self.source is not None):
            setattr(self.source, name, value)
        else:
            super().__setattr__(name, value)

    def setup(self):
        """Do any necessary initialization.

        Called after the block's attributes have been properly overriden
        by source attributes, if specified.
        """

    def gimme(self, *args, **kwargs):
        """Shorthand for self.source.gimme, see docs there"""
        return self.source.gimme(*args, **kwargs)

    def gimme_numpy(self, *args, **kwargs):
        """Shorthand for self.source.gimme_numpy"""
        return self.source.gimme_numpy(*args, **kwargs)

    def compute(self, data_tensor, ptensor, **kwargs):
        """Compute the block's result tensor.

        Adds the required domain tensors to kwargs, delegates to _compute,
        then validates the result's dtype and rank.
        """
        if len(self.extra_dimensions) == 0:
            kwargs.update(self.source._domain_dict(
                self.dimensions, data_tensor))
        else:
            if self.extra_dimensions[0][1] is True:
                # Automatic domain tensors for contracted hidden variables
                # are not implemented here yet.
                raise NotImplementedError
            else:
                kwargs.update(self.source._domain_dict(
                    self.dimensions, data_tensor))
                # bonus_dimension domains must be supplied by the block itself
                kwargs.update(self._domain_dict_bonus(data_tensor))

        result = self._compute(data_tensor, ptensor, **kwargs)

        assert result.dtype == fd.float_type(), \
            f"{self}._compute returned tensor of wrong dtype!"
        # +1 accounts for the leading batch dimension
        assert len(result.shape) == len(self.dimensions) + 1, \
            f"{self}._compute returned tensor of wrong rank!"
        return result

    def simulate(self, d: pd.DataFrame):
        """Simulate the block's dimensions as columns of d, in-place."""
        return_value = self._simulate(d)
        assert return_value is None, f"_simulate of {self} should return None"
        # Check necessary columns were actually added
        for dim in self.dimensions:
            assert dim in d.columns, f"_simulate of {self} must set {dim}"
            assert np.all(np.isfinite(d[dim].values)), \
                f"_simulate of {self} returned non-finite values of {dim}"

    def annotate(self, d: pd.DataFrame):
        """Add _min and _max for each dimension to d in-place"""
        return_value = self._annotate(d)
        assert return_value is None, f"_annotate of {self} should return None"
        for dim in self.dimensions:
            # Observed (final) and directly drawn (initial) dimensions
            # need no bounds.
            if dim in self.source.final_dimensions \
                    or dim in self.source.initial_dimensions:
                continue
            for bound in ('min', 'max'):
                colname = f'{dim}_{bound}'
                # Bugfix: message previously omitted which block failed.
                assert colname in d.columns, \
                    f"_annotate of {self} must set {colname}"
            # Check ordering once per dimension, after both columns are
            # known to exist (previously checked twice, inside the loop,
            # and could KeyError if only _min was set).
            assert np.all(d[f'{dim}_min'].values
                          <= d[f'{dim}_max'].values), \
                f"_annotate of {self} set misordered bounds"

    def check_data(self):
        """Hook for validating the source's data; no-op by default."""
        pass

    def _compute(self, data_tensor, ptensor, **kwargs):
        """Return (n_batch_events, ...dimensions...) tensor"""
        raise NotImplementedError

    def _simulate(self, d):
        """Simulate extra columns in place.

        Use the p_accepted column to modify acceptances; do not remove
        events here.
        """
        raise NotImplementedError

    def _annotate(self, d):
        """Add _min and _max for each dimension to d in-place"""
        raise NotImplementedError

    def _domain_dict_bonus(self, d):
        """Calculate any additional internal tensors arising from the use of
        bonus_dimensions in a block. Override within block"""
        raise NotImplementedError

    def _calculate_dimsizes_special(self):
        """Re-calculate dimension size and steps differently for any
        dimensions; will need to override _calculate_dimsizes_special()
        within a block"""
        pass
@export
class FirstBlock(Block):
    """The first Block of a source. This is usually an energy spectrum"""

    def _simulate(self, d):
        # The deep truth comes from random_truth; simulation starts
        # at the second block.
        raise RuntimeError("FirstBlock's shouldn't simulate")

    def _annotate(self, d):
        """First block needs no bounds annotation; intentionally a no-op."""

    def domain(self, data_tensor):
        """Return dictionary mapping dimension -> domain"""
        raise NotImplementedError

    def random_truth(self, n_events, fix_truth=None, **params):
        """Draw the deep truth variables (e.g. energy, position, time)."""
        raise NotImplementedError

    def mu_before_efficiencies(self, **params):
        """Return the expected event count before efficiencies."""
        raise NotImplementedError

    def validate_fix_truth(self, d):
        """Check/normalize a fix_truth specification."""
        raise NotImplementedError
@export
class BlockModelSource(fd.Source):
    """Source whose model is split over different Blocks
    """

    #: Blocks the source is built from.
    #: simulate will be called from first to last, annotate from last to first.
    model_blocks: tuple

    #: Dimensions provided by the first block
    initial_dimensions: tuple

    #: Additional dimensions used in the block computation;
    #: see Block.extra_dimensions for info
    extra_dimensions = ()

    def __init__(self, *args, **kwargs):
        if isinstance(self.model_blocks[0], FirstBlock):
            # Blocks have already been instantiated
            return
        if not issubclass(self.model_blocks[0], FirstBlock):
            raise RuntimeError("The first block must inherit from FirstBlock")
        for b in self.model_blocks[1:]:
            if issubclass(b, FirstBlock):
                raise RuntimeError("Only the first block can be a FirstBlock")

        # Collect attributes from the different blocks in this dictionary:
        collected = {k: [] for k in (
            'dimensions',
            'extra_dimensions',
            'model_functions',
            'special_model_functions',
            'model_attributes',
            'frozen_model_functions',
            'array_columns')}

        # Instantiate the blocks.
        self.model_blocks = tuple(b(self) for b in self.model_blocks)

        for b in self.model_blocks:
            # Maybe someone forgot a comma in a tuple specification
            for k in collected:
                if not isinstance(getattr(b, k), tuple):
                    # Bugfix: report the type of the attribute's value;
                    # type(k) was always str (k is the attribute name).
                    raise ValueError(
                        f"{k} in {b} should be a tuple, "
                        f"not a {type(getattr(b, k))}")

            # Call the setup method. This method is not really needed anymore;
            # blocks can simply override __init__ for setup, as long as they
            # call super().__init__ *first* (else self.source would not be set)
            b.setup()

            # Set the source attributes from the block's attributes.
            # From here on, block attributes become invisible and irrelevant,
            # since get/setattr on the blocks will redirect to the source.
            for x in set(b.model_functions + b.model_attributes):
                setattr(self, x, getattr(b, x))

            # Collect all information from the block.
            # We do this after the setup; the array columns field in
            # particular is nice to change in the block setup code
            for k in collected:
                collected[k] += getattr(b, k)

            # Someone might try to modify a source attribute after
            # instantiation, then be surprised when nothing happens
            # (because the same-named block attribute wasn't changed).
            # Sorry. I can't use properties to prevent writing, since
            # those are class-bound.

        # The source may declare additional frozen data methods
        collected['frozen_model_functions'] += self.frozen_model_functions

        # For array columns, the source may declare new columns
        # or override the length of the old columns
        for column, length in self.array_columns:
            for i, (_old_column, _) in enumerate(collected['array_columns']):
                if _old_column == column:
                    # Change length of existing column
                    collected['array_columns'][i] = (column, length)
                    break
            else:
                # New column
                collected['array_columns'] += [(column, length)]

        # Make the collected attributes available to the source
        for k, v in collected.items():
            if k == 'dimensions':
                # Dimensions is a special case, see below
                continue
            setattr(self, k, getattr(self, k) + tuple(set(v)))

        # Hidden variables needing automatic domain tensors: everything
        # that is neither observed nor provided by the first block, plus
        # contracted extra dimensions (flag True).
        self.inner_dimensions = tuple(
            [d for d in collected['dimensions']
             if ((d not in self.final_dimensions)
                 and (d not in self.model_blocks[0].dimensions))]
            + [d[0] for d in collected['extra_dimensions']
               if d[1] is True])
        self.initial_dimensions = self.model_blocks[0].dimensions
        self.bonus_dimensions = tuple([
            d[0] for d in collected['extra_dimensions'] if d[1] is False])

        super().__init__(*args, **kwargs)

    @staticmethod
    def _find_block(blocks,
                    has_dim: ty.Union[list, tuple, set],
                    exclude: Block = None):
        """Find a block with a dimension in has_dim, other than the block in
        exclude. Return (dimensions, b), or raises BlockNotFoundError.
        """
        for dims, b in blocks.items():
            if b is exclude:
                continue
            for d in dims:
                if d in has_dim:
                    return dims, b
        raise BlockNotFoundError(f"No block with {has_dim} found!")

    def _differential_rate(self, data_tensor, ptensor):
        """Compute each block, multiply results together over shared
        dimensions, and return the (batch_size,) differential rate."""
        results = {}
        already_stepped = ()  # Avoid double-multiplying to account for stepping

        for b in self.model_blocks:
            b_dims = b.dimensions

            # Gather extra compute arguments.
            kwargs = dict()
            for dependency_dims, dependency_name in b.depends_on:
                if dependency_dims not in results:
                    raise ValueError(
                        f"Block {b} depends on {dependency_dims}, but that has "
                        f"not yet been computed")
                kwargs[dependency_name] = results[dependency_dims]
                kwargs.update(self._domain_dict(dependency_dims, data_tensor))

            # Compute the block
            r = b.compute(data_tensor, ptensor, **kwargs)

            # Scale the block by stepped dimensions, if not already done in
            # another block
            for dim in b_dims:
                if (dim in self.inner_dimensions) and \
                        (dim not in self.no_step_dimensions) and \
                        (dim not in already_stepped):
                    steps = self._fetch(dim + '_steps',
                                        data_tensor=data_tensor)
                    step_mul = tf.repeat(steps[:, o], tf.shape(r)[1], axis=1)
                    step_mul = tf.repeat(step_mul[:, :, o],
                                         tf.shape(r)[2], axis=2)
                    r *= step_mul
                    already_stepped += (dim,)

            results[b_dims] = r

            # Try to matrix multiply with earlier blocks, until we cannot
            # do so anymore.
            try:
                while True:
                    b2_dims, r2 = self._find_block(
                        results, has_dim=b_dims, exclude=r)
                    new_dims, r = self.multiply_block_results(
                        b_dims, b2_dims, r, r2)
                    results[new_dims] = r
                    del results[b_dims]
                    del results[b2_dims]
                    # Bugfix: track the merged result's dimensions;
                    # without this a second merge searched on stale dims
                    # and deleted an already-removed key.
                    b_dims = new_dims
            except BlockNotFoundError:
                continue

        # The result should have a tensor with only final dimensions.
        # Bugfix: keep the loop variable separate from the final result,
        # so the no-match case is actually detected (previously `result`
        # ended up holding the last item even without a break).
        result = None
        for dims, r in results.items():
            if all(d in self.final_dimensions for d in dims):
                result = r
                break
        if result is None:
            raise ValueError("Result was not computed!")
        return tf.reshape(tf.squeeze(result), (self.batch_size,))

    def multiply_block_results(self, b_dims, b2_dims, r, r2):
        """Return result of matrix-multiplying two block results

        :param b_dims: tuple, dimension specification of r
        :param b2_dims: tuple, dimension specification of r2
        :param r: tensor, first block result to be multiplied
        :param r2: tensor, second block result to be multiplied
        :return: (dimension specification, tensor) of results
        """
        shared_dim = set(b_dims).intersection(set(b2_dims))
        if len(shared_dim) != 1:
            raise ValueError(f"Expected one shared dimension, "
                             f"found {len(shared_dim)}!")
        shared_dim = list(shared_dim)[0]

        # Figure out dimensions of result
        new_dims = tuple([d for d in b_dims if d != shared_dim]
                         + [d for d in b2_dims if d != shared_dim])

        # TODO: can we generalize to rank > 2?
        assert len(b_dims) in [1, 2]
        assert len(b2_dims) in [1, 2]

        # Due to the batch dimension, tf.matmul requires that the
        # ranks of arguments to tf.matmul match exactly.
        dummy_axis = None
        if len(b_dims) == 1 and len(b2_dims) == 2:
            dummy_axis = 1
            r = r[:, o, :]
        elif len(b_dims) == 2 and len(b2_dims) == 1:
            dummy_axis = 2
            r2 = r2[:, :, o]
        elif len(b_dims) == 2 and len(b2_dims) == 2:
            # Ensure the shared dimension is in the right position
            # for matrix multiplication
            if b_dims.index(shared_dim) != 1:
                assert len(b_dims) == 2
                r = tf.transpose(r, [0, 2, 1])
            if b2_dims.index(shared_dim) != 0:
                assert len(b2_dims) == 2
                r2 = tf.transpose(r2, [0, 2, 1])
        else:
            # Bugfix: this was a plain string; the f-prefix was missing.
            raise ValueError(
                f"Unsupported ranks {len(b_dims)}, {len(b2_dims)}")

        # Multiply the matching index to get a new block
        r = r @ r2
        if dummy_axis is not None:
            r = tf.squeeze(r, dummy_axis)
        assert len(r.shape) == len(new_dims) + 1
        return (new_dims, r)

    def random_truth(self, n_events, fix_truth=None, **params):
        # First block provides the 'deep' truth (energies, positions, time)
        return self.model_blocks[0].random_truth(
            n_events, fix_truth=fix_truth, **params)

    def validate_fix_truth(self, fix_truth):
        return self.model_blocks[0].validate_fix_truth(fix_truth)

    def _check_data(self):
        super()._check_data()
        for b in self.model_blocks:
            b.check_data()

    def _simulate_response(self):
        # All blocks after the first help to simulate the response
        d = self.data
        d['p_accepted'] = 1.
        for b in self.model_blocks[1:]:
            b.simulate(d)
        # Keep each event with probability p_accepted
        return d.iloc[np.random.rand(len(d)) < d['p_accepted'].values].copy()

    def _annotate(self, _skip_bounds_computation=False):
        d = self.data
        # By going in reverse order through the blocks, we can use the bounds
        # on hidden variables closer to the final signals (easy to compute)
        # for estimating the bounds on deeper hidden variables.
        for b in self.model_blocks[::-1]:
            b.annotate(d)

    def mu_before_efficiencies(self, **params):
        return self.model_blocks[0].mu_before_efficiencies(**params)

    def draw_positions(self, *args, **kwargs):
        # TODO: This is a kludge; allows one to reuse draw_positions
        # from the first block in a source-overriden random_truth function.
        return self.model_blocks[0].draw_positions(*args, **kwargs)

    def domain(self, x, data_tensor=None):
        if x in self.initial_dimensions:
            # Domain computation of the inner dimension is passed to the
            # first block
            return self.model_blocks[0].domain(data_tensor=data_tensor)[x]
        else:
            return super().domain(x, data_tensor=data_tensor)

    def _domain_dict(self, dimensions, data_tensor):
        """Return {dimension: domain tensor} for a 1- or 2-dim spec."""
        if len(dimensions) == 1:
            return {dimensions[0]:
                    self.domain(dimensions[0], data_tensor)}
        assert len(dimensions) == 2
        return dict(zip(dimensions,
                        self.cross_domains(*dimensions,
                                           data_tensor=data_tensor)))

    def add_derived_observables(self, d):
        """Hook for adding derived observable columns; no-op by default."""
        pass

    def calculate_dimsizes_special(self):
        """Custom calculation of any dimension sizes and steps; override
        _calculate_dimsizes_special within block"""
        for b in self.model_blocks:
            b._calculate_dimsizes_special()
class BlockNotFoundError(Exception):
    """Raised by BlockModelSource._find_block when no block result shares
    any of the requested dimensions; caught in _differential_rate to stop
    the merge loop."""
    pass