"""
C++ Export
----------
This module provides all necessary functionality specify an ODE model and
generate executable C++ simulation code. The user generally won't have to
directly call any function from this module as this will be done by
:py:func:`amici.pysb_import.pysb2amici`,
:py:func:`amici.sbml_import.SbmlImporter.sbml2amici` and
:py:func:`amici.petab_import.import_model`
"""
import sympy as sp
import numpy as np
import re
import shutil
import subprocess
import sys
import os
import copy
import numbers
import logging
import itertools
import contextlib
try:
import pysb
except ImportError:
pysb = None
from typing import (
Callable, Optional, Union, List, Dict, Tuple, SupportsFloat, Sequence,
Set, Any
)
from string import Template
import sympy.printing.cxxcode as cxxcode
from sympy.matrices.immutable import ImmutableDenseMatrix
from sympy.matrices.dense import MutableDenseMatrix
from sympy.logic.boolalg import BooleanAtom
from itertools import chain
from . import (
amiciSwigPath, amiciSrcPath, amiciModulePath, __version__, __commit__,
sbml_import
)
from .logging import get_logger, log_execution_time, set_log_level
from .constants import SymbolId
# Template for model simulation main.cpp file
CXX_MAIN_TEMPLATE_FILE = os.path.join(amiciSrcPath, 'main.template.cpp')
# Template for model/swig/CMakeLists.txt
SWIG_CMAKE_TEMPLATE_FILE = os.path.join(amiciSwigPath,
'CMakeLists_model.cmake')
# Template for model/CMakeLists.txt
MODEL_CMAKE_TEMPLATE_FILE = os.path.join(amiciSrcPath,
'CMakeLists.template.cmake')
# prototype for generated C++ functions, keys are the names of functions
#
# signature: defines the argument part of the function signature,
# input variables
# should have a const flag
#
# assume_pow_positivity: identifies the functions on which
# assume_pow_positivity will have an effect when specified during model
# generation. generally these are functions that are used for solving the
# ODE, where negative values may negatively affect convergence of the
# integration algorithm
#
# sparse: specifies whether the result of this function will be stored in
# sparse format. sparse format means that the function will only return an
# array of nonzero values and not a full matrix.
functions = {
'Jy': {
'signature':
'(realtype *Jy, const int iy, const realtype *p, '
'const realtype *k, const realtype *y, const realtype *sigmay, '
'const realtype *my)',
},
'dJydsigmay': {
'signature':
'(realtype *dJydsigmay, const int iy, const realtype *p, '
'const realtype *k, const realtype *y, const realtype *sigmay, '
'const realtype *my)',
},
'dJydy': {
'signature':
'(realtype *dJydy, const int iy, const realtype *p, '
'const realtype *k, const realtype *y, '
'const realtype *sigmay, const realtype *my)',
'flags': ['sparse']
},
'dwdp': {
'signature':
'(realtype *dwdp, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, const realtype *h, '
'const realtype *w, const realtype *tcl, const realtype *dtcldp)',
'flags': ['assume_pow_positivity', 'sparse']
},
'dwdx': {
'signature':
'(realtype *dwdx, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, const realtype *h, '
'const realtype *w, const realtype *tcl)',
'flags': ['assume_pow_positivity', 'sparse']
},
'dwdw': {
'signature':
'(realtype *dwdw, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, const realtype *h, '
'const realtype *w, const realtype *tcl)',
'flags': ['assume_pow_positivity', 'sparse']
},
'dxdotdw': {
'signature':
'(realtype *dxdotdw, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, const realtype *h, '
'const realtype *w)',
'flags': ['assume_pow_positivity', 'sparse']
},
'dxdotdx_explicit': {
'signature':
'(realtype *dxdotdx_explicit, const realtype t, '
'const realtype *x, const realtype *p, const realtype *k, '
'const realtype *h, const realtype *w)',
'flags': ['assume_pow_positivity', 'sparse']
},
'dxdotdp_explicit': {
'signature':
'(realtype *dxdotdp_explicit, const realtype t, '
'const realtype *x, const realtype *p, const realtype *k, '
'const realtype *h, const realtype *w)',
'flags': ['assume_pow_positivity', 'sparse']
},
'dydx': {
'signature':
'(realtype *dydx, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, const realtype *h, '
'const realtype *w, const realtype *dwdx)',
},
'dydp': {
'signature':
'(realtype *dydp, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, const realtype *h, '
'const int ip, const realtype *w, const realtype *dtcldp)',
},
'dsigmaydp': {
'signature':
'(realtype *dsigmaydp, const realtype t, const realtype *p, '
'const realtype *k, const int ip)',
},
'sigmay': {
'signature':
'(realtype *sigmay, const realtype t, const realtype *p, '
'const realtype *k)',
},
'w': {
'signature':
'(realtype *w, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, '
'const realtype *h, const realtype *tcl)',
'flags': ['assume_pow_positivity']
},
'x0': {
'signature':
'(realtype *x0, const realtype t, const realtype *p, '
'const realtype *k)',
},
'x0_fixedParameters': {
'signature':
'(realtype *x0_fixedParameters, const realtype t, '
'const realtype *p, const realtype *k)',
},
'sx0': {
'signature':
'(realtype *sx0, const realtype t,const realtype *x, '
'const realtype *p, const realtype *k, const int ip)',
},
'sx0_fixedParameters': {
'signature':
'(realtype *sx0_fixedParameters, const realtype t, '
'const realtype *x0, const realtype *p, const realtype *k, '
'const int ip)',
},
'xdot': {
'signature':
'(realtype *xdot, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, const realtype *h, '
'const realtype *w)',
'flags': ['assume_pow_positivity']
},
'y': {
'signature':
'(realtype *y, const realtype t, const realtype *x, '
'const realtype *p, const realtype *k, '
'const realtype *h, const realtype *w)',
},
'x_rdata': {
'signature':
'(realtype *x_rdata, const realtype *x, const realtype *tcl)',
},
'total_cl': {
'signature':
'(realtype *total_cl, const realtype *x_rdata)',
},
'x_solver': {
'signature':
'(realtype *x_solver, const realtype *x_rdata)',
},
}
# list of sparse functions
sparse_functions = [
function for function in functions
if 'sparse' in functions[function].get('flags', [])
]
# list of nobody functions
nobody_functions = [
function for function in functions
if 'dont_generate_body' in functions[function].get('flags', [])
]
# list of sensitivity functions
sensi_functions = [
function for function in functions
if 'const int ip' in functions[function]['signature']
and function != 'sxdot'
]
# list of multiobs functions
multiobs_functions = [
function for function in functions
if 'const int iy' in functions[function]['signature']
]
# list of equations that have ids which may not be unique
non_unique_id_symbols = [
'x_rdata', 'y'
]
# custom c++ function replacements
CUSTOM_FUNCTIONS = [
{'sympy': 'polygamma',
'c++': 'boost::math::polygamma',
'include': '#include <boost/math/special_functions/polygamma.hpp>',
'build_hint': 'Using polygamma requires libboost-math header files.'
},
{'sympy': 'Heaviside',
'c++': 'amici::heaviside'},
{'sympy': 'DiracDelta',
'c++': 'amici::dirac'}
]
# python log manager
logger = get_logger(__name__, logging.ERROR)
[docs]def var_in_function_signature(name: str, varname: str) -> bool:
"""
Checks if the values for a symbolic variable is passed in the signature
of a function
:param name:
name of the function
:param varname:
name of the symbolic variable
:return:
boolean indicating whether the variable occurs in the function
signature
"""
return name in functions \
and re.search(
rf'const (realtype|double) \*{varname}[0]*[,)]+',
functions[name]['signature']
)
[docs]class ModelQuantity:
"""
Base class for model components
"""
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: Union[SupportsFloat, numbers.Number, sp.Expr]):
"""
Create a new ModelQuantity instance.
:param identifier:
unique identifier of the quantity
:param name:
individual name of the quantity (does not need to be unique)
:param value:
either formula, numeric value or initial value
"""
if not isinstance(identifier, sp.Symbol):
raise TypeError(f'identifier must be sympy.Symbol, was '
f'{type(identifier)}')
self._identifier: sp.Symbol = identifier
if not isinstance(name, str):
raise TypeError(f'name must be str, was {type(name)}')
self._name: str = name
self._value: sp.Expr = cast_to_sym(value, 'value')
def __repr__(self) -> str:
"""
Representation of the ModelQuantity object
:return:
string representation of the ModelQuantity
"""
return str(self._identifier)
[docs] def get_id(self) -> sp.Symbol:
"""
ModelQuantity identifier
:return:
identifier of the ModelQuantity
"""
return self._identifier
[docs] def get_name(self) -> str:
"""
ModelQuantity name
:return:
name of the ModelQuantity
"""
return self._name
[docs] def get_val(self) -> sp.Expr:
"""
ModelQuantity value
:return:
value of the ModelQuantity
"""
return self._value
[docs]class State(ModelQuantity):
"""
A State variable defines an entity that evolves with time according to
the provided time derivative, abbreviated by `x`
:ivar _conservation_law:
algebraic formula that allows computation of this
state according to a conservation law
:ivar _dt:
algebraic formula that defines the temporal derivative of this state
"""
_dt: Union[sp.Expr, None] = None
_conservation_law: Union[sp.Expr, None] = None
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
init: sp.Expr,
dt: sp.Expr):
"""
Create a new State instance. Extends :meth:`ModelQuantity.__init__`
by dt
:param identifier:
unique identifier of the state
:param name:
individual name of the state (does not need to be unique)
:param init:
initial value
:param dt:
time derivative
"""
super(State, self).__init__(identifier, name, init)
self._dt = cast_to_sym(dt, 'dt')
self._conservation_law = None
[docs] def set_conservation_law(self,
law: sp.Expr) -> None:
"""
Sets the conservation law of a state. If the a conservation law
is set, the respective state will be replaced by an algebraic
formula according to the respective conservation law.
:param law:
linear sum of states that if added to this state remain
constant over time
"""
if not isinstance(law, sp.Expr):
raise TypeError(f'conservation law must have type sympy.Expr, '
f'was {type(law)}')
self._conservation_law = law
[docs] def set_dt(self,
dt: sp.Expr) -> None:
"""
Sets the time derivative
:param dt:
time derivative
"""
self._dt = cast_to_sym(dt, 'dt')
[docs] def get_dt(self) -> sp.Expr:
"""
Gets the time derivative
:return:
time derivative
"""
return self._dt
[docs] def get_free_symbols(self) -> Set[sp.Symbol]:
"""
Gets the set of free symbols in time derivative and initial conditions
:return:
free symbols
"""
return self._dt.free_symbols.union(self._value.free_symbols)
[docs]class ConservationLaw(ModelQuantity):
"""
A conservation law defines the absolute the total amount of a
(weighted) sum of states
"""
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: sp.Expr):
"""
Create a new ConservationLaw instance.
:param identifier:
unique identifier of the ConservationLaw
:param name:
individual name of the ConservationLaw (does not need to be
unique)
:param value: formula (sum of states)
"""
super(ConservationLaw, self).__init__(identifier, name, value)
[docs]class Observable(ModelQuantity):
"""
An Observable links model simulations to experimental measurements,
abbreviated by `y`
:ivar _measurement_symbol:
sympy symbol used in the objective function to represent
measurements to this observable
"""
_measurement_symbol: Union[sp.Symbol, None] = None
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: sp.Expr,
measurement_symbol: Optional[sp.Symbol] = None):
"""
Create a new Observable instance.
:param identifier:
unique identifier of the Observable
:param name:
individual name of the Observable (does not need to be unique)
:param value:
formula
"""
super(Observable, self).__init__(identifier, name, value)
self._measurement_symbol = measurement_symbol
[docs] def get_measurement_symbol(self) -> sp.Symbol:
if self._measurement_symbol is None:
self._measurement_symbol = generate_measurement_symbol(
self.get_id()
)
return self._measurement_symbol
[docs]class SigmaY(ModelQuantity):
"""
A Standard Deviation SigmaY rescales the distance between simulations
and measurements when computing residuals or objective functions,
abbreviated by `sigmay`
"""
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: sp.Expr):
"""
Create a new Standard Deviation instance.
:param identifier:
unique identifier of the Standard Deviation
:param name:
individual name of the Standard Deviation (does not need to
be unique)
:param value:
formula
"""
super(SigmaY, self).__init__(identifier, name, value)
[docs]class Expression(ModelQuantity):
"""
An Expressions is a recurring elements in symbolic formulas. Specifying
this may yield more compact expression which may lead to substantially
shorter model compilation times, but may also reduce model simulation time,
abbreviated by `w`
"""
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: sp.Expr):
"""
Create a new Expression instance.
:param identifier:
unique identifier of the Expression
:param name:
individual name of the Expression (does not need to be unique)
:param value:
formula
"""
super(Expression, self).__init__(identifier, name, value)
[docs]class Parameter(ModelQuantity):
"""
A Parameter is a free variable in the model with respect to which
sensitivities may be computed, abbreviated by `p`
"""
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: numbers.Number):
"""
Create a new Expression instance.
:param identifier:
unique identifier of the Parameter
:param name:
individual name of the Parameter (does not need to be
unique)
:param value:
numeric value
"""
super(Parameter, self).__init__(identifier, name, value)
[docs]class Constant(ModelQuantity):
"""
A Constant is a fixed variable in the model with respect to which
sensitivities cannot be computed, abbreviated by `k`
"""
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: numbers.Number):
"""
Create a new Expression instance.
:param identifier:
unique identifier of the Constant
:param name:
individual name of the Constant (does not need to be unique)
:param value:
numeric value
"""
super(Constant, self).__init__(identifier, name, value)
[docs]class LogLikelihood(ModelQuantity):
"""
A LogLikelihood defines the distance between measurements and
experiments for a particular observable. The final LogLikelihood value
in the simulation will be the sum of all specified LogLikelihood
instances evaluated at all timepoints, abbreviated by `Jy`
"""
[docs] def __init__(self,
identifier: sp.Symbol,
name: str,
value: sp.Expr):
"""
Create a new Expression instance.
:param identifier:
unique identifier of the LogLikelihood
:param name:
individual name of the LogLikelihood (does not need to be
unique)
:param value:
formula
"""
super(LogLikelihood, self).__init__(identifier, name, value)
# defines the type of some attributes in ODEModel
symbol_to_type = {
SymbolId.SPECIES: State,
SymbolId.PARAMETER: Parameter,
SymbolId.FIXED_PARAMETER: Constant,
SymbolId.OBSERVABLE: Observable,
SymbolId.SIGMAY: SigmaY,
SymbolId.LLHY: LogLikelihood,
SymbolId.EXPRESSION: Expression,
}
[docs]@log_execution_time('running smart_jacobian', logger)
def smart_jacobian(eq: sp.MutableDenseMatrix,
sym_var: sp.MutableDenseMatrix) -> sp.MutableDenseMatrix:
"""
Wrapper around symbolic jacobian with some additional checks that reduce
computation time for large matrices
:param eq:
equation
:param sym_var:
differentiation variable
:return:
jacobian of eq wrt sym_var
"""
if min(eq.shape) and min(sym_var.shape) \
and not smart_is_zero_matrix(eq) \
and not smart_is_zero_matrix(sym_var) \
and not sym_var.free_symbols.isdisjoint(eq.free_symbols):
return eq.jacobian(sym_var)
return sp.zeros(eq.shape[0], sym_var.shape[0])
[docs]@log_execution_time('running smart_multiply', logger)
def smart_multiply(x: Union[sp.MutableDenseMatrix, sp.MutableSparseMatrix],
y: sp.MutableDenseMatrix
) -> Union[sp.MutableDenseMatrix, sp.MutableSparseMatrix]:
"""
Wrapper around symbolic multiplication with some additional checks that
reduce computation time for large matrices
:param x:
educt 1
:param y:
educt 2
:return:
product
"""
if not x.shape[0] or not y.shape[1] or smart_is_zero_matrix(x) or \
smart_is_zero_matrix(y):
return sp.zeros(x.shape[0], y.shape[1])
return x.multiply(y)
[docs]def smart_is_zero_matrix(x: Union[sp.MutableDenseMatrix,
sp.MutableSparseMatrix]) -> bool:
"""A faster implementation of sympy's is_zero_matrix
Avoids repeated indexer type checks and double iteration to distinguish
False/None. Found to be about 100x faster for large matrices.
:param x: Matrix to check
"""
if isinstance(x, sp.MutableDenseMatrix):
nonzero = any(xx.is_zero is not True for xx in x._mat)
else:
nonzero = x.nnz() > 0
return not nonzero
[docs]class ODEModel:
"""
Defines an Ordinary Differential Equation as set of ModelQuantities.
This class provides general purpose interfaces to ompute arbitrary
symbolic derivatives that are necessary for model simulation or
sensitivity computation
:ivar _states:
list of state variables
:ivar _observables:
list of observables
:ivar _sigmays:
list of sigmays
:ivar _parameters:
list of parameters
:ivar _loglikelihoods:
list of loglikelihoods
:ivar _expressions:
list of expressions instances
:ivar _conservationlaws:
list of conservation laws
:ivar _symboldim_funs:
define functions that compute model dimensions, these
are functions as the underlying symbolic expressions have not been
populated at compile time
:ivar _eqs:
carries symbolic formulas of the symbolic variables of the model
:ivar _sparseeqs:
carries linear list of all symbolic formulas for sparsified
variables
:ivar _vals:
carries numeric values of symbolic identifiers of the symbolic
variables of the model
:ivar _names:
carries names of symbolic identifiers of the symbolic variables
of the model
:ivar _syms:
carries symbolic identifiers of the symbolic variables of the
model
:ivar _strippedsyms:
carries symbolic identifiers that were stripped of additional class
information
:ivar _sparsesyms:
carries linear list of all symbolic identifiers for sparsified
variables
:ivar _colptrs:
carries column pointers for sparsified variables. See
SUNMatrixContent_Sparse definition in <sunmatrix/sunmatrix_sparse.h>
:ivar _rowvals:
carries row values for sparsified variables. See
SUNMatrixContent_Sparse definition in <sunmatrix/sunmatrix_sparse.h>
:ivar _equation_prototype:
defines the attribute from which an equation should be generated via
list comprehension (see :meth:`ODEModel._generate_equation`)
:ivar _variable_prototype:
defines the attribute from which a variable should be generated via
list comprehension (see :meth:`ODEModel._generate_symbol`)
:ivar _value_prototype:
defines the attribute from which a value should be generated via
list comprehension (see :meth:`ODEModel._generate_value`)
:ivar _total_derivative_prototypes:
defines how a total derivative equation is computed for an equation,
key defines the name and values should be arguments for
ODEModel.totalDerivative()
:ivar _lock_total_derivative:
add chainvariables to this set when computing total derivative from
a partial derivative call to enforce a partial derivative in the
next recursion. prevents infinite recursion
:ivar _simplify:
If not None, this function will be used to simplify symbolic
derivative expressions. Receives sympy expressions as only argument.
To apply multiple simplifications, wrap them in a lambda expression.
:ivar _x0_fixedParameters_idx:
Index list of subset of states for which x0_fixedParameters was
computed
:ivar _w_recursion_depth:
recursion depth in w, quantified as nilpotency of dwdw
:ivar _has_quadratic_nllh:
whether all observables have a gaussian noise model, i.e. whether
res and FIM make sense.
"""
[docs] def __init__(self, verbose: Optional[Union[bool, int]] = False,
simplify: Optional[Callable] = sp.powsimp):
"""
Create a new ODEModel instance.
:param verbose:
verbosity level for logging, True/False default to
``logging.DEBUG``/``logging.ERROR``
:param simplify:
see :meth:`ODEModel._simplify`
"""
self._states: List[State] = []
self._observables: List[Observable] = []
self._sigmays: List[SigmaY] = []
self._parameters: List[Parameter] = []
self._constants: List[Constant] = []
self._loglikelihoods: List[LogLikelihood] = []
self._expressions: List[Expression] = []
self._conservationlaws: List[ConservationLaw] = []
self._symboldim_funs: Dict[str, Callable[[], int]] = {
'sx': self.num_states_solver,
'v': self.num_states_solver,
'vB': self.num_states_solver,
'xB': self.num_states_solver,
'sigmay': self.num_obs,
}
self._eqs: Dict[str, sp.Matrix] = dict()
self._sparseeqs: Dict[str, Union[sp.Matrix, List[sp.Matrix]]] = dict()
self._vals: Dict[str, List[float]] = dict()
self._names: Dict[str, List[str]] = dict()
self._syms: Dict[str, Union[sp.Matrix, List[sp.Matrix]]] = dict()
self._strippedsyms: Dict[str, sp.Matrix] = dict()
self._sparsesyms: Dict[str, Union[List[str], List[List[str]]]] = dict()
self._colptrs: Dict[str, Union[List[int], List[List[int]]]] = dict()
self._rowvals: Dict[str, Union[List[int], List[List[int]]]] = dict()
self._equation_prototype: Dict[str, str] = {
'total_cl': '_conservationlaws',
'x0': '_states',
'y': '_observables',
'Jy': '_loglikelihoods',
'w': '_expressions',
'sigmay': '_sigmays',
}
self._variable_prototype: Dict[str, str] = {
'tcl': '_conservationlaws',
'x_rdata': '_states',
'y': '_observables',
'p': '_parameters',
'k': '_constants',
'w': '_expressions',
'sigmay': '_sigmays'
}
self._value_prototype: Dict[str, str] = {
'p': '_parameters',
'k': '_constants',
}
self._total_derivative_prototypes: \
Dict[str, Dict[str, Union[str, List[str]]]] = {
'sx_rdata': {
'eq': 'x_rdata',
'chainvars': ['x'],
'var': 'p',
'dxdz_name': 'sx',
},
}
self._lock_total_derivative: List[str] = list()
self._simplify: Callable = simplify
self._x0_fixedParameters_idx: Union[None, Sequence[int]]
self._w_recursion_depth: int = 0
self._has_quadratic_nllh: bool = True
set_log_level(logger, verbose)
[docs] @log_execution_time('importing SbmlImporter', logger)
def import_from_sbml_importer(self,
si: 'sbml_import.SbmlImporter',
compute_cls: Optional[bool] = True) -> None:
"""
Imports a model specification from a
:class:`amici.sbml_import.SbmlImporter`
instance.
:param si:
imported SBML model
"""
# get symbolic expression from SBML importers
symbols = copy.copy(si.symbols)
nexpr = len(symbols[SymbolId.EXPRESSION])
# assemble fluxes and add them as expressions to the model
fluxes = []
for ir, flux in enumerate(si.flux_vector):
flux_id = generate_flux_symbol(ir)
fluxes.append(flux_id)
nr = len(fluxes)
# correct time derivatives for compartment changes
dxdotdw_updates = []
def transform_dxdt_to_concentration(specie_id, dxdt):
"""
Produces the appropriate expression for the first derivative of a
species with respect to time, for species that reside in
compartments with a constant volume, or a volume that is defined by
an assignment or rate rule.
:param specie_id:
The identifier of the species (generated in "sbml_import.py").
:param dxdt:
The element-wise product of the row in the stoichiometric
matrix that corresponds to the species (row x_index) and the
flux (kinetic laws) vector. Ignored in the case of rate rules.
"""
# The derivation of the below return expressions can be found in
# the documentation. They are found by rearranging
# $\frac{d}{dt} (vx) = Sw$ for $\frac{dx}{dt}$, where $v$ is the
# vector of species compartment volumes, $x$ is the vector of
# species concentrations, $S$ is the stoichiometric matrix, and $w$
# is the flux vector. The conditional below handles the cases of
# species in (i) compartments with a rate rule, (ii) compartments
# with an assignment rule, and (iii) compartments with a constant
# volume, respectively.
specie = si.symbols[SymbolId.SPECIES][specie_id]
comp = specie['compartment']
x_index = specie['index']
if comp in si.symbols[SymbolId.SPECIES]:
dv_dt = si.symbols[SymbolId.SPECIES][comp]['dt']
xdot = (dxdt - dv_dt * specie_id) / comp
dxdotdw_updates.extend(
(x_index, w_index, xdot.diff(r_flux))
for w_index, r_flux in enumerate(fluxes)
)
return xdot
elif comp in si.compartment_assignment_rules:
v = si.compartment_assignment_rules[comp]
# we need to flatten out assignments in the compartment in
# order to ensure that we catch all species dependencies
v = smart_subs_dict(v, si.symbols[SymbolId.EXPRESSION],
'value')
dv_dt = v.diff(si.amici_time_symbol)
# we may end up with a time derivative of the compartment
# volume due to parameter rate rules
comp_rate_vars = [p for p in v.free_symbols
if p in si.symbols[SymbolId.SPECIES]]
for var in comp_rate_vars:
dv_dt += \
v.diff(var) * si.symbols[SymbolId.SPECIES][var]['dt']
dv_dx = v.diff(specie_id)
xdot = (dxdt - dv_dt * specie_id) / (dv_dx * specie_id + v)
dxdotdw_updates.extend(
(x_index, w_index, xdot.diff(r_flux))
for w_index, r_flux in enumerate(fluxes)
)
return xdot
else:
v = si.compartments[comp]
if v == 1.0:
return dxdt
dxdotdw_updates.extend(
(x_index, w_index,
si.stoichiometric_matrix[x_index, w_index] / v)
for w_index in range(si.stoichiometric_matrix.shape[1])
if si.stoichiometric_matrix[x_index, w_index] != 0
)
return dxdt / v
# create dynamics without respecting conservation laws first
dxdt = smart_multiply(si.stoichiometric_matrix,
MutableDenseMatrix(fluxes))
for ix, ((specie_id, specie), formula) in enumerate(zip(
symbols[SymbolId.SPECIES].items(),
dxdt
)):
assert ix == specie['index'] # check that no reordering occurred
# rate rules and amount species don't need to be updated
if 'dt' in specie:
continue
if specie['amount']:
specie['dt'] = formula
else:
specie['dt'] = transform_dxdt_to_concentration(specie_id,
formula)
# create all basic components of the ODE model and add them.
for symbol_name in symbols:
# transform dict of lists into a list of dicts
args = ['name', 'identifier']
if symbol_name == SymbolId.SPECIES:
args += ['dt', 'init']
else:
args += ['value']
protos = [
{
'identifier': var_id,
**{k: v for k, v in var.items() if k in args}
}
for var_id, var in symbols[symbol_name].items()
]
for proto in protos:
self.add_component(symbol_to_type[symbol_name](**proto))
# add fluxes as expressions, this needs to happen after base
# expressions from symbols have been parsed
for flux_id, flux in zip(fluxes, si.flux_vector):
self.add_component(Expression(
identifier=flux_id,
name=str(flux_id),
value=flux
))
# process conservation laws
if compute_cls:
dxdotdw_updates = si.process_conservation_laws(self,
dxdotdw_updates)
nx_solver = si.stoichiometric_matrix.shape[0]
nw = len(self._expressions)
ncl = nw - nr - nexpr
# set derivatives of xdot, if applicable. We do this as we can save
# a substantial amount of computations by exploiting the structure
# of the right hand side.
# the tricky part is that the expressions w do not only contain the
# flux entries, but also assignment rules and conservation laws.
# assignment rules are added before the fluxes and
# _process_conservation_laws is called after the fluxes,
# but conservation law expressions are inserted at the beginning
# of the self.eq['w']. Accordingly we concatenate a zero matrix (for
# rule assignments and conservation laws) with the stoichiometric
# matrix and then apply the necessary updates from
# transform_dxdt_to_concentration
if not any(s in [e.get_id() for e in self._expressions]
for s in si.stoichiometric_matrix.free_symbols):
self._eqs['dxdotdw'] = sp.zeros(nx_solver, ncl + nexpr).row_join(
si.stoichiometric_matrix
)
for ix, iw, val in dxdotdw_updates:
# offset update according to concatenated zero matrix
self._eqs['dxdotdw'][ix, ncl + nexpr + iw] = val
# fill in 'self._sym' based on prototypes and components in ode_model
self.generate_basic_variables(from_sbml=True)
self._has_quadratic_nllh = all(
llh['dist'] in ['normal', 'lin-normal']
for llh in si.symbols[SymbolId.LLHY].values()
)
[docs] def add_component(self, component: ModelQuantity,
insert_first: Optional[bool] = False) -> None:
"""
Adds a new ModelQuantity to the model.
:param component:
model quantity to be added
:param insert_first:
whether to add quantity first or last, relevant when components
may refer to other components of the same type.
"""
for comp_type in [Observable, Expression, Parameter, Constant, State,
LogLikelihood, SigmaY, ConservationLaw]:
if isinstance(component, comp_type):
component_list = getattr(
self, f'_{type(component).__name__.lower()}s'
)
if insert_first:
component_list.insert(0, component)
else:
component_list.append(component)
return
raise ValueError(f'Invalid component type {type(component)}')
[docs] def add_conservation_law(self,
state: sp.Symbol,
total_abundance: sp.Symbol,
state_expr: sp.Expr,
abundance_expr: sp.Expr) -> None:
"""
Adds a new conservation law to the model. A conservation law is defined
by the conserved quantity T = sum_i(a_i * x_i), where a_i are
coefficients and x_i are different state variables.
:param state:
symbolic identifier of the state that should be replaced by
the conservation law (x_j)
:param total_abundance:
symbolic identifier of the total abundance (T/a_j)
:param state_expr:
symbolic algebraic formula that replaces the the state. This is
used to compute the numeric value of of `state` during simulations.
x_j = T/a_j - sum_i≠j(a_i * x_i)/a_j
:param abundance_expr:
symbolic algebraic formula that computes the value of the
conserved quantity. This is used to update the numeric value for
`total_abundance` after (re-)initialization.
T/a_j = sum_i≠j(a_i * x_i)/a_j + x_j
"""
try:
ix = [
s.get_id()
for s in self._states
].index(state)
except ValueError:
raise ValueError(f'Specified state {state} was not found in the '
f'model states.')
state_id = self._states[ix].get_id()
self.add_component(
Expression(state_id, str(state_id), state_expr),
insert_first=True
)
self.add_component(
ConservationLaw(
total_abundance,
f'total_{state_id}',
abundance_expr
)
)
self._states[ix].set_conservation_law(state_expr)
[docs] def num_states_rdata(self) -> int:
"""
Number of states.
:return:
number of state variable symbols
"""
return len(self.sym('x_rdata'))
[docs] def num_states_solver(self) -> int:
"""
Number of states after applying conservation laws.
:return:
number of state variable symbols
"""
return len(self.sym('x'))
[docs] def num_cons_law(self) -> int:
"""
Number of conservation laws.
:return:
number of conservation laws
"""
return self.num_states_rdata() - self.num_states_solver()
[docs] def num_state_reinits(self) -> int:
"""
Number of solver states which would be reinitialized after
preequilibration
:return:
number of state variable symbols with reinitialization
"""
reinit_states = self.eq('x0_fixedParameters')
solver_states = self.eq('x_solver')
return sum([1 for ix in reinit_states if ix in solver_states])
[docs] def num_obs(self) -> int:
"""
Number of Observables.
:return:
number of observable symbols
"""
return len(self.sym('y'))
[docs] def num_const(self) -> int:
"""
Number of Constants.
:return:
number of constant symbols
"""
return len(self.sym('k'))
[docs] def num_par(self) -> int:
"""
Number of Parameters.
:return:
number of parameter symbols
"""
return len(self.sym('p'))
[docs] def num_expr(self) -> int:
"""
Number of Expressions.
:return:
number of expression symbols
"""
return len(self.sym('w'))
[docs] def sym(self,
name: str,
stripped: Optional[bool] = False) -> sp.Matrix:
"""
Returns (and constructs if necessary) the identifiers for a symbolic
entity.
:param name:
name of the symbolic variable
:param stripped:
should additional class information be stripped from the
symbolic variables (default=False)
:return:
matrix of symbolic identifiers
"""
if name not in self._syms:
self._generate_symbol(name)
if stripped and name in self._variable_prototype:
return self._strippedsyms[name]
else:
return self._syms[name]
[docs] def sparsesym(self, name: str) -> List[str]:
"""
Returns (and constructs if necessary) the sparsified identifiers for
a sparsified symbolic variable.
:param name:
name of the symbolic variable
:return:
linearized Matrix containing the symbolic identifiers
"""
if name not in sparse_functions:
raise ValueError(f'{name} is not marked as sparse')
if name not in self._sparsesyms:
self._generate_sparse_symbol(name)
return self._sparsesyms[name]
[docs] def eq(self, name: str) -> sp.Matrix:
"""
Returns (and constructs if necessary) the formulas for a symbolic
entity.
:param name:
name of the symbolic variable
:return:
matrix of symbolic formulas
"""
if name not in self._eqs:
dec = log_execution_time(f'computing {name}', logger)
dec(self._compute_equation)(name)
return self._eqs[name]
[docs] def sparseeq(self, name) -> sp.Matrix:
"""
Returns (and constructs if necessary) the sparsified formulas for a
sparsified symbolic variable.
:param name:
name of the symbolic variable
:return:
linearized matrix containing the symbolic formulas
"""
if name not in sparse_functions:
raise ValueError(f'{name} is not marked as sparse')
if name not in self._sparseeqs:
self._generate_sparse_symbol(name)
return self._sparseeqs[name]
[docs] def colptrs(self, name: str) -> Union[List[sp.Number],
List[List[sp.Number]]]:
"""
Returns (and constructs if necessary) the column pointers for
a sparsified symbolic variable.
:param name:
name of the symbolic variable
:return:
list containing the column pointers
"""
if name not in sparse_functions:
raise ValueError(f'{name} is not marked as sparse')
if name not in self._sparseeqs:
self._generate_sparse_symbol(name)
return self._colptrs[name]
[docs] def rowvals(self, name: str) -> Union[List[sp.Number],
List[List[sp.Number]]]:
"""
Returns (and constructs if necessary) the row values for a
sparsified symbolic variable.
:param name:
name of the symbolic variable
:return:
list containing the row values
"""
if name not in sparse_functions:
raise ValueError(f'{name} is not marked as sparse')
if name not in self._sparseeqs:
self._generate_sparse_symbol(name)
return self._rowvals[name]
[docs] def val(self, name: str) -> List[float]:
"""
Returns (and constructs if necessary) the numeric values of a
symbolic entity
:param name:
name of the symbolic variable
:return:
list containing the numeric values
"""
if name not in self._vals:
self._generate_value(name)
return self._vals[name]
[docs] def name(self, name: str) -> List[str]:
"""
Returns (and constructs if necessary) the names of a symbolic
variable
:param name:
name of the symbolic variable
:return:
list of names
"""
if name not in self._names:
self._generate_name(name)
return self._names[name]
[docs] def free_symbols(self) -> Set[sp.Basic]:
"""
Returns list of free symbols that appear in ODE rhs and initial
conditions.
"""
return set(chain.from_iterable(
state.get_free_symbols()
for state in self._states
))
def _generate_symbol(self, name: str, *, from_sbml: bool = False) -> None:
"""
Generates the symbolic identifiers for a symbolic variable
:param name:
name of the symbolic variable
"""
if name in self._variable_prototype:
component = self._variable_prototype[name]
self._syms[name] = sp.Matrix([
comp.get_id()
for comp in getattr(self, component)
])
# this gives us access to the "stripped" symbols that were
# generated by pysb (if compiling a pysb model). To ensure
# correctness of derivatives, the same assumptions as in pysb
# have to be used (currently no assumptions)
# NB if we are compiling a SBML model,
# then it will be the same as the "non-stripped"
# in order to preserve assumptions
if from_sbml:
self._strippedsyms[name] = self._syms[name]
else:
self._strippedsyms[name] = sp.Matrix([
sp.Symbol(comp.get_name())
for comp in getattr(self, component)
])
if name == 'y':
self._syms['my'] = sp.Matrix([
comp.get_measurement_symbol()
for comp in getattr(self, component)
])
return
elif name == 'x':
self._syms[name] = sp.Matrix([
state.get_id()
for state in self._states
if state._conservation_law is None
])
return
elif name == 'sx0':
self._syms[name] = sp.Matrix([
f's{state.get_id()}_0'
for state in self._states
if state._conservation_law is None
])
return
elif name == 'dtcldp':
# check, whether the CL consists of only one state. Then,
# sensitivities drop out, otherwise generate symbols
self._syms[name] = sp.Matrix([
[sp.Symbol(f's{strip_pysb(tcl.get_id())}__'
f'{strip_pysb(par.get_id())}', real=True)
for par in self._parameters]
if self.conservation_law_has_multispecies(tcl)
else [0] * self.num_par()
for tcl in self._conservationlaws
])
return
elif name in sparse_functions:
self._generate_sparse_symbol(name)
return
elif name in self._symboldim_funs:
length = self._symboldim_funs[name]()
elif name in sensi_functions:
length = self.eq(name).shape[0]
else:
length = len(self.eq(name))
self._syms[name] = sp.Matrix([
sp.Symbol(f'{name}{i}', real=True) for i in range(length)
])
[docs] def generate_basic_variables(self, *, from_sbml: bool = False) -> None:
"""
Generates the symbolic identifiers for all variables in
ODEModel.variable_prototype
"""
for var in self._variable_prototype:
if var not in self._syms:
self._generate_symbol(var, from_sbml=from_sbml)
self._generate_symbol('x', from_sbml=from_sbml)
[docs] def get_appearance_counts(self, idxs: List[int]) -> List[int]:
"""
Counts how often a state appears in the time derivative of
another state and expressions for a subset of states
:param idxs:
list of state indices for which counts are to be computed
:return:
list of counts for the states ordered according to the provided
indices
"""
free_symbols_dt = list(itertools.chain.from_iterable(
[
str(symbol)
for symbol in state.get_dt().free_symbols
]
for state in self._states
))
free_symbols_expr = list(itertools.chain.from_iterable(
[
str(symbol)
for symbol in expr.get_val().free_symbols
]
for expr in self._expressions
))
return [
free_symbols_dt.count(str(self._states[idx].get_id()))
+
free_symbols_expr.count(str(self._states[idx].get_id()))
for idx in idxs
]
def _generate_sparse_symbol(self, name: str) -> None:
"""
Generates the sparse symbolic identifiers, symbolic identifiers,
sparse equations, column pointers and row values for a symbolic
variable
:param name:
name of the symbolic variable
"""
matrix = self.eq(name)
match_deriv = re.match(r'd([\w]+)d([a-z]+)', name)
if match_deriv:
rownames = self.sym(match_deriv.group(1))
colnames = self.sym(match_deriv.group(2))
if name == 'dJydy':
# One entry per y-slice
self._colptrs[name] = []
self._rowvals[name] = []
self._sparseeqs[name] = []
self._sparsesyms[name] = []
self._syms[name] = []
for iy in range(self.num_obs()):
symbol_col_ptrs, symbol_row_vals, sparse_list, symbol_list, \
sparse_matrix = csc_matrix(matrix[iy, :],
rownames=rownames,
colnames=colnames,
identifier=iy)
self._colptrs[name].append(symbol_col_ptrs)
self._rowvals[name].append(symbol_row_vals)
self._sparseeqs[name].append(sparse_list)
self._sparsesyms[name].append(symbol_list)
self._syms[name].append(sparse_matrix)
else:
symbol_col_ptrs, symbol_row_vals, sparse_list, symbol_list, \
sparse_matrix = csc_matrix(
matrix, rownames=rownames, colnames=colnames,
pattern_only=name in nobody_functions
)
self._colptrs[name] = symbol_col_ptrs
self._rowvals[name] = symbol_row_vals
self._sparseeqs[name] = sparse_list
self._sparsesyms[name] = symbol_list
self._syms[name] = sparse_matrix
def _compute_equation(self, name: str) -> None:
"""
computes the symbolic formula for a symbolic variable
:param name:
name of the symbolic variable
"""
match_deriv = re.match(r'd([\w_]+)d([a-z_]+)', name)
if name in self._equation_prototype:
self._equation_from_component(name, self._equation_prototype[name])
elif name in self._total_derivative_prototypes:
args = self._total_derivative_prototypes[name]
args['name'] = name
self._lock_total_derivative += args['chainvars']
self._total_derivative(**args)
for cv in args['chainvars']:
self._lock_total_derivative.remove(cv)
elif name == 'xdot':
self._eqs[name] = sp.Matrix([
s.get_dt() for s in self._states
if s._conservation_law is None
])
elif name == 'x_rdata':
self._eqs[name] = sp.Matrix([
state.get_id()
if state._conservation_law is None
else state._conservation_law
for state in self._states
])
elif name == 'x_solver':
self._eqs[name] = sp.Matrix([
state.get_id()
for state in self._states
if state._conservation_law is None
])
elif name == 'sx_solver':
self._eqs[name] = sp.Matrix([
self.sym('sx_rdata')[ix]
for ix, state in enumerate(self._states)
if state._conservation_law is None
])
elif name == 'sx0':
self._derivative(name[1:], 'p', name=name)
elif name == 'sx0_fixedParameters':
# deltax = -x+x0_fixedParameters if x0_fixedParameters>0 else 0
# deltasx = -sx+dx0_fixed_parametersdx*sx+dx0_fixedParametersdp
# if x0_fixedParameters>0 else 0
# sx0_fixedParameters = sx+deltasx =
# dx0_fixed_parametersdx*sx+dx0_fixedParametersdp
self._eqs[name] = smart_jacobian(
self.eq('x0_fixedParameters'), self.sym('p')
)
dx0_fixed_parametersdx = smart_jacobian(
self.eq('x0_fixedParameters'), self.sym('x')
)
if not smart_is_zero_matrix(dx0_fixed_parametersdx):
if isinstance(self._eqs[name], ImmutableDenseMatrix):
self._eqs[name] = MutableDenseMatrix(self._eqs[name])
for ip in range(self._eqs[name].shape[1]):
self._eqs[name][:, ip] += smart_multiply(
dx0_fixed_parametersdx, self.sym('sx0')
)
elif name == 'x0_fixedParameters':
k = self.sym('k')
self._x0_fixedParameters_idx = [
ix
for ix, eq in enumerate(self.eq('x0'))
if any([sym in eq.free_symbols for sym in k])
]
eq = self.eq('x0')
self._eqs[name] = sp.Matrix([eq[ix] for ix in
self._x0_fixedParameters_idx])
elif name == 'dtotal_cldx_rdata':
# not correctly parsed in regex
self._derivative('total_cl', 'x_rdata')
elif name == 'dtcldx':
# this is always zero
self._eqs[name] = sp.zeros(self.num_cons_law(), self.num_states_solver())
elif name == 'dtcldp':
# force symbols
self._eqs[name] = self.sym(name)
elif name == 'dxdotdx_explicit':
# force symbols
self._derivative('xdot', 'x', name=name)
elif name == 'dxdotdp_explicit':
# force symbols
self._derivative('xdot', 'p', name=name)
elif match_deriv:
self._derivative(match_deriv.group(1), match_deriv.group(2))
else:
raise ValueError(f'Unknown equation {name}')
if name in ['Jy', 'dydx']:
# do not transpose if we compute the partial derivative as part of
# a total derivative
if not len(self._lock_total_derivative):
self._eqs[name] = self._eqs[name].transpose()
if self._simplify:
dec = log_execution_time(f'simplifying {name}', logger)
self._eqs[name] = dec(self._eqs[name].applyfunc)(self._simplify)
[docs] def sym_names(self) -> List[str]:
"""
Returns a list of names of generated symbolic variables
:return:
list of names
"""
return list(self._syms.keys())
def _derivative(self, eq: str, var: str, name: str = None) -> None:
"""
Creates a new symbolic variable according to a derivative
:param eq:
name of the symbolic variable that defines the formula
:param var:
name of the symbolic variable that defines the identifiers
with respect to which the derivatives are to be computed
:param name:
name of resulting symbolic variable, default is d{eq}d{var}
"""
if not name:
name = f'd{eq}d{var}'
# automatically detect chainrule
chainvars = []
ignore_chainrule = {
('xdot', 'p'): 'w', # has generic implementation in c++ code
('xdot', 'x'): 'w', # has generic implementation in c++ code
('w', 'w'): 'tcl', # dtcldw = 0
('w', 'x'): 'tcl', # dtcldx = 0
}
for cv in ['w', 'tcl']:
if var_in_function_signature(eq, cv) \
and cv not in self._lock_total_derivative \
and var is not cv \
and min(self.sym(cv).shape) \
and (
(eq, var) not in ignore_chainrule
or ignore_chainrule[(eq, var)] != cv
):
chainvars.append(cv)
if len(chainvars):
self._lock_total_derivative += chainvars
self._total_derivative(name, eq, chainvars, var)
for cv in chainvars:
self._lock_total_derivative.remove(cv)
return
# this is the basic requirement check
needs_stripped_symbols = eq == 'xdot' and var != 'x'
# partial derivative
if eq == 'Jy':
sym_eq = self.eq(eq).transpose()
else:
sym_eq = self.eq(eq)
if pysb is not None and needs_stripped_symbols:
needs_stripped_symbols = not any(
isinstance(sym, pysb.Component)
for sym in sym_eq.free_symbols
)
# now check whether we are working with energy_modeling branch
# where pysb class info is already stripped
# TODO: fixme as soon as energy_modeling made it to the main pysb
# branch
sym_var = self.sym(var, needs_stripped_symbols)
derivative = smart_jacobian(sym_eq, sym_var)
self._eqs[name] = derivative
# compute recursion depth based on nilpotency of jacobian. computing
# nilpotency can be done more efficiently on numerical sparsity pattern
if name == 'dwdw':
nonzeros = np.asarray(
derivative.applyfunc(lambda x: int(not x.is_zero))
).astype(np.int64)
if max(nonzeros.shape):
while nonzeros.max():
nonzeros = nonzeros.dot(nonzeros)
self._w_recursion_depth += 1
if self._w_recursion_depth > len(sym_eq):
raise RuntimeError(
'dwdw is not nilpotent. Something, somewhere went '
'terribly wrong. Please file a bug report at '
'https://github.com/AMICI-dev/AMICI/issues and '
'attach this model.'
)
if name == 'dydw' and not smart_is_zero_matrix(derivative):
dwdw = self.eq('dwdw')
# h(k) = d{eq}dw*dwdw^k* (k=1)
h = smart_multiply(derivative, dwdw)
while not smart_is_zero_matrix(h):
self._eqs[name] += h
# h(k+1) = d{eq}dw*dwdw^(k+1) = h(k)*dwdw
h = smart_multiply(h, dwdw)
def _total_derivative(self, name: str, eq: str, chainvars: List[str],
var: str, dydx_name: str = None,
dxdz_name: str = None) -> None:
"""
Creates a new symbolic variable according to a total derivative
using the chain rule
:param name:
name of resulting symbolic variable
:param eq:
name of the symbolic variable that defines the formula
:param chainvars:
names of the symbolic variable that define the
identifiers with respect to which the chain rules are applied
:param var:
name of the symbolic variable that defines the identifiers
whith respect to which the derivatives are to be computed
:param dydx_name:
defines the name of the symbolic variable that
defines the derivative of the `eq` with respect to `chainvar`,
default is d{eq}d{chainvar}
:param dxdz_name:
defines the name of the symbolic variable that
defines the derivative of the `chainvar` with respect to `var`,
default is d{chainvar}d{var}
"""
# compute total derivative according to chainrule
# Dydz = dydx*dxdz + dydz
# initialize with partial derivative dydz without chain rule
self._eqs[name] = self.sym_or_eq(name, f'd{eq}d{var}')
if not isinstance(self._eqs[name], sp.Symbol):
# if not a Symbol, create a copy using sympy API
# NB deepcopy does not work safely, see sympy issue #7672
self._eqs[name] = self._eqs[name].copy()
for chainvar in chainvars:
if dydx_name is None:
dydx_name = f'd{eq}d{chainvar}'
if dxdz_name is None:
dxdz_name = f'd{chainvar}d{var}'
dydx = self.sym_or_eq(name, dydx_name)
dxdz = self.sym_or_eq(name, dxdz_name)
# Save time for for large models if one multiplicand is zero,
# which is not checked for by sympy
if not smart_is_zero_matrix(dydx) and not \
smart_is_zero_matrix(dxdz):
if dxdz.shape[1] == 1 and \
self._eqs[name].shape[1] != dxdz.shape[1]:
for iz in range(self._eqs[name].shape[1]):
self._eqs[name][:, iz] += smart_multiply(dydx, dxdz)
else:
self._eqs[name] += smart_multiply(dydx, dxdz)
[docs] def sym_or_eq(self, name: str, varname: str) -> sp.Matrix:
"""
Returns symbols or equations depending on whether a given
variable appears in the function signature or not.
:param name:
name of function for which the signature should be checked
:param varname:
name of the variable which should be contained in the
function signature
:return:
the variable symbols if the variable is part of the signature and
the variable equations otherwise.
"""
# dwdx and dwdp will be dynamically computed and their ordering
# within a column may differ from the initialization of symbols here,
# so those are not safe to use. Not removing them from signature as
# this would break backwards compatibility.
if var_in_function_signature(name, varname) \
and varname not in ['dwdx', 'dwdp']:
return self.sym(varname)
else:
return self.eq(varname)
def _multiplication(self, name: str, x: str, y: str,
transpose_x: Optional[bool] = False,
sign: Optional[int] = 1):
"""
Creates a new symbolic variable according to a multiplication
:param name:
name of resulting symbolic variable, default is d{eq}d{var}
:param x:
name of the symbolic variable that defines the first factor
:param y:
name of the symbolic variable that defines the second factor
:param transpose_x:
indicates whether the first factor should be
transposed before multiplication
:param sign:
defines the sign of the product, should be +1 or -1
"""
if sign not in [-1, 1]:
raise TypeError(f'sign must be +1 or -1, was {sign}')
variables = dict()
for varname in [x, y]:
if var_in_function_signature(name, varname):
variables[varname] = self.sym(varname)
else:
variables[varname] = self.eq(varname)
if transpose_x:
xx = variables[x].transpose()
else:
xx = variables[x]
yy = variables[y]
self._eqs[name] = sign * smart_multiply(xx, yy)
def _equation_from_component(self, name: str, component: str) -> None:
"""
Generates the formulas of a symbolic variable from the attributes
:param name:
name of resulting symbolic variable
:param component:
name of the attribute
"""
self._eqs[name] = sp.Matrix(
[comp.get_val() for comp in getattr(self, component)]
)
[docs] def get_conservation_laws(self) -> List[Tuple[sp.Symbol, sp.Basic]]:
""" Returns a list of states with conservation law set
:return:
list of state identifiers
"""
return [
(state.get_id(), state._conservation_law)
for state in self._states
if state._conservation_law is not None
]
def _generate_value(self, name: str) -> None:
"""
Generates the numeric values of a symbolic variable from value
prototypes
:param name:
name of resulting symbolic variable
"""
if name in self._value_prototype:
component = self._value_prototype[name]
else:
raise ValueError(f'No values for {name}')
self._vals[name] = [comp.get_val()
for comp in getattr(self, component)]
def _generate_name(self, name: str) -> None:
"""
Generates the names of a symbolic variable from variable prototypes or
equation prototypes
:param name:
name of resulting symbolic variable
"""
if name in self._variable_prototype:
component = self._variable_prototype[name]
elif name in self._equation_prototype:
component = self._equation_prototype[name]
else:
raise ValueError(f'No names for {name}')
self._names[name] = [comp.get_name()
for comp in getattr(self, component)]
[docs] def state_has_fixed_parameter_initial_condition(self, ix: int) -> bool:
"""
Checks whether the state at specified index has a fixed parameter
initial condition
:param ix:
state index
:return:
boolean indicating if any of the initial condition free
variables is contained in the model constants
"""
ic = self._states[ix].get_val()
if not isinstance(ic, sp.Basic):
return False
return any([
fp in [c.get_id() for c in self._constants]
for fp in ic.free_symbols
])
[docs] def state_has_conservation_law(self, ix: int) -> bool:
"""
Checks whether the state at specified index has a conservation
law set
:param ix:
state index
:return:
boolean indicating if conservation_law is not None
"""
return self._states[ix]._conservation_law is not None
[docs] def state_is_constant(self, ix: int) -> bool:
"""
Checks whether the temporal derivative of the state is zero
:param ix:
state index
:return:
boolean indicating if constant over time
"""
return self._states[ix].get_dt() == 0.0
[docs] def conservation_law_has_multispecies(self,
tcl: ConservationLaw) -> bool:
"""
Checks whether a conservation law has multiple species or it just
defines one constant species
:param tcl:
conservation law
:return:
boolean indicating if conservation_law is not None
"""
state_set = set(self.sym('x_rdata'))
n_species = len(state_set.intersection(tcl.get_val().free_symbols))
return n_species > 1
def _print_with_exception(math: sp.Expr) -> str:
"""
Generate C++ code for a symbolic expression
:param math:
symbolic expression
:return:
C++ code for the specified expression
"""
# get list of custom replacements
user_functions = {fun['sympy']: fun['c++'] for fun in CUSTOM_FUNCTIONS}
try:
ret = cxxcode(math, standard='c++11', user_functions=user_functions)
ret = re.sub(r'(^|\W)M_PI(\W|$)', r'\1amici::pi\2', ret)
return ret
except TypeError as e:
raise ValueError(
f'Encountered unsupported function in expression "{math}": '
f'{e}!'
)
def _get_sym_lines_array(equations: sp.Matrix,
variable: str,
indent_level: int) -> List[str]:
"""
Generate C++ code for assigning symbolic terms in symbols to C++ array
`variable`.
:param equations:
vectors of symbolic expressions
:param variable:
name of the C++ array to assign to
:param indent_level:
indentation level (number of leading blanks)
:return:
C++ code as list of lines
"""
return [' ' * indent_level + f'{variable}[{index}] = '
f'{_print_with_exception(math)};'
for index, math in enumerate(equations)
if not (math == 0 or math == 0.0)]
def _get_sym_lines_symbols(symbols: sp.Matrix,
equations: sp.Matrix,
variable: str,
indent_level: int) -> List[str]:
"""
Generate C++ code for where array elements are directly replaced with
their corresponding macro symbol
:param symbols:
vectors of symbols that equations are assigned to
:param equations:
vectors of expressions
:param variable:
name of the C++ array to assign to, only used in comments
:param indent_level:
indentation level (number of leading blanks)
:return:
C++ code as list of lines
"""
return [f'{" " * indent_level}{sym} = {_print_with_exception(math)};'
f' // {variable}[{index}]'.replace('\n',
'\n' + ' ' * indent_level)
for index, (sym, math) in enumerate(zip(symbols, equations))
if not (math == 0 or math == 0.0)]
[docs]class ODEExporter:
"""
The ODEExporter class generates AMICI C++ files for ODE model as
defined in symbolic expressions.
:ivar model:
ODE definition
:ivar outdir:
see :meth:`amici.ode_export.ODEExporter.set_paths`
:ivar verbose:
more verbose output if True
:ivar assume_pow_positivity:
if set to true, a special pow function is
used to avoid problems with state variables that may become negative
due to numerical errors
compiler: distutils/setuptools compiler selection to build the
python extension
:ivar functions:
carries C++ function signatures and other specifications
:ivar model_name:
name of the model that will be used for compilation
:ivar model_path:
path to the generated model specific files
:ivar model_swig_path:
path to the generated swig files
:ivar allow_reinit_fixpar_initcond:
indicates whether reinitialization of
initial states depending on fixedParameters is allowed for this model
:ivar _build_hints:
If the given model uses special functions, this set contains hints for
model building.
"""
[docs] def __init__(
self,
ode_model: ODEModel,
outdir: Optional[str] = None,
verbose: Optional[Union[bool, int]] = False,
assume_pow_positivity: Optional[bool] = False,
compiler: Optional[str] = None,
allow_reinit_fixpar_initcond: Optional[bool] = True
):
"""
Generate AMICI C++ files for the ODE provided to the constructor.
:param ode_model:
ODE definition
:param outdir:
see :meth:`amici.ode_export.ODEExporter.set_paths`
:param verbose:
verbosity level for logging, True/False default to
logging.Error/logging.DEBUG
:param assume_pow_positivity:
if set to true, a special pow function is
used to avoid problems with state variables that may become
negative due to numerical errors
:param compiler: distutils/setuptools compiler selection to build the
python extension
:param allow_reinit_fixpar_initcond:
see :class:`amici.ode_export.ODEExporter`
"""
set_log_level(logger, verbose)
self.outdir: str = outdir
self.verbose: bool = logger.getEffectiveLevel() <= logging.DEBUG
self.assume_pow_positivity: bool = assume_pow_positivity
self.compiler: str = compiler
self.model_name: str = 'model'
output_dir = os.path.join(os.getcwd(),
f'amici-{self.model_name}')
self.model_path: str = os.path.abspath(output_dir)
self.model_swig_path: str = os.path.join(self.model_path, 'swig')
# Signatures and properties of generated model functions (see
# include/amici/model.h for details)
self.model: ODEModel = ode_model
# To only generate a subset of functions, apply subselection here
self.functions: Dict[str, Dict[str, Union[str, List[str]]]] = \
copy.deepcopy(functions)
self.allow_reinit_fixpar_initcond: bool = allow_reinit_fixpar_initcond
self._build_hints = set()
[docs] @log_execution_time('generating cpp code', logger)
def generate_model_code(self) -> None:
"""
Generates the native C++ code for the loaded model and a Matlab
script that can be run to compile a mex file from the C++ code
"""
with _monkeypatched(sp.Pow, '_eval_derivative',
_custom_pow_eval_derivative):
self._prepare_model_folder()
self._generate_c_code()
self._generate_m_code()
[docs] @log_execution_time('compiling cpp code', logger)
def compile_model(self) -> None:
"""
Compiles the generated code it into a simulatable module
"""
self._compile_c_code(compiler=self.compiler,
verbose=self.verbose)
def _prepare_model_folder(self) -> None:
"""
Remove all files from the model folder.
"""
for file in os.listdir(self.model_path):
file_path = os.path.join(self.model_path, file)
if os.path.isfile(file_path):
os.remove(file_path)
def _generate_c_code(self) -> None:
"""
Create C++ code files for the model based on ODEExporter.model
"""
for function in self.functions.keys():
if 'dont_generate_body' not in \
self.functions[function].get('flags', []):
dec = log_execution_time(f'writing {function}.cpp', logger)
dec(self._write_function_file)(function)
if function in sparse_functions:
self._write_function_index(function, 'colptrs')
self._write_function_index(function, 'rowvals')
for name in self.model.sym_names():
self._write_index_files(name)
self._write_wrapfunctions_cpp()
self._write_wrapfunctions_header()
self._write_model_header_cpp()
self._write_c_make_file()
self._write_swig_files()
self._write_module_setup()
shutil.copy(CXX_MAIN_TEMPLATE_FILE,
os.path.join(self.model_path, 'main.cpp'))
def _compile_c_code(self,
verbose: Optional[Union[bool, int]] = False,
compiler: Optional[str] = None) -> None:
"""
Compile the generated model code
:param verbose:
Make model compilation verbose
:param compiler:
distutils/setuptools compiler selection to build the python
extension
"""
# setup.py assumes it is run from within the model directory
module_dir = self.model_path
script_args = [sys.executable, os.path.join(module_dir, 'setup.py')]
if verbose:
script_args.append('--verbose')
else:
script_args.append('--quiet')
script_args.extend(['build_ext', f'--build-lib={module_dir}'])
if compiler is not None:
script_args.extend([f'--compiler={compiler}'])
# distutils.core.run_setup looks nicer, but does not let us check the
# result easily
try:
result = subprocess.run(script_args,
cwd=module_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True)
except subprocess.CalledProcessError as e:
print(e.output.decode('utf-8'))
print("Failed building the model extension.")
if self._build_hints:
print("Note:")
print('\n'.join(self._build_hints))
raise
if verbose:
print(result.stdout.decode('utf-8'))
def _generate_m_code(self) -> None:
"""
Create a Matlab script for compiling code files to a mex file
"""
# creating the code lines for the Matlab compile script
lines = []
# Events are not yet implemented. Once this is done, the variable nz
# will have to be replaced by "self.model.nz()"
nz = 0
# Second order code is not yet implemented. Once this is done,
# those variables will have to be replaced by
# "self.model.<var>true()", or the corresponding "model.self.o2flag"
nxtrue_rdata = self.model.num_states_rdata()
nytrue = self.model.num_obs()
o2flag = 0
# a preliminary comment
lines.append('% This compile script was automatically created from'
' Python SBML import.')
lines.append('% If mex compiler is set up within MATLAB, it can be run'
' from MATLAB ')
lines.append('% in order to compile a mex-file from the Python'
' generated C++ files.')
lines.append('')
# write the actual compiling code
lines.append(f"modelName = '{self.model_name}';")
lines.append("amimodel.compileAndLinkModel"
"(modelName, '', [], [], [], []);")
lines.append(f"amimodel.generateMatlabWrapper({nxtrue_rdata}, "
f"{nytrue}, {self.model.num_par()}, {self.model.num_const()}, "
f"{nz}, {o2flag}, ...\n [], "
"['simulate_' modelName '.m'], modelName, ...\n"
" 'lin', 1, 1);")
# write compile script (for mex)
compile_script = os.path.join(self.model_path, 'compileMexFile.m')
with open(compile_script, 'w') as fileout:
fileout.write('\n'.join(lines))
def _write_index_files(self, name: str) -> None:
"""
Write index file for a symbolic array.
:param name:
key in self.model._syms for which the respective file should
be written
"""
lines = []
if name in self.model.sym_names():
if name in sparse_functions:
symbols = self.model.sparsesym(name)
else:
symbols = self.model.sym(name).T
# flatten multiobs
if isinstance(next(iter(symbols), None), list):
symbols = [symbol for obs in symbols for symbol in obs]
else:
raise ValueError(f'Unknown symbolic array: {name}')
for index, symbol in enumerate(symbols):
symbol_name = strip_pysb(symbol)
if str(symbol) == '0':
continue
lines.append(
f'#define {symbol_name} {name}[{index}]'
)
with open(os.path.join(self.model_path, f'{name}.h'), 'w') as fileout:
fileout.write('\n'.join(lines))
def _write_function_file(self, function: str) -> None:
"""
Generate equations and write the C++ code for the function
`function`.
:param function:
name of the function to be written (see self.functions)
"""
# first generate the equations to make sure we have everything we
# need in subsequent steps
if function in sparse_functions:
equations = self.model.sparseeq(function)
elif not self.allow_reinit_fixpar_initcond \
and function == 'sx0_fixedParameters':
# Not required. Will create empty function body.
equations = sp.Matrix()
else:
equations = self.model.eq(function)
# function header
lines = [
'#include "amici/symbolic_functions.h"',
'#include "amici/defines.h"',
'#include "sundials/sundials_types.h"',
'',
'#include <array>',
]
# function signature
signature = self.functions[function]['signature']
lines.append('')
for sym in self.model.sym_names():
# added |double for data
# added '[0]*' for initial conditions
if re.search(
fr'const (realtype|double) \*{sym}[0]*[,)]+', signature
) or (function == sym and function not in non_unique_id_symbols):
lines.append(f'#include "{sym}.h"')
lines.extend([
'',
'namespace amici {',
f'namespace model_{self.model_name} {{',
'',
])
lines.append(f'void {function}_{self.model_name}{signature}{{')
# function body
body = self._get_function_body(function, equations)
if self.assume_pow_positivity and 'assume_pow_positivity' \
in self.functions[function].get('flags', []):
body = [re.sub(r'(^|\W)std::pow\(', r'\1amici::pos_pow(', line)
for line in body]
# execute this twice to catch cases where the ending ( would be the
# starting (^|\W) for the following match
body = [re.sub(r'(^|\W)std::pow\(', r'\1amici::pos_pow(', line)
for line in body]
self.functions[function]['body'] = body
lines += body
lines.extend([
'}',
'',
'} // namespace amici',
f'}} // namespace model_{self.model_name}',
])
# check custom functions
for fun in CUSTOM_FUNCTIONS:
if 'include' in fun and any(fun['c++'] in line for line in lines):
if 'build_hint' in fun:
self._build_hints.add(fun['build_hint'])
lines.insert(0, fun['include'])
# if not body is None:
with open(os.path.join(
self.model_path, f'{self.model_name}_{function}.cpp'), 'w'
) as fileout:
fileout.write('\n'.join(lines))
def _write_function_index(self, function: str, indextype: str) -> None:
"""
Generate equations and write the C++ code for the function
`function`.
:param function:
name of the function to be written (see self.functions)
:param indextype:
type of index {'colptrs', 'rowvals'}
"""
if indextype == 'colptrs':
values = self.model.colptrs(function)
setter = 'indexptrs'
elif indextype == 'rowvals':
values = self.model.rowvals(function)
setter = 'indexvals'
else:
raise ValueError('Invalid value for indextype, must be colptrs or '
f'rowvals: {indextype}')
# function signature
if function in multiobs_functions:
signature = f'(SUNMatrixWrapper &{function}, int index)'
else:
signature = f'(SUNMatrixWrapper &{function})'
lines = [
'#include "amici/sundials_matrix_wrapper.h"',
'#include "sundials/sundials_types.h"',
'',
'#include <array>',
'#include <algorithm>',
'',
'namespace amici {',
f'namespace model_{self.model_name} {{',
'',
]
# Generate static array with indices
if len(values):
static_array_name = f"{function}_{indextype}_{self.model_name}_"
if function in multiobs_functions:
# list of index vectors
lines.append("static constexpr std::array<std::array<sunindextype, "
f"{len(values[0])}>, {len(values)}> "
f"{static_array_name} = {{{{")
lines.extend([' {'
+ ', '.join(map(str, index_vector)) + '}, '
for index_vector in values])
lines.append("}};")
else:
# single index vector
lines.append("static constexpr std::array<sunindextype, "
f"{len(values)}> {static_array_name} = {{")
lines.append(' ' + ', '.join(map(str, values)))
lines.append("};")
lines.extend([
'',
f'void {function}_{indextype}_{self.model_name}{signature}{{',
])
if len(values):
if function in multiobs_functions:
lines.append(f" {function}.set_{setter}(gsl::make_span({static_array_name}[index]));")
else:
lines.append(f" {function}.set_{setter}(gsl::make_span({static_array_name}));")
lines.extend([
'}'
'',
'} // namespace amici',
f'}} // namespace model_{self.model_name}',
])
filename = f'{self.model_name}_{function}_{indextype}.cpp'
filename = os.path.join(self.model_path, filename)
with open(filename, 'w') as fileout:
fileout.write('\n'.join(lines))
def _get_function_body(self,
function: str,
equations: sp.Matrix) -> List[str]:
"""
Generate C++ code for body of function `function`.
:param function:
name of the function to be written (see self.functions)
:param equations:
symbolic definition of the function body
:return:
generated C++ code
"""
lines = []
if len(equations) == 0 or (isinstance(equations, (sp.Matrix,
sp.ImmutableDenseMatrix))
and min(equations.shape) == 0):
# dJydy is a list
return lines
if not self.allow_reinit_fixpar_initcond \
and function in ['sx0_fixedParameters', 'x0_fixedParameters']:
return lines
if function == 'sx0_fixedParameters':
# here we only want to overwrite values where x0_fixedParameters
# was applied
lines.extend([
# Keep list of indices of fixed parameters occurring in x0
" static const std::array<int, "
+ str(len(self.model._x0_fixedParameters_idx))
+ "> _x0_fixedParameters_idxs = {",
" "
+ ', '.join(str(x)
for x in self.model._x0_fixedParameters_idx),
" };",
"",
# Set all parameters that are to be reset to 0, so that the
# switch statement below only needs to handle non-zero entries
# (which usually reduces file size and speeds up
# compilation significantly).
" for(auto idx: _x0_fixedParameters_idxs) {",
" sx0_fixedParameters[idx] = 0.0;",
" }"])
cases = dict()
for ipar in range(self.model.num_par()):
expressions = []
for index, formula in zip(
self.model._x0_fixedParameters_idx,
equations[:, ipar]
):
if not formula.is_zero:
expressions.append(
f'{function}[{index}] = '
f'{_print_with_exception(formula)};')
cases[ipar] = expressions
lines.extend(get_switch_statement('ip', cases, 1))
elif function == 'x0_fixedParameters':
for index, formula in zip(
self.model._x0_fixedParameters_idx,
equations
):
lines.append(f'{function}[{index}] = '
f'{_print_with_exception(formula)};')
elif function in sensi_functions:
cases = {ipar: _get_sym_lines_array(equations[:, ipar], function,
0)
for ipar in range(self.model.num_par())
if not smart_is_zero_matrix(equations[:, ipar])}
lines.extend(get_switch_statement('ip', cases, 1))
elif function in multiobs_functions:
if function == 'dJydy':
cases = {iobs: _get_sym_lines_array(equations[iobs], function,
0)
for iobs in range(self.model.num_obs())
if not smart_is_zero_matrix(equations[iobs])}
else:
cases = {iobs: _get_sym_lines_array(equations[:, iobs], function,
0)
for iobs in range(self.model.num_obs())
if not smart_is_zero_matrix(equations[:, iobs])}
lines.extend(get_switch_statement('iy', cases, 1))
elif function in self.model.sym_names() \
and function not in non_unique_id_symbols:
if function in sparse_functions:
symbols = self.model.sparsesym(function)
else:
symbols = self.model.sym(function, stripped=True)
lines += _get_sym_lines_symbols(symbols, equations, function, 4)
else:
lines += _get_sym_lines_array(equations, function, 4)
return [line for line in lines if line]
def _write_wrapfunctions_cpp(self) -> None:
"""
Write model-specific 'wrapper' file (wrapfunctions.cpp).
"""
template_data = {'MODELNAME': self.model_name}
apply_template(
os.path.join(amiciSrcPath, 'wrapfunctions.template.cpp'),
os.path.join(self.model_path, 'wrapfunctions.cpp'),
template_data
)
def _write_wrapfunctions_header(self) -> None:
"""
Write model-specific header file (wrapfunctions.h).
"""
template_data = {'MODELNAME': str(self.model_name)}
apply_template(
os.path.join(amiciSrcPath, 'wrapfunctions.ODE_template.h'),
os.path.join(self.model_path, 'wrapfunctions.h'),
template_data
)
def _write_model_header_cpp(self) -> None:
"""
Write model-specific header and cpp file (MODELNAME.{h,cpp}).
"""
tpl_data = {
'MODELNAME': str(self.model_name),
'NX_RDATA': str(self.model.num_states_rdata()),
'NXTRUE_RDATA': str(self.model.num_states_rdata()),
'NX_SOLVER': str(self.model.num_states_solver()),
'NXTRUE_SOLVER': str(self.model.num_states_solver()),
'NX_SOLVER_REINIT': str(self.model.num_state_reinits()),
'NY': str(self.model.num_obs()),
'NYTRUE': str(self.model.num_obs()),
'NZ': '0',
'NZTRUE': '0',
'NEVENT': '0',
'NOBJECTIVE': '1',
'NW': str(len(self.model.sym('w'))),
'NDWDP': str(len(self.model.sparsesym('dwdp'))),
'NDWDX': str(len(self.model.sparsesym('dwdx'))),
'NDWDW': str(len(self.model.sparsesym('dwdw'))),
'NDXDOTDW': str(len(self.model.sparsesym('dxdotdw'))),
'NDXDOTDP_EXPLICIT': str(len(self.model.sparsesym(
'dxdotdp_explicit'))),
'NDXDOTDX_EXPLICIT': str(len(self.model.sparsesym(
'dxdotdx_explicit'))),
'NDJYDY': 'std::vector<int>{%s}'
% ','.join(str(len(x))
for x in self.model.sparsesym('dJydy')),
'UBW': str(self.model.num_states_solver()),
'LBW': str(self.model.num_states_solver()),
'NP': str(self.model.num_par()),
'NK': str(self.model.num_const()),
'O2MODE': 'amici::SecondOrderMode::none',
# using cxxcode ensures proper handling of nan/inf
'PARAMETERS': _print_with_exception(self.model.val('p'))[1:-1],
'FIXED_PARAMETERS': _print_with_exception(self.model.val('k'))[
1:-1],
'PARAMETER_NAMES_INITIALIZER_LIST':
self._get_symbol_name_initializer_list('p'),
'STATE_NAMES_INITIALIZER_LIST':
self._get_symbol_name_initializer_list('x_rdata'),
'FIXED_PARAMETER_NAMES_INITIALIZER_LIST':
self._get_symbol_name_initializer_list('k'),
'OBSERVABLE_NAMES_INITIALIZER_LIST':
self._get_symbol_name_initializer_list('y'),
'PARAMETER_IDS_INITIALIZER_LIST':
self._get_symbol_id_initializer_list('p'),
'STATE_IDS_INITIALIZER_LIST':
self._get_symbol_id_initializer_list('x_rdata'),
'FIXED_PARAMETER_IDS_INITIALIZER_LIST':
self._get_symbol_id_initializer_list('k'),
'OBSERVABLE_IDS_INITIALIZER_LIST':
self._get_symbol_id_initializer_list('y'),
'REINIT_FIXPAR_INITCOND':
'true' if self.allow_reinit_fixpar_initcond else
'false',
'AMICI_VERSION_STRING': __version__,
'AMICI_COMMIT_STRING': __commit__,
'W_RECURSION_DEPTH': self.model._w_recursion_depth,
'QUADRATIC_LLH': 'true'
if self.model._has_quadratic_nllh else 'false',
}
for fun in [
'w', 'dwdp', 'dwdx', 'dwdw', 'x_rdata', 'x_solver', 'total_cl',
'dxdotdw', 'dxdotdp_explicit', 'dxdotdx_explicit',
'dJydy'
]:
tpl_data[f'{fun.upper()}_DEF'] = \
get_function_extern_declaration(fun, self.model_name)
tpl_data[f'{fun.upper()}_IMPL'] = \
get_model_override_implementation(fun, self.model_name)
if fun in sparse_functions:
tpl_data[f'{fun.upper()}_COLPTRS_DEF'] = \
get_sunindex_extern_declaration(fun, self.model_name,
'colptrs')
tpl_data[f'{fun.upper()}_COLPTRS_IMPL'] = \
get_sunindex_override_implementation(fun, self.model_name,
'colptrs')
tpl_data[f'{fun.upper()}_ROWVALS_DEF'] = \
get_sunindex_extern_declaration(fun, self.model_name,
'rowvals')
tpl_data[f'{fun.upper()}_ROWVALS_IMPL'] = \
get_sunindex_override_implementation(fun, self.model_name,
'rowvals')
if self.model.num_states_solver() == self.model.num_states_rdata():
tpl_data['X_RDATA_DEF'] = ''
tpl_data['X_RDATA_IMPL'] = ''
apply_template(
os.path.join(amiciSrcPath, 'model_header.ODE_template.h'),
os.path.join(self.model_path, f'{self.model_name}.h'),
tpl_data
)
apply_template(
os.path.join(amiciSrcPath, 'model.ODE_template.cpp'),
os.path.join(self.model_path, f'{self.model_name}.cpp'),
tpl_data
)
def _get_symbol_name_initializer_list(self, name: str) -> str:
"""
Get SBML name initializer list for vector of names for the given
model entity
:param name:
any key present in self.model._syms
:return:
Template initializer list of names
"""
return '\n'.join(
[
f'"{symbol}",'
for symbol in self.model.name(name)
]
)
def _get_symbol_id_initializer_list(self, name: str) -> str:
"""
Get C++ initializer list for vector of names for the given model
entity
:param name:
any key present in self.model._syms
:return:
Template initializer list of ids
"""
return '\n'.join(
[
f'"{strip_pysb(symbol)}",'
for symbol in self.model.sym(name)
]
)
def _write_c_make_file(self):
"""
Write CMake CMakeLists.txt file for this model.
"""
sources = [self.model_name + '_' + function + '.cpp '
for function in self.functions.keys()
if self.functions[function].get('body', None) is not None]
# add extra source files for sparse matrices
for function in sparse_functions:
sources.append(self.model_name + '_' + function
+ '_colptrs.cpp')
sources.append(self.model_name + '_' + function
+ '_rowvals.cpp ')
sources.append(f'{self.model_name}.cpp')
template_data = {'MODELNAME': self.model_name,
'SOURCES': '\n'.join(sources),
'AMICI_VERSION': __version__}
apply_template(
MODEL_CMAKE_TEMPLATE_FILE,
os.path.join(self.model_path, 'CMakeLists.txt'),
template_data
)
def _write_swig_files(self) -> None:
"""
Write SWIG interface files for this model.
"""
if not os.path.exists(self.model_swig_path):
os.makedirs(self.model_swig_path)
template_data = {'MODELNAME': self.model_name}
apply_template(
os.path.join(amiciSwigPath, 'modelname.template.i'),
os.path.join(self.model_swig_path, self.model_name + '.i'),
template_data
)
shutil.copy(SWIG_CMAKE_TEMPLATE_FILE,
os.path.join(self.model_swig_path, 'CMakeLists.txt'))
def _write_module_setup(self) -> None:
"""
Create a distutils setup.py file for compile the model module.
"""
template_data = {'MODELNAME': self.model_name,
'AMICI_VERSION': __version__,
'PACKAGE_VERSION': '0.1.0'}
apply_template(os.path.join(amiciModulePath, 'setup.template.py'),
os.path.join(self.model_path, 'setup.py'),
template_data)
apply_template(os.path.join(amiciModulePath, 'MANIFEST.template.in'),
os.path.join(self.model_path, 'MANIFEST.in'), {})
# write __init__.py for the model module
if not os.path.exists(os.path.join(self.model_path, self.model_name)):
os.makedirs(os.path.join(self.model_path, self.model_name))
apply_template(
os.path.join(amiciModulePath, '__init__.template.py'),
os.path.join(self.model_path, self.model_name, '__init__.py'),
template_data
)
[docs] def set_paths(self, output_dir: str) -> None:
"""
Set output paths for the model and create if necessary
:param output_dir:
relative or absolute path where the generated model
code is to be placed. will be created if does not exists.
"""
self.model_path = os.path.abspath(output_dir)
self.model_swig_path = os.path.join(self.model_path, 'swig')
for directory in [self.model_path, self.model_swig_path]:
if not os.path.exists(directory):
os.makedirs(directory)
[docs] def set_name(self, model_name: str) -> None:
"""
Sets the model name
:param model_name:
name of the model (may only contain upper and lower case letters,
digits and underscores, and must not start with a digit)
"""
if not is_valid_identifier(model_name):
raise ValueError(
f"'{model_name}' is not a valid model name. "
"Model name may only contain upper and lower case letters, "
"digits and underscores, and must not start with a digit.")
self.model_name = model_name
[docs]class TemplateAmici(Template):
"""
Template format used in AMICI (see string.template for more details).
:ivar delimiter:
delimiter that identifies template variables
"""
delimiter = 'TPL_'
[docs]def apply_template(source_file: str,
target_file: str,
template_data: Dict[str, str]) -> None:
"""
Load source file, apply template substitution as provided in
templateData and save as targetFile.
:param source_file:
relative or absolute path to template file
:param target_file:
relative or absolute path to output file
:param template_data:
template keywords to substitute (key is template
variable without :attr:`TemplateAmici.delimiter`)
"""
with open(source_file) as filein:
src = TemplateAmici(filein.read())
result = src.safe_substitute(template_data)
with open(target_file, 'w') as fileout:
fileout.write(result)
[docs]def strip_pysb(symbol: sp.Basic) -> sp.Basic:
"""
Strips pysb info from a :class:`pysb.Component` object
:param symbol:
symbolic expression
:return:
stripped expression
"""
# strip pysb type and transform into a flat sympy.Symbol.
# this ensures that the pysb type specific __repr__ is used when converting
# to string
if pysb and isinstance(symbol, pysb.Component):
return sp.Symbol(symbol.name, real=True)
else:
# in this case we will use sympy specific transform anyways
return symbol
[docs]def get_function_extern_declaration(fun: str, name: str) -> str:
"""
Constructs the extern function declaration for a given function
:param fun:
function name
:param name:
model name
:return:
c++ function definition string
"""
return \
f'extern void {fun}_{name}{functions[fun]["signature"]};'
[docs]def get_sunindex_extern_declaration(fun: str, name: str,
indextype: str) -> str:
"""
Constructs the function declaration for an index function of a given
function
:param fun:
function name
:param name:
model name
:param indextype:
index function {'colptrs', 'rowvals'}
:return:
c++ function declaration string
"""
index_arg = ', int index' if fun in multiobs_functions else ''
return \
f'extern void {fun}_{indextype}_{name}' \
f'(SUNMatrixWrapper &{indextype}{index_arg});'
[docs]def get_model_override_implementation(fun: str, name: str) -> str:
"""
Constructs amici::Model::* override implementation for a given function
:param fun:
function name
:param name:
model name
:return:
c++ function implementation string
"""
return \
'virtual void f{fun}{signature} override {{\n' \
'{ind8}{fun}_{name}{eval_signature};\n' \
'{ind4}}}\n'.format(
ind4=' '*4,
ind8=' '*8,
fun=fun,
name=name,
signature=functions[fun]["signature"],
eval_signature=remove_typedefs(functions[fun]["signature"])
)
[docs]def get_sunindex_override_implementation(fun: str, name: str,
indextype: str) -> str:
"""
Constructs the amici::Model:: function implementation for an index
function of a given function
:param fun:
function name
:param name:
model name
:param indextype:
index function {'colptrs', 'rowvals'}
:return:
c++ function implementation string
"""
index_arg = ', int index' if fun in multiobs_functions else ''
index_arg_eval = ', index' if fun in multiobs_functions else ''
return \
'virtual void f{fun}_{indextype}{signature} override {{\n' \
'{ind8}{fun}_{indextype}_{name}{eval_signature};\n' \
'{ind4}}}\n'.format(
ind4=' '*4,
ind8=' '*8,
fun=fun,
indextype=indextype,
name=name,
signature=f'(SUNMatrixWrapper &{indextype}{index_arg})',
eval_signature=f'({indextype}{index_arg_eval})',
)
[docs]def remove_typedefs(signature: str) -> str:
"""
Strips typedef info from a function signature
:param signature:
function signature
:return:
string that can be used to construct function calls with the same
variable names and ordering as in the function signature
"""
# remove * pefix for pointers (pointer must always be removed before
# values otherwise we will inadvertently dereference values,
# same applies for const specifications)
#
# always add whitespace after type definition for cosmetic reasons
typedefs = [
'const realtype *',
'const double *',
'const realtype ',
'double *',
'realtype *',
'const int ',
'int ',
'SUNMatrixContent_Sparse ',
]
for typedef in typedefs:
signature = signature.replace(typedef, ' ')
return signature
[docs]def get_switch_statement(condition: str, cases: Dict[int, List[str]],
indentation_level: Optional[int] = 0,
indentation_step: Optional[str] = ' ' * 4):
"""
Generate code for switch statement
:param condition:
Condition for switch
:param cases:
Cases as dict with expressions as keys and statement as
list of strings
:param indentation_level:
indentation level
:param indentation_step:
indentation whitespace per level
:return:
Code for switch expression as list of strings
"""
lines = list()
if not cases:
return lines
for expression, statements in cases.items():
if statements:
lines.append((indentation_level + 1) * indentation_step
+ f'case {expression}:')
for statement in statements:
lines.append((indentation_level + 2) * indentation_step
+ statement)
lines.append((indentation_level + 2) * indentation_step + 'break;')
if lines:
lines.insert(0, indentation_level * indentation_step
+ f'switch({condition}) {{')
lines.append(indentation_level * indentation_step + '}')
return lines
[docs]def csc_matrix(matrix: sp.Matrix,
rownames: List[sp.Symbol],
colnames: List[sp.Symbol],
identifier: Optional[int] = 0,
pattern_only: Optional[bool] = False) -> Tuple[
List[int], List[int], sp.Matrix, List[str], sp.Matrix
]:
"""
Generates the sparse symbolic identifiers, symbolic identifiers,
sparse matrix, column pointers and row values for a symbolic
variable
:param matrix:
dense matrix to be sparsified
:param rownames:
ids of the variable of which the derivative is computed (assuming
matrix is the jacobian)
:param colnames:
ids of the variable with respect to which the derivative is computed
(assuming matrix is the jacobian)
:param identifier:
additional identifier that gets appended to symbol names to
ensure their uniqueness in outer loops
:param pattern_only:
flag for computing sparsity pattern without whole matrix
:return:
symbol_col_ptrs, symbol_row_vals, sparse_list, symbol_list,
sparse_matrix
"""
idx = 0
nrows, ncols = matrix.shape
if not pattern_only:
sparse_matrix = sp.zeros(nrows, ncols)
symbol_list = []
sparse_list = []
symbol_col_ptrs = []
symbol_row_vals = []
for col in range(0, ncols):
symbol_col_ptrs.append(idx)
for row in range(0, nrows):
if matrix[row, col] == 0:
continue
symbol_row_vals.append(row)
idx += 1
symbol_name = f'd{_print_with_exception(rownames[row])}' \
f'_d{_print_with_exception(colnames[col])}'
if identifier:
symbol_name += f'_{identifier}'
symbol_list.append(symbol_name)
if pattern_only:
continue
sparse_matrix[row, col] = sp.Symbol(symbol_name, real=True)
sparse_list.append(matrix[row, col])
if idx == 0:
symbol_col_ptrs = [] # avoid bad memory access for empty matrices
else:
symbol_col_ptrs.append(idx)
if pattern_only:
sparse_matrix = None
else:
sparse_list = sp.Matrix(sparse_list)
return symbol_col_ptrs, symbol_row_vals, sparse_list, symbol_list, \
sparse_matrix
[docs]def is_valid_identifier(x: str) -> bool:
"""
Check whether `x` is a valid identifier for conditions, parameters,
observables... . Identifiers may only contain upper and lower case letters,
digits and underscores, and must not start with a digit.
:param x:
string to check
:return:
``True`` if valid, ``False`` otherwise
"""
return re.match(r'^[a-zA-Z_]\w*$', x) is not None
[docs]def generate_measurement_symbol(observable_id: Union[str, sp.Symbol]):
"""
Generates the appropriate measurement symbol for the provided observable
:param observable_id:
symbol (or string representation) of the observable
:return:
symbol for the corresponding measurement
"""
if not isinstance(observable_id, str):
observable_id = strip_pysb(observable_id)
return symbol_with_assumptions(f'm{observable_id}')
[docs]def generate_flux_symbol(reaction_index: int) -> sp.Symbol:
"""
Generate identifier symbol for a reaction flux.
This function will always return the same unique python object for a
given entity.
:param reaction_index:
index of the reaction to which the flux corresponds
:return:
identifier symbol
"""
return symbol_with_assumptions(f'flux_r{reaction_index}')
[docs]def symbol_with_assumptions(name: str):
"""
Central function to create symbols with consistent, canonical assumptions
:param name:
name of the symbol
:return:
symbol with canonical assumptions
"""
return sp.Symbol(name, real=True)
[docs]def cast_to_sym(value: Union[SupportsFloat, sp.Expr, BooleanAtom],
input_name: str) -> sp.Expr:
"""
Typecasts the value to sympy.Float if possible, and ensures the
value is a symbolic expression.
:param value:
value to be cast
:param input_name:
name of input variable
:return:
typecast value
"""
if isinstance(value, (sp.RealNumber, numbers.Number)):
value = sp.Float(float(value))
elif isinstance(value, BooleanAtom):
value = sp.Float(float(bool(value)))
if not isinstance(value, sp.Expr):
raise TypeError(f"Couldn't cast {input_name} to sympy.Expr, was "
f"{type(value)}")
return value
SymbolDef = Dict[sp.Symbol, Union[Dict[str, sp.Expr], sp.Expr]]
[docs]def smart_subs_dict(sym: sp.Expr,
subs: SymbolDef,
field: Optional[str] = None,
reverse: bool = True) -> sp.Expr:
"""
Subsitutes expressions completely flattening them out. Requires
sorting of expressions with toposort.
:param sym:
Symbolic expression in which expressions will be substituted
:param subs:
Substitutions
:param field:
Field of substitution expressions in subs.values(), if applicable
:param reverse:
Whether ordering in subs should be reversed. Note that substitution
requires the reverse order of what is required for evaluation.
:return:
Substituted symbolic expression
"""
s = [
(eid, expr[field] if field is not None else expr)
for eid, expr in subs.items()
]
if reverse:
s.reverse()
for substitution in s:
# note that substitution may change free symbols, so we have to do
# this recursively
if substitution[0] in sym.free_symbols:
sym = sym.subs(*substitution)
return sym
@contextlib.contextmanager
def _monkeypatched(obj: object, name: str, patch: Any):
"""
Temporarily monkeypatches an object.
:param obj:
object to be patched
:param name:
name of the attribute to be patched
:param patch:
patched value
"""
pre_patched_value = getattr(obj, name)
setattr(obj, name, patch)
try:
yield object
finally:
setattr(obj, name, pre_patched_value)
def _custom_pow_eval_derivative(self, s):
"""
Custom Pow derivative that removes a removeable singularity for
self.base == 0 and self.base.diff(s) == 0. This function is intended to
be monkeypatched into sp.Pow._eval_derivative.
:param self:
sp.Pow class
:param s:
variable with respect to which the derivative will be computed
"""
dbase = self.base.diff(s)
dexp = self.exp.diff(s)
part1 = sp.Pow(self.base, self.exp - 1) * self.exp * dbase
part2 = self * dexp * sp.log(self.base)
if self.base.is_nonzero or dbase.is_nonzero or part2.is_zero:
# first piece never applies or is zero anyways
return part1 + part2
return part1 + sp.Piecewise(
(self.base, sp.And(sp.Eq(self.base, 0), sp.Eq(dbase, 0))),
(part2, True)
)