from pathlib import Path
import subprocess
import numpy as np
import pandas as pd
from scipy import stats
import tensorflow as tf
lgamma = tf.math.lgamma
o = tf.newaxis
FLOAT_TYPE = tf.float32
INT_TYPE = tf.int32
# Extreme mean and standard deviations give numerical errors
# in the beta-binomial.
MAX_MEAN_P = 0.95 # issue #36
MIN_FLUCTUATION_P = 0.005 # issue #36
MIN_MEAN_P = 0.011 # issue #83. Adjust if changing MIN_FLUCTUATION_P!
# The MAX_FLUCTUATION_P depends on the mean, see issue #83.
[docs]def exporter():
"""Export utility modified from https://stackoverflow.com/a/41895194
Returns export decorator, __all__ list
"""
all_ = []
def decorator(obj):
all_.append(obj.__name__)
return obj
return decorator, all_
export, __all__ = exporter()
__all__.extend([
'float_type', 'exporter',
'MIN_FLUCTUATION_P', 'MAX_MEAN_P'])
[docs]@export
def float_type():
return FLOAT_TYPE
[docs]@export
def int_type():
return INT_TYPE
[docs]@export
def lookup_axis1(x, indices, fill_value=0):
"""Return values of x at indices along axis 1,
returning fill_value for out-of-range indices.
"""
# Save shape of x and flatten
ind_shape = tf.shape(indices)
a = tf.shape(x)[0]
b = tf.shape(x)[1]
x = tf.reshape(x, [-1])
legal_index = tf.cast(indices, dtype=int_type()) < b
# Convert indices to legal indices in flat array
indices = tf.clip_by_value(indices, 0., tf.cast(b, dtype=float_type())-1.)
indices = indices + tf.cast(b, dtype=float_type()) * \
tf.range(a, dtype=float_type())[:, o, o]
indices = tf.reshape(indices, shape=(-1,))
indices = tf.dtypes.cast(indices, dtype=int_type())
# Do indexing
result = tf.reshape(tf.gather(x,
indices),
shape=ind_shape)
# Replace illegal indices with fill_value, cast to float explicitly
return tf.cast(tf.where(legal_index,
result,
tf.zeros_like(result) + fill_value),
dtype=float_type())
[docs]@export
def tf_to_np(x):
"""Convert (list/tuple of) tensors x to numpy"""
if isinstance(x, (list, tuple)):
return tuple([tf_to_np(y) for y in x])
if isinstance(x, np.ndarray):
return x
return x.numpy()
[docs]@export
def np_to_tf(x):
"""Convert (list/tuple of) arrays x to tensorflow"""
if isinstance(x, pd.Series):
x = x.values
elif isinstance(x, pd.DataFrame):
raise ValueError("Cannot convert pd.DataFrame's to tensors!")
elif isinstance(x, (list, tuple)):
return tuple([np_to_tf(y) for y in x])
elif isinstance(x, tf.Tensor):
return x
return tf.convert_to_tensor(x, dtype=float_type())
[docs]@export
def cart_to_pol(x, y):
return (x**2 + y**2)**0.5, np.arctan2(y, x)
[docs]@export
def pol_to_cart(r, theta):
return r * np.cos(theta), r * np.sin(theta)
[docs]@export
def tf_log10(x):
return tf.math.log(x) / tf.math.log(tf.constant(10, dtype=x.dtype))
[docs]@export
def safe_p(ps):
"""Clip probabilities to be in [1e-5, 1 - 1e-5]
NaNs are replaced by 1e-5.
"""
ps = tf.where(tf.math.is_nan(ps),
tf.zeros_like(ps, dtype=float_type()),
tf.cast(ps, dtype=float_type()))
ps = tf.clip_by_value(ps, 1e-5, 1 - 1e-5)
return ps
[docs]@export
def beta_params(mean, sigma, force_valid=True):
"""Convert (p_mean, p_sigma) to (alpha, beta) params of beta distribution
:param force_valid: If true, adjust values to give valid, stable, and
unimodal beta distributions. See issues #36 and #83
"""
m, v = mean, sigma ** 2
m, v = np_to_tf(m), np_to_tf(v) # We're called from numpy sometimes
if force_valid:
m = tf.clip_by_value(m, MIN_MEAN_P, MAX_MEAN_P)
max_v = tf.maximum(
(m-1)**2 * m / (2-m),
m**2 * (1-m)/(1+m))
v = tf.clip_by_value(v, MIN_FLUCTUATION_P**2, max_v)
a = m * (m/v - m**2/v - 1.)
b = a * (1/m - 1)
return a, b
[docs]@export
def beta_binom_pmf(x, n, p_mean, p_sigma):
"""Return probability mass function of beta-binomial distribution.
That is, give the probability of obtaining x successes in n trials,
if the success probability p is drawn from a beta distribution
with mean p_mean and standard deviation p_sigma.
"""
a, b = beta_params(p_mean, p_sigma)
res = tf.exp(
lgamma(n + 1.) + lgamma(x + a) + lgamma(n - x + b)
+ lgamma(a + b)
- (lgamma(x + 1.) + lgamma(n - x + 1.)
+ lgamma(a) + lgamma(b) + lgamma(n + a + b)))
return tf.where(tf.math.is_finite(res),
res,
tf.zeros_like(res, dtype=float_type()))
[docs]@export
def is_numpy_number(x):
try:
return (np.issubdtype(x.dtype, np.integer)
or np.issubdtype(x.dtype, np.floating))
except (AttributeError, TypeError):
return False
[docs]@export
def symmetrize_matrix(x):
upper = tf.linalg.band_part(x, 0, -1)
diag = tf.linalg.band_part(x, 0, 0)
return (upper - diag) + tf.transpose(upper)
[docs]@export
def j2000_to_event_time(dates):
"""Convert a numpy array of j2000 timestamps to event_times
which are ns unix timestamps. This is the reverse of wimprates.j2000
"""
zero = pd.to_datetime('2000-01-01T12:00')
nanoseconds_per_day = 1e9 * 3600 * 24
return nanoseconds_per_day * dates + zero.value
[docs]@export
def index_lookup_dict(names, column_widths=None):
"""Return dictionary mapping names to successive tensor indices
(tf.constant integers.)
:param column_widths: dictionary mapping names to column width.
For columns with width > 1, the result contains a tensor slice.
"""
names = list(names)
if column_widths is None:
column_widths = dict()
result = dict()
i = 0
while names:
name = names.pop(0)
width = column_widths.get(name, 1)
if width == 1:
result[name] = tf.constant(i, dtype=int_type())
else:
result[name] = slice(tf.constant(i, dtype=int_type()),
tf.constant(i + width, dtype=int_type()))
i += width
return result
[docs]@export
def values_to_constants(kwargs):
"""Return dictionary with python/numpy values replaced by tf.constant"""
for k, v in kwargs.items():
if isinstance(v, (float, int)) or is_numpy_number(v):
kwargs[k] = tf.constant(v, dtype=float_type())
return kwargs
[docs]@export
def wilks_crit(confidence_level):
"""Return critical value from Wilks' theorem for upper limits"""
return stats.norm.ppf(confidence_level) ** 2
[docs]@export
def run_command(command):
"""Run command and show its output in STDOUT"""
# Is there no easier way??
with subprocess.Popen(
command.split(),
bufsize=1,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) as p:
for line in iter(p.stdout.readline, ''):
print(line.rstrip())
[docs]@export
def load_config(config_files=None):
"""Return dictionary of configuration options from (a) python file(s)"""
if config_files is None:
return {}
if isinstance(config_files, str):
# Support one or multiple files
config_files = (config_files,)
config = dict()
for filename in config_files:
if not filename.endswith('.py'):
# This is (hopefully) a named config shipped with flamedisx
filename = Path(__file__).parent / 'configs' / (filename + '.py')
# Adapted from https://stackoverflow.com/a/37611448
with open(filename) as f:
code = compile(f.read(), filename, 'exec')
captured_locals = dict()
exec(code, globals(), captured_locals)
config.update({
k: v for k, v in captured_locals.items()
if not k.startswith('_')})
return config