Source code for amici.import_utils

"""Miscellaneous functions related to model import, independent of any specific
 model format"""

from typing import (
    Dict, Union, Optional, Callable, Sequence, Tuple, Iterable, Any
)
import sympy as sp
import enum
import itertools as itt

from toposort import toposort
from sympy.logic.boolalg import BooleanAtom
from sympy.functions.elementary.piecewise import ExprCondPair

SymbolDef = Dict[sp.Symbol, Union[Dict[str, sp.Expr], sp.Expr]]


[docs]class ObservableTransformation(str, enum.Enum): """ Different modes of observable transformation. """ LOG10 = 'log10' LOG = 'log' LIN = 'lin'
[docs]def noise_distribution_to_observable_transformation( noise_distribution: Union[str, Callable] ) -> ObservableTransformation: """ Parse noise distribution string and extract observable transformation :param noise_distribution: see :func:`noise_distribution_to_cost_function` :return: observable transformation """ if isinstance(noise_distribution, str): if noise_distribution.startswith('log-'): return ObservableTransformation.LOG if noise_distribution.startswith('log10-'): return ObservableTransformation.LOG10 return ObservableTransformation.LIN
[docs]def noise_distribution_to_cost_function( noise_distribution: Union[str, Callable] ) -> Callable[[str], str]: """ Parse noise distribution string to a cost function definition amici can work with. The noise distributions listed in the following are supported. :math:`m` denotes the measurement, :math:`y` the simulation, and :math:`\\sigma` a distribution scale parameter (currently, AMICI only supports a single distribution parameter). - `'normal'`, `'lin-normal'`: A normal distribution: .. math:: \\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma}\\ exp\\left(-\\frac{(m-y)^2}{2\\sigma^2}\\right) - `'log-normal'`: A log-normal distribution (i.e. log(m) is normally distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma m}\\ exp\\left(-\\frac{(\\log m - \\log y)^2}{2\\sigma^2}\\right) - `'log10-normal'`: A log10-normal distribution (i.e. log10(m) is normally distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma m \\log(10)}\\ exp\\left(-\\frac{(\\log_{10} m - \\log_{10} y)^2}{2\\sigma^2}\\right) - `'laplace'`, `'lin-laplace'`: A laplace distribution: .. math:: \\pi(m|y,\\sigma) = \\frac{1}{2\\sigma} \\exp\\left(-\\frac{|m-y|}{\\sigma}\\right) - `'log-laplace'`: A log-Laplace distribution (i.e. log(m) is Laplace distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{2\\sigma m} \\exp\\left(-\\frac{|\\log m - \\log y|}{\\sigma}\\right) - `'log10-laplace'`: A log10-Laplace distribution (i.e. log10(m) is Laplace distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{2\\sigma m \\log(10)} \\exp\\left(-\\frac{|\\log_{10} m - \\log_{10} y|}{\\sigma}\\right) - `'binomial'`, `'lin-binomial'`: A (continuation of a discrete) binomial distribution, parameterized via the success probability :math:`p=\\sigma`: .. math:: \\pi(m|y,\\sigma) = \\operatorname{Heaviside}(y-m) \\cdot \\frac{\\Gamma(y+1)}{\\Gamma(m+1) \\Gamma(y-m+1)} \\sigma^m (1-\\sigma)^{(y-m)} - `'negative-binomial'`, `'lin-negative-binomial'`: A (continuation of a discrete) negative binomial distribution, with with `mean = y`, parameterized via success probability `p`: .. math:: \\pi(m|y,\\sigma) = \\frac{\\Gamma(m+r)}{\\Gamma(m+1) \\Gamma(r)} (1-\\sigma)^m \\sigma^r where .. math:: r = \\frac{1-\\sigma}{\\sigma} y The distributions above are for a single data point. For a collection :math:`D=\\{m_i\\}_i` of data points and corresponding simulations :math:`Y=\\{y_i\\}_i` and noise parameters :math:`\\Sigma=\\{\\sigma_i\\}_i`, AMICI assumes independence, i.e. the full distributions is .. math:: \\pi(D|Y,\\Sigma) = \\prod_i\\pi(m_i|y_i,\\sigma_i) AMICI uses the logarithm :math:`\\log(\\pi(m|y,\\sigma)`. In addition to the above mentioned distributions, it is also possible to pass a function taking a symbol string and returning a log-distribution string with variables '{str_symbol}', 'm{str_symbol}', 'sigma{str_symbol}' for y, m, sigma, respectively. :param noise_distribution: An identifier specifying a noise model. Possible values are {`'normal'`, `'lin-normal'`, `'log-normal'`, `'log10-normal'`, `'laplace'`, `'lin-laplace'`, `'log-laplace'`, `'log10-laplace'`, `'binomial'`, `'lin-binomial'`, `'negative-binomial'`, `'lin-negative-binomial'`, `<Callable>`} For the meaning of the values see above. :return: A function that takes a strSymbol and then creates a cost function string (negative log-likelihood) from it, which can be sympified. """ if isinstance(noise_distribution, Callable): return noise_distribution if noise_distribution in ['normal', 'lin-normal']: y_string = '0.5*log(2*pi*{sigma}**2) + 0.5*(({y} - {m}) / {sigma})**2' elif noise_distribution == 'log-normal': y_string = '0.5*log(2*pi*{sigma}**2*{m}**2) ' \ '+ 0.5*((log({y}) - log({m})) / {sigma})**2' elif noise_distribution == 'log10-normal': y_string = '0.5*log(2*pi*{sigma}**2*{m}**2*log(10)**2) ' \ '+ 0.5*((log({y}, 10) - log({m}, 10)) / {sigma})**2' elif noise_distribution in ['laplace', 'lin-laplace']: y_string = 'log(2*{sigma}) + Abs({y} - {m}) / {sigma}' elif noise_distribution == 'log-laplace': y_string = 'log(2*{sigma}*{m}) + Abs(log({y}) - log({m})) / {sigma}' elif noise_distribution == 'log10-laplace': y_string = 'log(2*{sigma}*{m}*log(10)) ' \ '+ Abs(log({y}, 10) - log({m}, 10)) / {sigma}' elif noise_distribution in ['binomial', 'lin-binomial']: # Binomial noise model parameterized via success probability p y_string = '- log(Heaviside({y} - {m})) - loggamma({y}+1) ' \ '+ loggamma({m}+1) + loggamma({y}-{m}+1) ' \ '- {m} * log({sigma}) - ({y} - {m}) * log(1-{sigma})' elif noise_distribution in ['negative-binomial', 'lin-negative-binomial']: # Negative binomial noise model of the number of successes m # (data) before r=(1-sigma)/sigma * y failures occur, # with mean number of successes y (simulation), # parameterized via success probability p = sigma. r = '{y} * (1-{sigma}) / {sigma}' y_string = f'- loggamma({{m}}+{r}) + loggamma({{m}}+1) ' \ f'+ loggamma({r}) - {r} * log(1-{{sigma}}) ' \ f'- {{m}} * log({{sigma}})' else: raise ValueError( f"Cost identifier {noise_distribution} not recognized.") def nllh_y_string(str_symbol): y, m, sigma = _get_str_symbol_identifiers(str_symbol) return y_string.format(y=y, m=m, sigma=sigma) return nllh_y_string
def _get_str_symbol_identifiers(str_symbol: str) -> tuple: """Get identifiers for simulation, measurement, and sigma.""" y, m, sigma = f"{str_symbol}", f"m{str_symbol}", f"sigma{str_symbol}" return y, m, sigma
[docs]def smart_subs_dict(sym: sp.Expr, subs: SymbolDef, field: Optional[str] = None, reverse: bool = True) -> sp.Expr: """ Subsitutes expressions completely flattening them out. Requires sorting of expressions with toposort. :param sym: Symbolic expression in which expressions will be substituted :param subs: Substitutions :param field: Field of substitution expressions in subs.values(), if applicable :param reverse: Whether ordering in subs should be reversed. Note that substitution requires the reverse order of what is required for evaluation. :return: Substituted symbolic expression """ s = [ (eid, expr[field] if field is not None else expr) for eid, expr in subs.items() ] if reverse: s.reverse() for substitution in s: # note that substitution may change free symbols, so we have to do # this recursively if substitution[0] in sym.free_symbols: sym = sym.subs(*substitution) return sym
[docs]def smart_subs(element: sp.Expr, old: sp.Symbol, new: sp.Expr) -> sp.Expr: """ Optimized substitution that checks whether anything needs to be done first :param element: substitution target :param old: to be substituted :param new: subsitution value :return: substituted expression """ if old in element.free_symbols: return element.subs(old, new) return element
[docs]def toposort_symbols(symbols: SymbolDef, field: Optional[str] = None) -> SymbolDef: """ Topologically sort symbol definitions according to their interdependency :param symbols: symbol definitions :param field: field of definition.values() that is used to compute interdependency :return: ordered symbol definitions """ sorted_symbols = toposort({ identifier: { s for s in ( definition[field] if field is not None else definition ).free_symbols if s in symbols } for identifier, definition in symbols.items() }) return { s: symbols[s] for symbol_group in sorted_symbols for s in symbol_group }
def _parse_special_functions(sym: sp.Expr, toplevel: bool = True) -> sp.Expr: """ Recursively checks the symbolic expression for functions which have be to parsed in a special way, such as piecewise functions :param sym: symbolic expressions :param toplevel: as this is called recursively, are we in the top level expression? """ args = tuple(arg if arg.__class__.__name__ == 'piecewise' and sym.__class__.__name__ == 'piecewise' else _parse_special_functions(arg, False) for arg in sym.args) fun_mappings = { 'times': sp.Mul, 'xor': sp.Xor, 'abs': sp.Abs, 'min': sp.Min, 'max': sp.Max, 'ceil': sp.functions.ceiling, 'floor': sp.functions.floor, 'factorial': sp.functions.factorial, 'arcsin': sp.functions.asin, 'arccos': sp.functions.acos, 'arctan': sp.functions.atan, 'arccot': sp.functions.acot, 'arcsec': sp.functions.asec, 'arccsc': sp.functions.acsc, 'arcsinh': sp.functions.asinh, 'arccosh': sp.functions.acosh, 'arctanh': sp.functions.atanh, 'arccoth': sp.functions.acoth, 'arcsech': sp.functions.asech, 'arccsch': sp.functions.acsch, } if sym.__class__.__name__ in fun_mappings: # c++ doesnt like mixing int and double for arguments of those # functions if sym.__class__.__name__ in ['min', 'max']: args = tuple(sp.Float(arg) if arg.is_number else arg for arg in args) return fun_mappings[sym.__class__.__name__](*args) elif sym.__class__.__name__ == 'piecewise' \ or isinstance(sym, sp.Piecewise): if isinstance(sym, sp.Piecewise): # this is sympy piecewise, can't be nested denested_args = args else: # this is sbml piecewise, can be nested denested_args = _denest_piecewise(args) return _parse_piecewise_to_heaviside(denested_args) if sym.__class__.__name__ == 'plus' and not sym.args: return sp.Float(0.0) if isinstance(sym, (sp.Function, sp.Mul, sp.Add, sp.Pow)): sym._args = args elif toplevel and isinstance(sym, BooleanAtom): # Replace boolean constants by numbers so they can be differentiated # must not replace in Piecewise function. Therefore, we only replace # it the complete expression consists only of a Boolean value. sym = sp.Float(int(bool(sym))) return sym def _denest_piecewise( args: Sequence[Union[sp.Expr, sp.logic.boolalg.Boolean, bool]] ) -> Tuple[Union[sp.Expr, sp.logic.boolalg.Boolean, bool]]: """ Denest piecewise functions that contain piecewise as condition :param args: Arguments to the piecewise function :return: Arguments where conditions no longer contain piecewise functions and the conditional dependency is flattened out """ args_out = [] for coeff, cond in grouper(args, 2, True): # handling of this case is explicitely disabled in # _parse_special_functions as keeping track of coeff/cond # arguments is tricky. Simpler to just parse them out here if coeff.__class__.__name__ == 'piecewise': coeff = _parse_special_functions(coeff, False) # we can have conditions that are piecewise function # returning True or False if cond.__class__.__name__ == 'piecewise': # this keeps track of conditional that the previous # piece was picked previous_was_picked = sp.false # recursively denest those first for sub_coeff, sub_cond in grouper( _denest_piecewise(cond.args), 2, True ): # flatten the individual pieces pick_this = sp.And( sp.Not(previous_was_picked), sub_cond ) if sub_coeff == sp.true: args_out.extend([coeff, pick_this]) previous_was_picked = pick_this else: args_out.extend([coeff, cond]) # cut off last condition as that's the default return tuple(args_out[:-1]) def _parse_piecewise_to_heaviside(args: Iterable[sp.Expr]) -> sp.Expr: """ Piecewise functions cannot be transformed into C++ right away, but AMICI has a special interface for Heaviside functions, so we transform them. :param args: symbolic expressions for arguments of the piecewise function """ # how many condition-expression pairs will we have? formula = sp.Float(0.0) not_condition = sp.Float(1.0) if all(isinstance(arg, ExprCondPair) for arg in args): # sympy piecewise grouped_args = args else: # smbl piecewise grouped_args = grouper(args, 2, True) for coeff, trigger in grouped_args: if isinstance(coeff, BooleanAtom): coeff = sp.Float(int(bool(coeff))) if trigger == sp.true: return formula + coeff * not_condition if trigger == sp.false: continue tmp = _parse_heaviside_trigger(trigger) formula += coeff * sp.simplify(not_condition * tmp) not_condition *= (1-tmp) return formula def _parse_heaviside_trigger(trigger: sp.Expr) -> sp.Expr: """ Recursively translates a boolean trigger function into a real valued root function :param trigger: :return: real valued root function expression """ if trigger.is_Relational: root = trigger.args[0] - trigger.args[1] _check_unsupported_functions(root, 'sympy.Expression') # normalize such that we always implement <, # this ensures that we can correctly evaluate the condition if # simulation starts at H(0). This is achieved by translating # conditionals into Heaviside functions H that is implemented as unit # step with H(0) = 1 if isinstance(trigger, sp.core.relational.StrictLessThan): # x < y => x - y < 0 => r < 0 return 1 - sp.Heaviside(root) if isinstance(trigger, sp.core.relational.LessThan): # x <= y => not(y < x) => not(y - x < 0) => not -r < 0 return sp.Heaviside(-root) if isinstance(trigger, sp.core.relational.StrictGreaterThan): # y > x => y - x < 0 => -r < 0 return 1 - sp.Heaviside(-root) if isinstance(trigger, sp.core.relational.GreaterThan): # y >= x => not(x < y) => not(x - y < 0) => not r < 0 return sp.Heaviside(root) # or(x,y) = not(and(not(x),not(y)) if isinstance(trigger, sp.Or): return 1-sp.Mul(*[1-_parse_heaviside_trigger(arg) for arg in trigger.args]) if isinstance(trigger, sp.And): return sp.Mul(*[_parse_heaviside_trigger(arg) for arg in trigger.args]) raise RuntimeError( 'AMICI can not parse piecewise/event trigger functions with argument ' f'{trigger}.' )
[docs]def grouper(iterable: Iterable, n: int, fillvalue: Any = None) -> Iterable[Tuple[Any]]: """ Collect data into fixed-length chunks or blocks grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" :param iterable: any iterable :param n: chunk length :param fillvalue: padding for last chunk if length < n :return: itertools.zip_longest of requested chunks """ args = [iter(iterable)] * n return itt.zip_longest(*args, fillvalue=fillvalue)
def _check_unsupported_functions(sym: sp.Expr, expression_type: str, full_sym: Optional[sp.Expr] = None): """ Recursively checks the symbolic expression for unsupported symbolic functions :param sym: symbolic expressions :param expression_type: type of expression, only used when throwing errors :param full sym: outermost symbolic expression in recursive checks, only used for errors """ if full_sym is None: full_sym = sym # note that sp.functions.factorial, sp.functions.ceiling, # sp.functions.floor applied to numbers should be simplified out and # thus pass this test unsupported_functions = ( sp.functions.factorial, sp.functions.ceiling, sp.functions.floor, sp.functions.sec, sp.functions.csc, sp.functions.cot, sp.functions.asec, sp.functions.acsc, sp.functions.acot, sp.functions.acsch, sp.functions.acoth, sp.Mod, sp.core.function.UndefinedFunction ) if isinstance(sym.func, unsupported_functions) \ or isinstance(sym, unsupported_functions): raise RuntimeError(f'Encountered unsupported expression ' f'"{sym.func}" of type ' f'"{type(sym.func)}" as part of a ' f'{expression_type}: "{full_sym}"!') for arg in list(sym.args): _check_unsupported_functions(arg, expression_type)