from copy import deepcopy
import warnings
import flamedisx as fd
import numpy as np
import pandas as pd
import tensorflow as tf
import typing as ty
export, __all__ = fd.exporter()
o = tf.newaxis
DEFAULT_DSETNAME = 'the_dataset'
@export
class LogLikelihood:
param_defaults: ty.Dict[str, tf.Tensor]
dsetnames: ty.List[str] # Dataset names
sources: ty.Dict[str, fd.Source] # Source name -> Source instance
# Track which source takes which dataset, and converse
dset_for_source: ty.Dict[str, str]
sources_in_dset: ty.Dict[str, ty.List[str]]
# Tensor with batch info
# First dimension runs over datasets (same indices as dsetnames)
# Second dimension is (n_batches, batch_size, n_padding)
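# e.g. a row [20, 50, 7] means 20 batches of 50 events each,
# with the last 7 entries of the final batch being padding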
batch_info: tf.Tensor
# Concatenated data tensors for all sources in each dataset
# dsetname -> Tensor
data_tensors: ty.Dict[str, tf.Tensor]
# Track which columns in the data tensor belong to which sources
# dsetname -> [start, stop] array over sources
column_indices: ty.Dict[str, np.ndarray]
def __init__(
self,
sources: ty.Union[
ty.Dict[str, fd.Source.__class__],
ty.Dict[str, ty.Dict[str, fd.Source.__class__]]],
data: ty.Union[
None,
pd.DataFrame,
ty.Dict[str, pd.DataFrame]] = None,
free_rates=None,
batch_size=10,
max_sigma=None,
max_dim_size=120,
n_trials=int(1e5),
log_constraint=None,
bounds_specified=True,
progress=True,
defaults=None,
**common_param_specs):
"""
:param sources: Dictionary {datasetname: {sourcename: class, ...}, ...},
or just {sourcename: class} if you have a single dataset.
Every source name must be unique.
:param data: Dictionary {datasetname: pd.DataFrame},
or just a pd.DataFrame if you have a single dataset,
or None if you will set the data later.
:param free_rates: names of sources whose rates are floating
:param batch_size: Number of events to use for a computation batch.
Higher numbers give better performance, especially for GPUs,
at the cost of more memory
:param max_sigma: Maximum sigma to use in bounds estimation.
Higher numbers give better accuracy, at the cost of performance.
If not specified, each source will use its own default.
:param max_dim_size: Maximum bounds size for inner_dimensions,
excluding no_step_dimensions
:param n_trials: Number of Monte-Carlo trials for mu estimation.
:param log_constraint: Logarithm of constraint to include in likelihood
:param bounds_specified: If True (default), optimizers will be
constrained within the specified parameter ranges.
:param progress: Show progress bars during initialization
:param defaults: dictionary of default parameter values to use for
all sources.
:param **common_param_specs: param_name = (min, max, anchors), ...
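
Example (a minimal sketch; assumes ``fd.ERSource`` is available and
``df`` is a DataFrame with the observables that source expects)::

    lf = fd.LogLikelihood(
        sources={'er': fd.ERSource},
        data=df,
        free_rates='er')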
"""
param_defaults = dict()
if defaults is None:
defaults = dict()
if isinstance(data, pd.DataFrame) or data is None:
# Only one dataset
data = {DEFAULT_DSETNAME: data}
if not isinstance(list(sources.values())[0], dict):
sources: ty.Dict[str, ty.Dict[str, fd.Source.__class__]] = \
{DEFAULT_DSETNAME: sources}
assert data.keys() == sources.keys(), "Inconsistent dataset names"
self.dsetnames = list(data.keys())
# Flatten sources and fill data for source
self.sources: ty.Dict[str, fd.Source.__class__] = dict()
self.dset_for_source = dict()
self.sources_in_dset = dict()
for dsetname, ss in sources.items():
self.sources_in_dset[dsetname] = []
for sname, s in ss.items():
self.dset_for_source[sname] = dsetname
self.sources_in_dset[dsetname].append(sname)
if sname in self.sources:
raise ValueError(f"Duplicate source name {sname}")
self.sources[sname] = s
del sources # so we don't use it by accident
if free_rates is None:
free_rates = tuple()
if isinstance(free_rates, str):
free_rates = (free_rates,)
for sn in free_rates:
if sn not in self.sources:
raise ValueError(f"Can't free rate of unknown source {sn}")
# The rate multiplier guesses will be improved after data is set
param_defaults[sn + '_rate_multiplier'] = 1.
# Create sources
self.sources = {
sname: sclass(data=None,
max_sigma=max_sigma,
max_dim_size=max_dim_size,
# The source will filter out parameters it does not
# take
fit_params=list(k for k in common_param_specs.keys()),
batch_size=batch_size,
**defaults)
for sname, sclass in self.sources.items()}
for pname in common_param_specs:
# Check defaults for common parameters are consistent between
# sources
defs = [s.defaults[pname]
for s in self.sources.values()
if pname in s.defaults]
if not defs:
raise ValueError(f"Invalid fit parameter {pname}: "
f"no source takes it?")
if len(set([x.numpy() for x in defs])) > 1:
raise ValueError(
f"Inconsistent defaults {defs} for common parameters")
param_defaults[pname] = defs[0]
self.param_defaults = fd.values_to_constants(param_defaults)
self.param_names = list(param_defaults.keys())
if bounds_specified:
self.default_bounds = {
p_name: (start, stop)
for p_name, (start, stop, n) in common_param_specs.items()}
else:
self.default_bounds = dict()
self.mu_itps = {
sname: s.mu_function(
n_trials=n_trials,
progress=progress,
# Source will filter out the params it needs
**common_param_specs)
for sname, s in self.sources.items()}
# Not used, but useful for mu smoothness diagnosis
self.param_specs = common_param_specs
# Add the constraint
if log_constraint is None:
def log_constraint(**kwargs):
return 0.
self.log_constraint = log_constraint
self.set_data(data)
def set_data(self,
data: ty.Union[pd.DataFrame, ty.Dict[str, pd.DataFrame]]):
"""set new data for sources in the likelihood.
Data is passed in the same format as for __init__
Data can contain any subset of the original data keys to only
update specific datasets.
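
Example (sketch, for a likelihood built with a single dataset)::

    lf.set_data(new_df)                       # plain-DataFrame form
    lf.set_data({'the_dataset': new_df})      # equivalent dict form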
"""
if isinstance(data, pd.DataFrame):
assert len(self.dsetnames) == 1, \
"You passed one DataFrame but there are multiple datasets"
data = {DEFAULT_DSETNAME: data}
is_none = [d is None for d in data.values()]
if any(is_none):
if not all(is_none):
warnings.warn("Cannot set only one dataset to None: "
"setting all to None instead.",
UserWarning)
for s in self.sources.values():
s.set_data(None)
return
batch_info = np.zeros((len(self.dsetnames), 3), dtype=int)
for sname, source in self.sources.items():
dname = self.dset_for_source[sname]
if dname not in data:
warnings.warn(f"Dataset {dname} not provided in set_data")
continue
# Deepcopy so the source's annotation columns don't clobber the caller's data
source.set_data(deepcopy(data[dname]))
# Update batch info
dset_index = self.dsetnames.index(dname)
batch_info[dset_index, :] = [
source.n_batches, source.batch_size, source.n_padding]
# Choose sensible default rate multiplier guesses:
# (1) Assume each free source produces just 1 event
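# e.g. a free source expecting mu = 50 events at multiplier 1
# gets a rate multiplier guess of 1 / 50 = 0.02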
for sname in self.sources:
if self.dset_for_source[sname] not in data:
# This dataset is not being updated, skip
continue
rmname = sname + '_rate_multiplier'
if rmname in self.param_names:
n_expected = self.mu(source_name=sname).numpy()
assert n_expected > 0
self.param_defaults[rmname] = (
self.param_defaults[rmname] / n_expected)
# (2) If we still saw more events than expected, assume the
# first free source is responsible for all of this.
for dname, _data in data.items():
n_observed = len(_data)
n_expected = self.mu(dataset_name=dname).numpy()
assert n_expected > 0
if n_observed <= n_expected:
continue
for sname in self.sources_in_dset[dname]:
rmname = sname + '_rate_multiplier'
if rmname in self.param_names:
# Rate multiplier is set to value that produces
# one event, so we just have to multiply it:
self.param_defaults[rmname] *= 1 + n_observed - n_expected
break
self.batch_info = tf.convert_to_tensor(batch_info, dtype=fd.int_type())
# Build a big data tensor for each dataset.
# Each source has an [n_batches, batch_size, n_columns] tensor.
# Since the number of columns are different, we must concat along
# axis=2 and track which indices belong to which source.
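# e.g. sources with 5 and 7 columns give a 12-column tensor and
# column_indices = [[0, 5], [5, 12]]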
self.data_tensors = {
dsetname: tf.concat(
[self.sources[sname].data_tensor
for sname in self.sources_in_dset[dsetname]],
axis=2)
for dsetname in self.dsetnames}
self.column_indices = dict()
for dsetname in self.dsetnames:
# Do not use len(cols_to_cache), some sources have extra columns...
stop_idx = np.cumsum([self.sources[sname].data_tensor.shape[2]
for sname in self.sources_in_dset[dsetname]])
self.column_indices[dsetname] = np.transpose([
np.concatenate([[0], stop_idx[:-1]]),
stop_idx])
def simulate(self, fix_truth=None, **params):
"""Simulate events from sources.
"""
params = self.prepare_params(params, free_all_rates=True)
# Collect Source event DFs in ds
ds = []
for sname, s in self.sources.items():
# Mean number of events to simulate: rate multiplier times mu before
# efficiencies; the simulator itself applies the efficiencies
rm = self._get_rate_mult(sname, params)
mu = rm * s.mu_before_efficiencies(
**self._filter_source_kwargs(params, sname))
# Simulate this many events from source
n_to_sim = np.random.poisson(mu)
if n_to_sim == 0:
continue
d = s.simulate(n_to_sim,
fix_truth=fix_truth,
**self._filter_source_kwargs(params,
sname))
# If events were simulated add them to the list
if len(d) > 0:
# Keep track of what source simulated which events
d['source'] = sname
ds.append(d)
# Concatenate results and shuffle them.
# Adding empty DataFrame ensures pd.concat doesn't fail if
# n_to_sim is 0 for all sources or all sources return 0 events
ds = pd.concat([pd.DataFrame()] + ds, sort=False)
return ds.sample(frac=1).reset_index(drop=True)
def __call__(self, **kwargs):
assert 'second_order' not in kwargs, 'Call log_likelihood directly instead'
return self.log_likelihood(second_order=False, **kwargs)[0]
def log_likelihood(self, second_order=False,
omit_grads=tuple(), **kwargs):
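"""Return (ll, ll_grad, ll_hess), summed over all datasets and batches:
the total log likelihood, its gradient with respect to the parameters
not in omit_grads, and (if second_order=True) the Hessian, else None.
"""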
params = self.prepare_params(kwargs)
n_grads = len(self.param_defaults) - len(omit_grads)
ll = 0.
llgrad = np.zeros(n_grads, dtype=np.float64)
llgrad2 = np.zeros((n_grads, n_grads), dtype=np.float64)
for dsetname in self.dsetnames:
# Getting this from the batch_info tensor is much slower
n_batches = self.sources[self.sources_in_dset[dsetname][0]].n_batches
for i_batch in range(n_batches):
# Iterating over tf.range seems much slower!
results = self._log_likelihood(
tf.constant(i_batch, dtype=fd.int_type()),
dsetname=dsetname,
data_tensor=self.data_tensors[dsetname][i_batch],
batch_info=self.batch_info,
omit_grads=omit_grads,
second_order=second_order,
**params)
ll += results[0].numpy().astype(np.float64)
if self.param_names:
llgrad += results[1].numpy().astype(np.float64)
if second_order:
llgrad2 += results[2].numpy().astype(np.float64)
if second_order:
return ll, llgrad, llgrad2
return ll, llgrad, None
def minus2_ll(self, *, omit_grads=tuple(), **kwargs):
result = self.log_likelihood(omit_grads=omit_grads, **kwargs)
ll, grad = result[:2]
hess = -2 * result[2] if result[2] is not None else None
return -2 * ll, -2 * grad, hess
def prepare_params(self, kwargs, free_all_rates=False):
for k in kwargs:
if k not in self.param_defaults:
if k.endswith('_rate_multiplier') and free_all_rates:
continue
raise ValueError(f"Unknown parameter {k}")
return {**self.param_defaults, **fd.values_to_constants(kwargs)}
def _get_rate_mult(self, sname, kwargs):
rmname = sname + '_rate_multiplier'
if rmname in self.param_names:
return kwargs.get(rmname, self.param_defaults[rmname])
return tf.constant(1., dtype=fd.float_type())
def _source_kwargnames(self, source_name):
"""Return parameter names that apply to source"""
return [pname for pname in self.param_names
if not pname.endswith('_rate_multiplier')
and pname in self.sources[source_name].defaults]
def _filter_source_kwargs(self, kwargs, source_name):
"""Return {param: value} dictionary with keyword arguments
for source, with values extracted from kwargs"""
return {pname: kwargs[pname]
for pname in self._source_kwargnames(source_name)}
def _param_i(self, pname):
"""Return index of parameter pname"""
return self.param_names.index(pname)
def mu(self, *,
source_name=None,
dataset_name=None,
**kwargs):
"""Return expected number of events
:param dataset_name: ... for just this dataset
:param source_name: ... for just this source.
You must provide source_name and/or dataset_name, since it makes no
sense to sum expected events over different datasets.
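
Example (sketch; assumes a source named 'er' in the default dataset)::

    lf.mu(source_name='er').numpy()
    lf.mu(dataset_name='the_dataset').numpy()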
"""
kwargs = {**self.param_defaults, **kwargs}
if dataset_name is None and source_name is None:
raise ValueError("Provide either source or dataset name")
mu = tf.constant(0., dtype=fd.float_type())
for sname, s in self.sources.items():
if (dataset_name is not None
and self.dset_for_source[sname] != dataset_name):
continue
if source_name is not None and sname != source_name:
continue
mu += (self._get_rate_mult(sname, kwargs)
* self.mu_itps[sname](**self._filter_source_kwargs(kwargs, sname)))
return mu
@tf.function
def _log_likelihood(self,
i_batch, dsetname, data_tensor, batch_info,
omit_grads=tuple(), second_order=False, **params):
# Stack the params to create a single node
# to differentiate with respect to.
grad_par_stack = tf.stack([
params[k] for k in self.param_names
if k not in omit_grads])
# Retrieve individual params from the stacked node,
# then add back the params we do not differentiate w.r.t.
params_unstacked = dict(zip(
[x for x in self.param_names if x not in omit_grads],
tf.unstack(grad_par_stack)))
for k in omit_grads:
params_unstacked[k] = params[k]
# Forward computation
ll = self._log_likelihood_inner(
i_batch, params_unstacked, dsetname, data_tensor, batch_info)
# Autodifferentiation. This is why we use tensorflow:
grad = tf.gradients(ll, grad_par_stack)[0]
if second_order:
return ll, grad, tf.hessians(ll, grad_par_stack)[0]
return ll, grad, None
def _log_likelihood_inner(self, i_batch, params,
dsetname, data_tensor, batch_info):
"""Return log likelihood contribution of one batch in a dataset
This loops over sources in the dataset and events in the batch,
but not over datasets or batches.
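The batch contribution is sum_i log( sum_s rate_mult_s * dr_s(event_i) ),
with the dataset's expected total mu subtracted once (at its first batch)
and the log-constraint added once (first batch of the first dataset).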
"""
# Retrieve batching info. Cannot use tuple-unpacking: tensorflow
# doesn't like it when you iterate over tensors
dataset_index = self.dsetnames.index(dsetname)
n_batches = batch_info[dataset_index, 0]
batch_size = batch_info[dataset_index, 1]
n_padding = batch_info[dataset_index, 2]
# Compute differential rates from all sources
# drs: [batch_size] tensor; rate-multiplier-weighted sum of differential rates over sources
drs = tf.zeros((batch_size,), dtype=fd.float_type())
for source_i, sname in enumerate(self.sources_in_dset[dsetname]):
s = self.sources[sname]
rate_mult = self._get_rate_mult(sname, params)
col_start, col_stop = self.column_indices[dsetname][source_i]
dr = s.differential_rate(
data_tensor[:, col_start:col_stop],
# We are already tracing; if we call the traced function here
# it breaks the Hessian (it will give NaNs)
autograph=False,
**self._filter_source_kwargs(params, sname))
drs += dr * rate_mult
# Sum over events and remove padding
n = tf.where(tf.equal(i_batch, n_batches - 1),
batch_size - n_padding,
batch_size)
ll = tf.reduce_sum(tf.math.log(drs[:n]))
# Add mu once (to the first batch)
# and constraint really only once (to first batch of first dataset)
ll += tf.where(tf.equal(i_batch, tf.constant(0, dtype=fd.int_type())),
- self.mu(dataset_name=dsetname, **params)
+ (self.log_constraint(**params)
if dsetname == self.dsetnames[0] else 0.),
0.)
return ll
def guess(self) -> ty.Dict[str, float]:
"""Return dictionary of parameter guesses"""
return {k: v.numpy()
for k, v in self.param_defaults.items()}
def params_to_dict(self, values):
"""Return parameter {name: value} dictionary"""
return {k: v for k, v in zip(self.param_names, tf.unstack(values))}
def bestfit(self,
guess=None,
fix=None,
bounds=None,
optimizer='scipy',
get_lowlevel_result=False,
get_history=False,
use_hessian=True,
return_errors=False,
nan_val=float('inf'),
optimizer_kwargs=None,
allow_failure=False):
"""Return best-fit parameter dict
:param guess: Guess parameters: dict {param: guess} of guesses to use.
Any omitted parameters are set to their defaults (see guess()).
:param fix: dict {param: value} of parameters to keep fixed
during the minimization.
:param optimizer: 'tf', 'minuit' or 'scipy'
:param get_lowlevel_result: Returns the full optimizer result instead
of the best fit parameters. Bool.
:param get_history: Returns the history of optimizer calls instead
of the best fit parameters. Bool.
:param use_hessian: If True, uses flamedisx's exact Hessian
in the optimizer. Otherwise, most optimizers estimate it by finite-
difference calculations.
:param return_errors: If using the minuit optimizer, return
a 2-tuple of (bestfit dict, error dict) instead.
Instead of True, you can also pass 'hesse' or 'minos' to choose how
minuit computes the errors.
:param allow_failure: If True, raise a warning instead of an exception
if there is an optimizer failure.
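
Example (sketch; ``er_rate_multiplier`` assumes a free-rate source
named 'er')::

    fit = lf.bestfit(guess={'er_rate_multiplier': 1.})
    fit['er_rate_multiplier']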
"""
if bounds is None:
bounds = dict()
if guess is None:
guess = dict()
if not isinstance(guess, dict):
raise ValueError("Must specify bestfit guess as a dictionary")
# Check the likelihood has a finite value and gradient before starting
val, grad, hess = self.log_likelihood(**guess,
second_order=use_hessian)
if not np.isfinite(val):
raise ValueError("The likelihood is - infinity at your guess, "
"please guess better, remove outlier events, or "
"add a fallback background model.")
if not np.all(np.isfinite(grad)):
raise ValueError("The likelihood is finite at your guess, "
"but the gradient is not. Are you starting at a "
"cusp?")
if use_hessian:
if hess is None:
raise RuntimeError("Likelihood did't provide Hessian!")
if not np.all(np.isfinite(hess)):
raise ValueError("The likelihood and gradient are finite at "
"your guess, but the Hessian is not. "
"Are you starting at an unusual point? "
"You could also try use_hessian=False.")
opt = fd.SUPPORTED_OPTIMIZERS[optimizer]
res = opt(
lf=self,
guess={**self.guess(), **guess},
fix=fix,
bounds={**self.default_bounds, **bounds},
nan_val=nan_val,
get_lowlevel_result=get_lowlevel_result,
get_history=get_history,
use_hessian=use_hessian,
return_errors=return_errors,
optimizer_kwargs=optimizer_kwargs,
allow_failure=allow_failure,
).minimize()
if get_lowlevel_result or get_history:
return res
# TODO: This is to deal with a minuit-specific convention,
# should either put this to minuit or force others into same mold.
names = self.param_names
result, errors = (
{k: v for k, v in res.items() if k in names},
{k: v for k, v in res.items() if k.startswith('error_')})
if return_errors:
# Filter out errors and return separately
return result, errors
return result
def interval(self, parameter, **kwargs):
"""Return central confidence interval on parameter.
Options are the same as for limit."""
kwargs.setdefault('kind', 'central')
return self.limit(parameter, **kwargs)
def limit(
self,
parameter,
bestfit=None,
guess=None,
fix=None,
bounds=None,
confidence_level=0.9,
kind='upper',
sigma_guess=None,
t_ppf=None,
t_ppf_grad=None,
t_ppf_hess=None,
optimizer='scipy',
get_history=False,
get_lowlevel_result=False,
# Set so 90% CL intervals actually report ~90.25% intervals
# asymptotically due to the tilt... if a pen-and-paper computation
# Jelle did a long time ago is actually correct
tilt_overshoot=0.037,
optimizer_kwargs=None,
use_hessian=True,
allow_failure=False,
):
"""Return frequentist limit or confidence interval.
Returns a float (for upper or lower limits) or a 2-tuple of floats
(for a central interval)
:param parameter: string, the parameter to set the interval on
:param bestfit: {parameter: value} dictionary, global best-fit.
If omitted, it will be computed using bestfit().
:param guess: {param: value} guess of the result, or None.
If omitted, nuisance parameters will be guessed equal to bestfit.
If omitted, guess for target parameters will be based on asymptotic
parabolic computation.
:param fix: {param: value} to fix during interval computation.
Result is only valid if same parameters were fixed for bestfit.
:param confidence_level: Required confidence level of the interval
:param kind: Type of interval, 'upper', 'lower' or 'central'
:param sigma_guess: Guess for one sigma uncertainty on the target
parameter. If not provided, will be computed from Hessian.
:param t_ppf: returns the critical value as a function of the parameter.
Wilks' theorem is used if omitted.
:param t_ppf_grad: return derivative of t_ppf
:param t_ppf_hess: return second derivative of t_ppf
:param tilt_overshoot: Set tilt so the limit's log likelihood will
overshoot the target value by roughly this much.
:param optimizer_kwargs: dict of additional arguments for optimizer
:param allow_failure: If True, raise a warning instead of an exception
if there is an optimizer failure.
:param use_hessian: If True, uses flamedisx's exact Hessian
in the optimizer. Otherwise, most optimizers estimate it by finite-
difference calculations.
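
Example (sketch; assumes a free-rate source named 'wimp')::

    ul = lf.limit('wimp_rate_multiplier', confidence_level=0.9)
    lo, hi = lf.limit('wimp_rate_multiplier', kind='central')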
"""
if optimizer_kwargs is None:
optimizer_kwargs = dict()
if bounds is None:
bounds = dict()
if bestfit is None:
# Determine global bestfit
if optimizer == 'nlin':
# This optimizer is only for interval setting.
# Use scipy to get best-fit first
bestfit = self.bestfit(fix=fix, optimizer='scipy')
else:
bestfit = self.bestfit(fix=fix, optimizer=optimizer)
lower_bound = None
if parameter.endswith('rate_multiplier'):
lower_bound = fd.LOWER_RATE_MULTIPLIER_BOUND
# Set (bound, critical_quantile) for the desired kind of limit
if kind == 'upper':
requested_limits = [
dict(bound=(bestfit[parameter], None),
crit=confidence_level,
direction=1,
guess=guess)]
elif kind == 'lower':
requested_limits = [
dict(bound=(lower_bound, bestfit[parameter]),
crit=1 - confidence_level,
direction=-1,
guess=guess)]
elif kind == 'central':
if guess is None:
guess = (None, None)
elif not isinstance(guess, tuple) or not len(guess) == 2:
raise ValueError("Guess for central interval must be a 2-tuple")
requested_limits = [
dict(bound=(lower_bound, bestfit[parameter]),
crit=(1 - confidence_level) / 2,
direction=-1,
guess=guess[0]),
dict(bound=(bestfit[parameter], None),
direction=+1,
crit=1 - (1 - confidence_level) / 2,
guess=guess[1])]
else:
raise ValueError(f"kind must be upper/lower/central but is {kind}")
result = []
for req in requested_limits:
opt = fd.SUPPORTED_INTERVAL_OPTIMIZERS[optimizer]
res = opt(
# To generic objective
lf=self,
guess=req['guess'],
fix=fix,
bounds={
**self.default_bounds,
parameter: req['bound'],
**bounds},
# TODO: nan_val
get_lowlevel_result=get_lowlevel_result,
get_history=get_history,
use_hessian=use_hessian,
optimizer_kwargs=optimizer_kwargs,
allow_failure=allow_failure,
# To IntervalObjective
target_parameter=parameter,
bestfit=bestfit,
direction=req['direction'],
critical_quantile=req['crit'],
tilt_overshoot=tilt_overshoot,
sigma_guess=sigma_guess,
t_ppf=t_ppf,
t_ppf_grad=t_ppf_grad,
t_ppf_hess=t_ppf_hess,
).minimize()
if get_lowlevel_result or get_history:
result.append(res)
else:
result.append(res[parameter])
if len(result) == 1:
return result[0]
return result
def inverse_hessian(self, params, omit_grads=tuple()):
"""Return inverse hessian (square tensor)
of -2 log_likelihood at params
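
Example (sketch)::

    fit = lf.bestfit()
    cov = lf.inverse_hessian(fit)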
"""
# Also Tensorflow has tf.hessians, but:
# https://github.com/tensorflow/tensorflow/issues/29781
# Get second order derivatives of likelihood at params
_, _, grad2_ll = self.log_likelihood(**params,
omit_grads=omit_grads,
second_order=True)
return np.linalg.inv(-2 * grad2_ll)
def summary(self, bestfit=None, fix=None, guess=None,
inverse_hessian=None, precision=3):
"""Print summary information about best fit"""
if fix is None:
fix = dict()
if bestfit is None:
bestfit = self.bestfit(guess=guess, fix=fix)
params = {**bestfit, **fix}
if inverse_hessian is None:
inverse_hessian = self.inverse_hessian(
params,
omit_grads=tuple(fix.keys()))
stderr, cov = cov_to_std(inverse_hessian)
var_par_i = 0
for i, pname in enumerate(self.param_names):
if pname in fix:
print("{pname}: {x:.{precision}g} (fixed)".format(
pname=pname, x=fix[pname], precision=precision))
else:
template = "{pname}: {x:.{precision}g} +- {xerr:.{precision}g}"
print(template.format(
pname=pname,
x=bestfit[pname],
xerr=stderr[var_par_i],
precision=precision))
var_par_i += 1
var_pars = [x for x in self.param_names if x not in fix]
df = pd.DataFrame(
{p1: {p2: cov[i1, i2]
for i2, p2 in enumerate(var_pars)}
for i1, p1 in enumerate(var_pars)},
columns=var_pars)
# Get rows in the correct order
df['index'] = [var_pars.index(x)
for x in df.index.values]
df = df.sort_values(by='index')
del df['index']
print("Correlation matrix:")
pd.set_option('display.precision', precision)
print(df)
pd.reset_option('display.precision')
@export
def cov_to_std(cov):
"""Return (std errors, correlation coefficent matrix)
given covariance matrix cov
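
Example (sketch)::

    std_errs, corr = cov_to_std(lf.inverse_hessian(lf.bestfit()))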
"""
std_errs = np.diag(cov) ** 0.5
corr = cov * np.outer(1 / std_errs, 1 / std_errs)
return std_errs, corr