"""Miscellaneous functions related to model import, independent of any specific
model format"""
from typing import (
Dict, Union, Optional, Callable, Sequence, Tuple, Iterable, Any
)
import sympy as sp
import enum
import itertools as itt
from toposort import toposort
from sympy.logic.boolalg import BooleanAtom
from sympy.functions.elementary.piecewise import ExprCondPair
SymbolDef = Dict[sp.Symbol, Union[Dict[str, sp.Expr], sp.Expr]]
[docs]def noise_distribution_to_cost_function(
noise_distribution: Union[str, Callable]
) -> Callable[[str], str]:
"""
Parse noise distribution string to a cost function definition amici can
work with.
The noise distributions listed in the following are supported. :math:`m`
denotes the measurement, :math:`y` the simulation, and :math:`\\sigma` a
distribution scale parameter
(currently, AMICI only supports a single distribution parameter).
- `'normal'`, `'lin-normal'`: A normal distribution:
.. math::
\\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma}\\
exp\\left(-\\frac{(m-y)^2}{2\\sigma^2}\\right)
- `'log-normal'`: A log-normal distribution (i.e. log(m) is
normally distributed):
.. math::
\\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma m}\\
exp\\left(-\\frac{(\\log m - \\log y)^2}{2\\sigma^2}\\right)
- `'log10-normal'`: A log10-normal distribution (i.e. log10(m) is
normally distributed):
.. math::
\\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma m \\log(10)}\\
exp\\left(-\\frac{(\\log_{10} m - \\log_{10} y)^2}{2\\sigma^2}\\right)
- `'laplace'`, `'lin-laplace'`: A laplace distribution:
.. math::
\\pi(m|y,\\sigma) = \\frac{1}{2\\sigma}
\\exp\\left(-\\frac{|m-y|}{\\sigma}\\right)
- `'log-laplace'`: A log-Laplace distribution (i.e. log(m) is Laplace
distributed):
.. math::
\\pi(m|y,\\sigma) = \\frac{1}{2\\sigma m}
\\exp\\left(-\\frac{|\\log m - \\log y|}{\\sigma}\\right)
- `'log10-laplace'`: A log10-Laplace distribution (i.e. log10(m) is
Laplace distributed):
.. math::
\\pi(m|y,\\sigma) = \\frac{1}{2\\sigma m \\log(10)}
\\exp\\left(-\\frac{|\\log_{10} m - \\log_{10} y|}{\\sigma}\\right)
- `'binomial'`, `'lin-binomial'`: A (continuation of a discrete) binomial
distribution, parameterized via the success probability
:math:`p=\\sigma`:
.. math::
\\pi(m|y,\\sigma) = \\operatorname{Heaviside}(y-m) \\cdot
\\frac{\\Gamma(y+1)}{\\Gamma(m+1) \\Gamma(y-m+1)}
\\sigma^m (1-\\sigma)^{(y-m)}
- `'negative-binomial'`, `'lin-negative-binomial'`: A (continuation of a
discrete) negative binomial distribution, with with `mean = y`,
parameterized via success probability `p`:
.. math::
\\pi(m|y,\\sigma) = \\frac{\\Gamma(m+r)}{\\Gamma(m+1) \\Gamma(r)}
(1-\\sigma)^m \\sigma^r
where
.. math::
r = \\frac{1-\\sigma}{\\sigma} y
The distributions above are for a single data point.
For a collection :math:`D=\\{m_i\\}_i` of data points and corresponding
simulations :math:`Y=\\{y_i\\}_i` and noise parameters
:math:`\\Sigma=\\{\\sigma_i\\}_i`, AMICI assumes independence,
i.e. the full distributions is
.. math::
\\pi(D|Y,\\Sigma) = \\prod_i\\pi(m_i|y_i,\\sigma_i)
AMICI uses the logarithm :math:`\\log(\\pi(m|y,\\sigma)`.
In addition to the above mentioned distributions, it is also possible to
pass a function taking a symbol string and returning a log-distribution
string with variables '{str_symbol}', 'm{str_symbol}', 'sigma{str_symbol}'
for y, m, sigma, respectively.
:param noise_distribution: An identifier specifying a noise model.
Possible values are
{`'normal'`, `'lin-normal'`, `'log-normal'`, `'log10-normal'`,
`'laplace'`, `'lin-laplace'`, `'log-laplace'`, `'log10-laplace'`,
`'binomial'`, `'lin-binomial'`, `'negative-binomial'`,
`'lin-negative-binomial'`, `<Callable>`}
For the meaning of the values see above.
:return: A function that takes a strSymbol and then creates a cost
function string (negative log-likelihood) from it, which can be
sympified.
"""
if isinstance(noise_distribution, Callable):
return noise_distribution
if noise_distribution in ['normal', 'lin-normal']:
y_string = '0.5*log(2*pi*{sigma}**2) + 0.5*(({y} - {m}) / {sigma})**2'
elif noise_distribution == 'log-normal':
y_string = '0.5*log(2*pi*{sigma}**2*{m}**2) ' \
'+ 0.5*((log({y}) - log({m})) / {sigma})**2'
elif noise_distribution == 'log10-normal':
y_string = '0.5*log(2*pi*{sigma}**2*{m}**2*log(10)**2) ' \
'+ 0.5*((log({y}, 10) - log({m}, 10)) / {sigma})**2'
elif noise_distribution in ['laplace', 'lin-laplace']:
y_string = 'log(2*{sigma}) + Abs({y} - {m}) / {sigma}'
elif noise_distribution == 'log-laplace':
y_string = 'log(2*{sigma}*{m}) + Abs(log({y}) - log({m})) / {sigma}'
elif noise_distribution == 'log10-laplace':
y_string = 'log(2*{sigma}*{m}*log(10)) ' \
'+ Abs(log({y}, 10) - log({m}, 10)) / {sigma}'
elif noise_distribution in ['binomial', 'lin-binomial']:
# Binomial noise model parameterized via success probability p
y_string = '- log(Heaviside({y} - {m})) - loggamma({y}+1) ' \
'+ loggamma({m}+1) + loggamma({y}-{m}+1) ' \
'- {m} * log({sigma}) - ({y} - {m}) * log(1-{sigma})'
elif noise_distribution in ['negative-binomial', 'lin-negative-binomial']:
# Negative binomial noise model of the number of successes m
# (data) before r=(1-sigma)/sigma * y failures occur,
# with mean number of successes y (simulation),
# parameterized via success probability p = sigma.
r = '{y} * (1-{sigma}) / {sigma}'
y_string = f'- loggamma({{m}}+{r}) + loggamma({{m}}+1) ' \
f'+ loggamma({r}) - {r} * log(1-{{sigma}}) ' \
f'- {{m}} * log({{sigma}})'
else:
raise ValueError(
f"Cost identifier {noise_distribution} not recognized.")
def nllh_y_string(str_symbol):
y, m, sigma = _get_str_symbol_identifiers(str_symbol)
return y_string.format(y=y, m=m, sigma=sigma)
return nllh_y_string
def _get_str_symbol_identifiers(str_symbol: str) -> tuple:
"""Get identifiers for simulation, measurement, and sigma."""
y, m, sigma = f"{str_symbol}", f"m{str_symbol}", f"sigma{str_symbol}"
return y, m, sigma
[docs]def smart_subs_dict(sym: sp.Expr,
subs: SymbolDef,
field: Optional[str] = None,
reverse: bool = True) -> sp.Expr:
"""
Subsitutes expressions completely flattening them out. Requires
sorting of expressions with toposort.
:param sym:
Symbolic expression in which expressions will be substituted
:param subs:
Substitutions
:param field:
Field of substitution expressions in subs.values(), if applicable
:param reverse:
Whether ordering in subs should be reversed. Note that substitution
requires the reverse order of what is required for evaluation.
:return:
Substituted symbolic expression
"""
s = [
(eid, expr[field] if field is not None else expr)
for eid, expr in subs.items()
]
if reverse:
s.reverse()
for substitution in s:
# note that substitution may change free symbols, so we have to do
# this recursively
if substitution[0] in sym.free_symbols:
sym = sym.subs(*substitution)
return sym
[docs]def smart_subs(element: sp.Expr, old: sp.Symbol, new: sp.Expr) -> sp.Expr:
"""
Optimized substitution that checks whether anything needs to be done first
:param element:
substitution target
:param old:
to be substituted
:param new:
subsitution value
:return:
substituted expression
"""
if old in element.free_symbols:
return element.subs(old, new)
return element
[docs]def toposort_symbols(symbols: SymbolDef,
field: Optional[str] = None) -> SymbolDef:
"""
Topologically sort symbol definitions according to their interdependency
:param symbols:
symbol definitions
:param field:
field of definition.values() that is used to compute interdependency
:return:
ordered symbol definitions
"""
sorted_symbols = toposort({
identifier: {
s for s in (
definition[field] if field is not None else definition
).free_symbols
if s in symbols
}
for identifier, definition
in symbols.items()
})
return {
s: symbols[s]
for symbol_group in sorted_symbols
for s in symbol_group
}
def _parse_special_functions(sym: sp.Expr, toplevel: bool = True) -> sp.Expr:
"""
Recursively checks the symbolic expression for functions which have be
to parsed in a special way, such as piecewise functions
:param sym:
symbolic expressions
:param toplevel:
as this is called recursively, are we in the top level expression?
"""
args = tuple(arg if arg.__class__.__name__ == 'piecewise'
and sym.__class__.__name__ == 'piecewise'
else _parse_special_functions(arg, False)
for arg in sym.args)
fun_mappings = {
'times': sp.Mul,
'xor': sp.Xor,
'abs': sp.Abs,
'min': sp.Min,
'max': sp.Max,
'ceil': sp.functions.ceiling,
'floor': sp.functions.floor,
'factorial': sp.functions.factorial,
'arcsin': sp.functions.asin,
'arccos': sp.functions.acos,
'arctan': sp.functions.atan,
'arccot': sp.functions.acot,
'arcsec': sp.functions.asec,
'arccsc': sp.functions.acsc,
'arcsinh': sp.functions.asinh,
'arccosh': sp.functions.acosh,
'arctanh': sp.functions.atanh,
'arccoth': sp.functions.acoth,
'arcsech': sp.functions.asech,
'arccsch': sp.functions.acsch,
}
if sym.__class__.__name__ in fun_mappings:
# c++ doesnt like mixing int and double for arguments of those
# functions
if sym.__class__.__name__ in ['min', 'max']:
args = tuple(sp.Float(arg) if arg.is_number else arg
for arg in args)
return fun_mappings[sym.__class__.__name__](*args)
elif sym.__class__.__name__ == 'piecewise' \
or isinstance(sym, sp.Piecewise):
if isinstance(sym, sp.Piecewise):
# this is sympy piecewise, can't be nested
denested_args = args
else:
# this is sbml piecewise, can be nested
denested_args = _denest_piecewise(args)
return _parse_piecewise_to_heaviside(denested_args)
if sym.__class__.__name__ == 'plus' and not sym.args:
return sp.Float(0.0)
if isinstance(sym, (sp.Function, sp.Mul, sp.Add, sp.Pow)):
sym._args = args
elif toplevel and isinstance(sym, BooleanAtom):
# Replace boolean constants by numbers so they can be differentiated
# must not replace in Piecewise function. Therefore, we only replace
# it the complete expression consists only of a Boolean value.
sym = sp.Float(int(bool(sym)))
return sym
def _denest_piecewise(
args: Sequence[Union[sp.Expr, sp.logic.boolalg.Boolean, bool]]
) -> Tuple[Union[sp.Expr, sp.logic.boolalg.Boolean, bool]]:
"""
Denest piecewise functions that contain piecewise as condition
:param args:
Arguments to the piecewise function
:return:
Arguments where conditions no longer contain piecewise functions and
the conditional dependency is flattened out
"""
args_out = []
for coeff, cond in grouper(args, 2, True):
# handling of this case is explicitely disabled in
# _parse_special_functions as keeping track of coeff/cond
# arguments is tricky. Simpler to just parse them out here
if coeff.__class__.__name__ == 'piecewise':
coeff = _parse_special_functions(coeff, False)
# we can have conditions that are piecewise function
# returning True or False
if cond.__class__.__name__ == 'piecewise':
# this keeps track of conditional that the previous
# piece was picked
previous_was_picked = sp.false
# recursively denest those first
for sub_coeff, sub_cond in grouper(
_denest_piecewise(cond.args), 2, True
):
# flatten the individual pieces
pick_this = sp.And(
sp.Not(previous_was_picked), sub_cond
)
if sub_coeff == sp.true:
args_out.extend([coeff, pick_this])
previous_was_picked = pick_this
else:
args_out.extend([coeff, cond])
# cut off last condition as that's the default
return tuple(args_out[:-1])
def _parse_piecewise_to_heaviside(args: Iterable[sp.Expr]) -> sp.Expr:
"""
Piecewise functions cannot be transformed into C++ right away, but AMICI
has a special interface for Heaviside functions, so we transform them.
:param args:
symbolic expressions for arguments of the piecewise function
"""
# how many condition-expression pairs will we have?
formula = sp.Float(0.0)
not_condition = sp.Float(1.0)
if all(isinstance(arg, ExprCondPair) for arg in args):
# sympy piecewise
grouped_args = args
else:
# smbl piecewise
grouped_args = grouper(args, 2, True)
for coeff, trigger in grouped_args:
if isinstance(coeff, BooleanAtom):
coeff = sp.Float(int(bool(coeff)))
if trigger == sp.true:
return formula + coeff * not_condition
if trigger == sp.false:
continue
tmp = _parse_heaviside_trigger(trigger)
formula += coeff * sp.simplify(not_condition * tmp)
not_condition *= (1-tmp)
return formula
def _parse_heaviside_trigger(trigger: sp.Expr) -> sp.Expr:
"""
Recursively translates a boolean trigger function into a real valued
root function
:param trigger:
:return: real valued root function expression
"""
if trigger.is_Relational:
root = trigger.args[0] - trigger.args[1]
_check_unsupported_functions(root, 'sympy.Expression')
# normalize such that we always implement <,
# this ensures that we can correctly evaluate the condition if
# simulation starts at H(0). This is achieved by translating
# conditionals into Heaviside functions H that is implemented as unit
# step with H(0) = 1
if isinstance(trigger, sp.core.relational.StrictLessThan):
# x < y => x - y < 0 => r < 0
return 1 - sp.Heaviside(root)
if isinstance(trigger, sp.core.relational.LessThan):
# x <= y => not(y < x) => not(y - x < 0) => not -r < 0
return sp.Heaviside(-root)
if isinstance(trigger, sp.core.relational.StrictGreaterThan):
# y > x => y - x < 0 => -r < 0
return 1 - sp.Heaviside(-root)
if isinstance(trigger, sp.core.relational.GreaterThan):
# y >= x => not(x < y) => not(x - y < 0) => not r < 0
return sp.Heaviside(root)
# or(x,y) = not(and(not(x),not(y))
if isinstance(trigger, sp.Or):
return 1-sp.Mul(*[1-_parse_heaviside_trigger(arg)
for arg in trigger.args])
if isinstance(trigger, sp.And):
return sp.Mul(*[_parse_heaviside_trigger(arg)
for arg in trigger.args])
raise RuntimeError(
'AMICI can not parse piecewise/event trigger functions with argument '
f'{trigger}.'
)
[docs]def grouper(iterable: Iterable, n: int,
fillvalue: Any = None) -> Iterable[Tuple[Any]]:
"""
Collect data into fixed-length chunks or blocks
grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
:param iterable:
any iterable
:param n:
chunk length
:param fillvalue:
padding for last chunk if length < n
:return: itertools.zip_longest of requested chunks
"""
args = [iter(iterable)] * n
return itt.zip_longest(*args, fillvalue=fillvalue)
def _check_unsupported_functions(sym: sp.Expr,
expression_type: str,
full_sym: Optional[sp.Expr] = None):
"""
Recursively checks the symbolic expression for unsupported symbolic
functions
:param sym:
symbolic expressions
:param expression_type:
type of expression, only used when throwing errors
:param full sym:
outermost symbolic expression in recursive checks, only used for errors
"""
if full_sym is None:
full_sym = sym
# note that sp.functions.factorial, sp.functions.ceiling,
# sp.functions.floor applied to numbers should be simplified out and
# thus pass this test
unsupported_functions = (
sp.functions.factorial, sp.functions.ceiling, sp.functions.floor,
sp.functions.sec, sp.functions.csc, sp.functions.cot,
sp.functions.asec, sp.functions.acsc, sp.functions.acot,
sp.functions.acsch, sp.functions.acoth,
sp.Mod, sp.core.function.UndefinedFunction
)
if isinstance(sym.func, unsupported_functions) \
or isinstance(sym, unsupported_functions):
raise RuntimeError(f'Encountered unsupported expression '
f'"{sym.func}" of type '
f'"{type(sym.func)}" as part of a '
f'{expression_type}: "{full_sym}"!')
for arg in list(sym.args):
_check_unsupported_functions(arg, expression_type)