Source code for amici.sbml_import

"""
SBML Import
-----------
This module provides all necessary functionality to import a model specified
in the `Systems Biology Markup Language (SBML) <http://sbml.org/Main_Page>`_.
"""


import sympy as sp
import libsbml as sbml
import re
import math
import itertools as itt
import warnings
import logging
import copy
from toposort import toposort
from typing import (
    Dict, List, Callable, Any, Iterable, Union, Optional, Tuple
)

from .ode_export import (
    ODEExporter, ODEModel, generate_measurement_symbol,
    symbol_with_assumptions, SymbolDef, smart_subs_dict
)
from .constants import SymbolId
from .logging import get_logger, log_execution_time, set_log_level
from . import has_clibs

from sympy.logic.boolalg import BooleanAtom


class SBMLException(Exception):
    pass


SymbolicFormula = Dict[sp.Symbol, sp.Expr]


default_symbols = {
    symbol: {} for symbol in SymbolId
}

ConservationLaw = Dict[str, Union[str, sp.Expr]]

logger = get_logger(__name__, logging.ERROR)


[docs]class SbmlImporter: """ Class to generate AMICI C++ files for a model provided in the Systems Biology Markup Language (SBML). :ivar show_sbml_warnings: indicates whether libSBML warnings should be displayed :ivar symbols: dict carrying symbolic definitions :ivar sbml_reader: The libSBML sbml reader .. warning:: Not storing this may result in a segfault. :ivar sbml_doc: document carrying the sbml definition .. warning:: Not storing this may result in a segfault. :ivar sbml: SBML model to import :ivar compartments: dict of compartment ids and compartment volumes :ivar stoichiometric_matrix: stoichiometric matrix of the model :ivar flux_vector: reaction kinetic laws :ivar _local_symbols: model symbols for sympy to consider during sympification see `locals`argument in `sympy.sympify` :ivar species_assignment_rules: Assignment rules for species. Key is symbolic identifier and value is assignment value :ivar compartment_assignment_rules: Assignment rules for compartments. Key is symbolic identifier and value is assignment value :ivar parameter_assignment_rules: assignment rules for parameters, these parameters are not permissible for sensitivity analysis :ivar initial_assignments: initial assignments for parameters, these parameters are not permissible for sensitivity analysis :ivar sbml_parser_settings: sets behaviour of SBML Formula parsing """
[docs] def __init__(self, sbml_source: Union[str, sbml.Model], show_sbml_warnings: bool = False, from_file: bool = True) -> None: """ Create a new Model instance. :param sbml_source: Either a path to SBML file where the model is specified, or a model string as created by sbml.sbmlWriter( ).writeSBMLToString() or an instance of `libsbml.Model`. :param show_sbml_warnings: Indicates whether libSBML warnings should be displayed. :param from_file: Whether `sbml_source` is a file name (True, default), or an SBML string """ if isinstance(sbml_source, sbml.Model): self.sbml_doc: sbml.Document = sbml_source.getSBMLDocument() else: self.sbml_reader: sbml.SBMLReader = sbml.SBMLReader() if from_file: sbml_doc = self.sbml_reader.readSBMLFromFile(sbml_source) else: sbml_doc = self.sbml_reader.readSBMLFromString(sbml_source) self.sbml_doc = sbml_doc self.show_sbml_warnings: bool = show_sbml_warnings # process document self._process_document() self.sbml: sbml.Model = self.sbml_doc.getModel() # Long and short names for model components self.symbols: Dict[SymbolId, Dict[sp.Symbol, Dict[str, Any]]] = {} self._local_symbols: Dict[str, Union[sp.Expr, sp.Function]] = {} self.compartments: SymbolicFormula = {} self.compartment_assignment_rules: SymbolicFormula = {} self.species_assignment_rules: SymbolicFormula = {} self.parameter_assignment_rules: SymbolicFormula = {} self.initial_assignments: SymbolicFormula = {} self._reset_symbols() # http://sbml.org/Software/libSBML/5.18.0/docs/python-api/classlibsbml_1_1_l3_parser_settings.html#abcfedd34efd3cae2081ba8f42ea43f52 # all defaults except disable unit parsing self.sbml_parser_settings = sbml.L3ParserSettings( self.sbml, sbml.L3P_PARSE_LOG_AS_LOG10, sbml.L3P_EXPAND_UNARY_MINUS, sbml.L3P_NO_UNITS, sbml.L3P_AVOGADRO_IS_CSYMBOL, sbml.L3P_COMPARE_BUILTINS_CASE_INSENSITIVE, None, sbml.L3P_MODULO_IS_PIECEWISE )
def _process_document(self) -> None: """ Validate and simplify document. """ # Ensure we got a valid SBML model, otherwise further processing # might lead to undefined results self.sbml_doc.validateSBML() _check_lib_sbml_errors(self.sbml_doc, self.show_sbml_warnings) # apply several model simplifications that make our life substantially # easier if self.sbml_doc.getModel().getNumFunctionDefinitions(): convert_config = sbml.SBMLFunctionDefinitionConverter()\ .getDefaultProperties() self.sbml_doc.convert(convert_config) convert_config = sbml.SBMLLocalParameterConverter().\ getDefaultProperties() self.sbml_doc.convert(convert_config) # If any of the above calls produces an error, this will be added to # the SBMLError log in the sbml document. Thus, it is sufficient to # check the error log just once after all conversion/validation calls. _check_lib_sbml_errors(self.sbml_doc, self.show_sbml_warnings) def _reset_symbols(self) -> None: """ Reset the symbols attribute to default values """ self.symbols = copy.deepcopy(default_symbols) self._local_symbols = dict()
[docs] def sbml2amici(self, model_name: str = None, output_dir: str = None, observables: Dict[str, Dict[str, str]] = None, constant_parameters: Iterable[str] = None, sigmas: Dict[str, Union[str, float]] = None, noise_distributions: Dict[str, Union[str, Callable]] = None, verbose: Union[int, bool] = logging.ERROR, assume_pow_positivity: bool = False, compiler: str = None, allow_reinit_fixpar_initcond: bool = True, compile: bool = True, compute_conservation_laws: bool = True, simplify: Callable = lambda x: sp.powsimp(x, deep=True), log_as_log10: bool = True, **kwargs) -> None: """ Generate and compile AMICI C++ files for the model provided to the constructor. The resulting model can be imported as a regular Python module (if `compile=True`), or used from Matlab or C++ as described in the documentation of the respective AMICI interface. Note that this generates model ODEs for changes in concentrations, not amounts unless the `hasOnlySubstanceUnits` attribute has been defined for a particular species. Sensitivity analysis for local parameters is enabled by creating global parameters _{reactionId}_{localParameterName}. :param model_name: name of the model/model directory :param output_dir: see :meth:`amici.ode_export.ODEExporter.set_paths` :param observables: dictionary( observableId:{'name':observableName (optional), 'formula':formulaString)}) to be added to the model :param constant_parameters: list of SBML Ids identifying constant parameters :param sigmas: dictionary(observableId: sigma value or (existing) parameter name) :param noise_distributions: dictionary(observableId: noise type). If nothing is passed for some observable id, a normal model is assumed as default. Either pass a noise type identifier, or a callable generating a custom noise string. :param verbose: verbosity level for logging, True/False default to logging.Error/logging.DEBUG :param assume_pow_positivity: if set to True, a special pow function is used to avoid problems with state variables that may become negative due to numerical errors :param compiler: distutils/setuptools compiler selection to build the python extension :param allow_reinit_fixpar_initcond: see :class:`amici.ode_export.ODEExporter` :param compile: If True, compile the generated Python package, if False, just generate code. :param compute_conservation_laws: if set to true, conservation laws are automatically computed and applied such that the state-jacobian of the ODE right-hand-side has full rank. This option should be set to True when using the newton algorithm to compute steadystate sensitivities. :param simplify: see :attr:`ODEModel._simplify` :param log_as_log10: If True, log in the SBML model will be parsed as `log10` (default), if False, log will be parsed as natural logarithm `ln` """ set_log_level(logger, verbose) if 'constantParameters' in kwargs: logger.warning('Use of `constantParameters` as argument name ' 'is deprecated and will be removed in a future ' 'version. Please use `constant_parameters` as ' 'argument name.') if constant_parameters is not None: raise ValueError('Cannot specify constant parameters using ' 'both `constantParameters` and ' '`constant_parameters` as argument names.') constant_parameters = kwargs.pop('constantParameters', []) elif constant_parameters is None: constant_parameters = [] constant_parameters = list(constant_parameters) if sigmas is None: sigmas = {} if noise_distributions is None: noise_distributions = {} if model_name is None: model_name = kwargs.pop('modelName', None) if model_name is None: raise ValueError('Missing argument: `model_name`') else: logger.warning('Use of `modelName` as argument name is ' 'deprecated and will be removed in a future' ' version. Please use `model_name` as ' 'argument name.') else: if 'modelName' in kwargs: raise ValueError('Cannot specify model name using both ' '`modelName` and `model_name` as argument ' 'names.') if len(kwargs): raise ValueError(f'Unknown arguments {kwargs.keys()}.') self._reset_symbols() self.sbml_parser_settings.setParseLog( sbml.L3P_PARSE_LOG_AS_LOG10 if log_as_log10 else sbml.L3P_PARSE_LOG_AS_LN ) self._process_sbml(constant_parameters) self._process_observables(observables, sigmas, noise_distributions) self._replace_compartments_with_volumes() self._clean_reserved_symbols() self._process_time() ode_model = ODEModel(verbose=verbose, simplify=simplify) ode_model.import_from_sbml_importer( self, compute_cls=compute_conservation_laws) exporter = ODEExporter( ode_model, outdir=output_dir, verbose=verbose, assume_pow_positivity=assume_pow_positivity, compiler=compiler, allow_reinit_fixpar_initcond=allow_reinit_fixpar_initcond ) exporter.set_name(model_name) exporter.set_paths(output_dir) exporter.generate_model_code() if compile: if not has_clibs: warnings.warn('AMICI C++ extensions have not been built. ' 'Generated model code, but unable to compile.') exporter.compile_model()
@log_execution_time('importing SBML', logger) def _process_sbml(self, constant_parameters: List[str] = None) -> None: """ Read parameters, species, reactions, and so on from SBML model :param constant_parameters: SBML Ids identifying constant parameters """ if constant_parameters is None: constant_parameters = [] self.check_support() self._gather_locals() self._process_parameters(constant_parameters) self._process_compartments() self._process_species() self._process_reactions() self._process_rules() self._process_initial_assignments() self._process_species_references()
[docs] def check_support(self) -> None: """ Check whether all required SBML features are supported. Also ensures that the SBML contains at least one reaction, or rate rule, or assignment rule, to produce change in the system over time. """ if hasattr(self.sbml, 'all_elements_from_plugins') \ and self.sbml.all_elements_from_plugins.getSize(): raise SBMLException('SBML extensions are currently not supported!') if self.sbml.getNumEvents(): raise SBMLException('Events are currently not supported!') if any([not rule.isAssignment() and not isinstance( self.sbml.getElementBySId(rule.getVariable()), (sbml.Compartment, sbml.Species, sbml.Parameter) ) for rule in self.sbml.getListOfRules()]): raise SBMLException('Algebraic rules are currently not supported, ' 'and rate rules are only supported for ' 'species, compartments, and parameters.') if any([not (rule.isAssignment() or rule.isRate()) and isinstance( self.sbml.getElementBySId(rule.getVariable()), (sbml.Compartment, sbml.Species, sbml.Parameter) ) for rule in self.sbml.getListOfRules()]): raise SBMLException('Only assignment and rate rules are ' 'currently supported for compartments, ' 'species, and parameters! Error ' f'occurred with rule: {rule.getVariable()}') if any([r.getFast() for r in self.sbml.getListOfReactions()]): raise SBMLException('Fast reactions are currently not supported!')
@log_execution_time('gathering local SBML symbols', logger) def _gather_locals(self) -> None: """ Populate self.local_symbols with all model entities. This is later used during sympifications to avoid sympy builtins shadowing model entities as well as to avoid possibly costly symbolic substitutions """ self._gather_base_locals() self._gather_dependent_locals() def _gather_base_locals(self): """ Populate self.local_symbols with pure symbol definitions that do not depend on any other symbol. """ special_symbols_and_funs = { # oo is sympy infinity 'INF': sp.oo, 'NaN': sp.nan, 'rem': sp.Mod, 'times': sp.Mul, 'time': symbol_with_assumptions('time'), # SBML L3 explicitly defines this value, which is not equal # to the most recent SI definition. 'avogadro': sp.Float(6.02214179e23), 'exponentiale': sp.E, } for s, v in special_symbols_and_funs.items(): self.add_local_symbol(s, v) species_references = _get_list_of_species_references(self.sbml) for c in itt.chain(self.sbml.getListOfSpecies(), self.sbml.getListOfParameters(), self.sbml.getListOfCompartments(), species_references): if not c.getId(): continue self.add_local_symbol(c.getId(), _get_identifier_symbol(c)) for r in self.sbml.getListOfRules(): if not isinstance(self.sbml.getElementBySId(r.getVariable()), sbml.SpeciesReference) \ or self._sympy_from_sbml_math(r) is None: continue self.add_local_symbol(r.getVariable(), symbol_with_assumptions(r.getVariable())) for r in self.sbml.getListOfReactions(): for e in itt.chain(r.getListOfReactants(), r.getListOfProducts()): if not (e.isSetId() and e.isSetStoichiometry()) or \ self.is_assignment_rule_target(e): continue self.add_local_symbol(e.getId(), sp.Float(e.getStoichiometry())) def _gather_dependent_locals(self): """ Populate self.local_symbols with symbol definitions that may depend on other symbol definitions. """ for r in self.sbml.getListOfReactions(): if not r.isSetId(): continue self.add_local_symbol( r.getId(), self._sympy_from_sbml_math(r.getKineticLaw()) )
[docs] def add_local_symbol(self, key: str, value: sp.Expr): """ Add local symbols with some sanity checking for duplication which would indicate redefinition of internals, which SBML permits, but we don't. :param key: local symbol key :param value: local symbol value """ if key in itt.chain(self._local_symbols.keys(), ['True', 'False', 'pi']): raise SBMLException( 'AMICI does not support SBML models containing ' f'variables with SId {key}.' ) self._local_symbols[key] = value
@log_execution_time('processing SBML compartments', logger) def _process_compartments(self) -> None: """ Get compartment information, stoichiometric matrix and fluxes from SBML model. """ compartments = self.sbml.getListOfCompartments() self.compartments = {} for comp in compartments: init = sp.Float(1.0) if comp.isSetVolume(): init = self._sympy_from_sbml_math(comp.getVolume()) ia_sym = self._get_element_initial_assignment(comp.getId()) if ia_sym is not None: init = ia_sym self.compartments[_get_identifier_symbol(comp)] = init @log_execution_time('processing SBML species', logger) def _process_species(self) -> None: """ Get species information from SBML model. """ if self.sbml.isSetConversionFactor(): conversion_factor = symbol_with_assumptions( self.sbml.getConversionFactor() ) else: conversion_factor = 1.0 for s in self.sbml.getListOfSpecies(): if self.is_assignment_rule_target(s): continue self.symbols[SymbolId.SPECIES][_get_identifier_symbol(s)] = { 'name': s.getName() if s.isSetName() else s.getId(), 'compartment': _get_species_compartment_symbol(s), 'constant': s.getConstant() or s.getBoundaryCondition(), 'amount': s.getHasOnlySubstanceUnits(), 'conversion_factor': symbol_with_assumptions( s.getConversionFactor() ) if s.isSetConversionFactor() else conversion_factor, 'index': len(self.symbols[SymbolId.SPECIES]), } self._process_species_initial() self._process_rate_rules() @log_execution_time('processing SBML species initials', logger) def _process_species_initial(self): """ Extract initial values and initial assignments from species """ for species_variable in self.sbml.getListOfSpecies(): initial = _get_species_initial(species_variable) species_id = _get_identifier_symbol(species_variable) # If species_id is a target of an AssignmentRule, species will be # None, but we don't have to account for the initial definition # of the species itself and SBML doesn't permit AssignmentRule # targets to have InitialAssignments. species = self.symbols[SymbolId.SPECIES].get(species_id, None) ia_initial = self._get_element_initial_assignment( species_variable.getId() ) if ia_initial is not None: if species and species['amount'] \ and 'compartment' in species: ia_initial *= self.compartments.get( species['compartment'], species['compartment'] ) initial = ia_initial if species: species['init'] = initial # don't assign this since they need to stay in order sorted_species = toposort_symbols(self.symbols[SymbolId.SPECIES], 'init') for species in self.symbols[SymbolId.SPECIES].values(): species['init'] = smart_subs_dict(species['init'], sorted_species, 'init') @log_execution_time('processing SBML rate rules', logger) def _process_rate_rules(self): """ Process assignment and rate rules for species, compartments and parameters. Compartments and parameters with rate rules are implemented as species. Note that, in the case of species, rate rules may describe the change in amount, not concentration, of a species. """ rules = self.sbml.getListOfRules() # compartments with rules are replaced with constants in the relevant # equations during the _replace_in_all_expressions call inside # _process_rules for rule in rules: if rule.getTypeCode() != sbml.SBML_RATE_RULE: continue variable = symbol_with_assumptions(rule.getVariable()) formula = self._sympy_from_sbml_math(rule) if formula is None: continue # Species rules are processed first, to avoid processing # compartments twice (as compartments with rate rules are # implemented as species). ia_init = self._get_element_initial_assignment(rule.getVariable()) if variable in self.symbols[SymbolId.SPECIES]: init = self.symbols[SymbolId.SPECIES][variable]['init'] name = None if variable in self.compartments: init = self.compartments[variable] name = str(variable) del self.compartments[variable] elif variable in self.symbols[SymbolId.PARAMETER]: init = self._sympy_from_sbml_math( self.symbols[SymbolId.PARAMETER][variable]['value'], ) name = self.symbols[SymbolId.PARAMETER][variable]['name'] del self.symbols[SymbolId.PARAMETER][variable] # parameter with initial assignment, cannot use # self.initial_assignments as it is not filled at this # point elif ia_init is not None: init = ia_init par = self.sbml.getElementBySId(rule.getVariable()) name = par.getName() if par.isSetName() else par.getId() self.add_d_dt(formula, variable, init, name)
[docs] def add_d_dt( self, d_dt: sp.Expr, variable: sp.Symbol, variable0: Union[float, sp.Expr], name: str, ) -> None: """ Creates or modifies species, to implement rate rules for compartments and species, respectively. :param d_dt: The rate rule (or, right-hand side of an ODE). :param variable: The subject of the rate rule. :param variable0: The initial value of the variable. :param name: Species name, only applicable if this function generates a new species """ if variable in self.symbols[SymbolId.SPECIES]: # only update dt if species was already generated self.symbols[SymbolId.SPECIES][variable]['dt'] = d_dt else: # update initial values for species_id, species in self.symbols[SymbolId.SPECIES].items(): variable0 = smart_subs(variable0, species_id, species['init']) for species in self.symbols[SymbolId.SPECIES].values(): species['init'] = smart_subs(species['init'], variable, variable0) # add compartment/parameter species self.symbols[SymbolId.SPECIES][variable] = { 'name': name, 'init': variable0, 'amount': False, 'conversion_factor': 1.0, 'constant': False, 'index': len(self.symbols[SymbolId.SPECIES]), 'dt': d_dt, }
@log_execution_time('processing SBML parameters', logger) def _process_parameters(self, constant_parameters: List[str] = None) -> None: """ Get parameter information from SBML model. :param constant_parameters: SBML Ids identifying constant parameters """ if constant_parameters is None: constant_parameters = [] # Ensure specified constant parameters exist in the model for parameter in constant_parameters: if not self.sbml.getParameter(parameter): raise KeyError('Cannot make %s a constant parameter: ' 'Parameter does not exist.' % parameter) fixed_parameters = [ parameter for parameter in self.sbml.getListOfParameters() if parameter.getId() in constant_parameters ] parameters = [ parameter for parameter in self.sbml.getListOfParameters() if parameter.getId() not in constant_parameters and self._get_element_initial_assignment(parameter.getId()) is None and not self.is_assignment_rule_target(parameter) ] loop_settings = { SymbolId.PARAMETER: {'var': parameters, 'name': 'parameter'}, SymbolId.FIXED_PARAMETER: {'var': fixed_parameters, 'name': 'fixed_parameter'} } for partype, settings in loop_settings.items(): for par in settings['var']: self.symbols[partype][_get_identifier_symbol(par)] = { 'name': par.getName() if par.isSetName() else par.getId(), 'value': par.getValue() } @log_execution_time('processing SBML reactions', logger) def _process_reactions(self): """ Get reactions from SBML model. """ reactions = self.sbml.getListOfReactions() # nr (number of reactions) should have a minimum length of 1. This is # to ensure that, if there are no reactions, the stoichiometric matrix # and flux vector multiply to a zero vector with dimensions (nx, 1). nr = max(1, len(reactions)) nx = len(self.symbols[SymbolId.SPECIES]) # stoichiometric matrix self.stoichiometric_matrix = sp.SparseMatrix(sp.zeros(nx, nr)) self.flux_vector = sp.zeros(nr, 1) reaction_ids = [ reaction.getId() for reaction in reactions if reaction.isSetId() ] for reaction_index, reaction in enumerate(reactions): for element_list, sign in [(reaction.getListOfReactants(), -1.0), (reaction.getListOfProducts(), 1.0)]: for element in element_list: stoichiometry = self._get_element_stoichiometry( element ) species_id = _get_identifier_symbol( self.sbml.getSpecies(element.getSpecies()) ) species = self.symbols[SymbolId.SPECIES][species_id] if species['constant']: continue # Division by species compartment size (to find the # rate of change in species concentration) now occurs # in the `dx_dt` method in "ode_export.py", which also # accounts for possibly variable compartments. self.stoichiometric_matrix[species['index'], reaction_index] += \ sign * stoichiometry * species['conversion_factor'] if reaction.isSetId(): sym_math = self._local_symbols[reaction.getId()] else: sym_math = self._sympy_from_sbml_math(reaction.getKineticLaw()) self.flux_vector[reaction_index] = sym_math if any([ str(symbol) in reaction_ids for symbol in self.flux_vector[reaction_index].free_symbols ]): raise SBMLException( 'Kinetic laws involving reaction ids are currently' ' not supported!' ) @log_execution_time('processing SBML rules', logger) def _process_rules(self) -> None: """ Process Rules defined in the SBML model. """ for rule in self.sbml.getListOfRules(): # rate rules are processed in _process_species if rule.getTypeCode() == sbml.SBML_RATE_RULE: continue sbml_var = self.sbml.getElementBySId(rule.getVariable()) sym_id = symbol_with_assumptions(rule.getVariable()) formula = self._sympy_from_sbml_math(rule) if formula is None: continue if isinstance(sbml_var, sbml.Species): self.species_assignment_rules[sym_id] = formula elif isinstance(sbml_var, sbml.Compartment): self.compartment_assignment_rules[sym_id] = formula self.compartments[sym_id] = formula elif isinstance(sbml_var, sbml.Parameter): self.parameter_assignment_rules[sym_id] = formula self.symbols[SymbolId.EXPRESSION][sym_id] = { 'name': str(sym_id), 'value': formula } self.symbols[SymbolId.EXPRESSION] = toposort_symbols( self.symbols[SymbolId.EXPRESSION], 'value' ) # expressions must not occur in definition of x0 for species in self.symbols[SymbolId.SPECIES].values(): species['init'] = self._make_initial( smart_subs_dict(species['init'], self.symbols[SymbolId.EXPRESSION], 'value') ) def _process_time(self) -> None: """ Convert time_symbol into cpp variable. """ sbml_time_symbol = symbol_with_assumptions('time') amici_time_symbol = symbol_with_assumptions('t') self.amici_time_symbol = amici_time_symbol self._replace_in_all_expressions(sbml_time_symbol, amici_time_symbol) @log_execution_time('processing SBML observables', logger) def _process_observables( self, observables: Optional[Dict[str, Dict[str, str]]] = None, sigmas: Optional[Dict[str, Union[str, float]]] = None, noise_distributions: Optional[Dict[str, str]] = None ) -> None: """ Perform symbolic computations required for observable and objective function evaluation. :param observables: dictionary(observableId: {'name':observableName (optional), 'formula':formulaString)}) to be added to the model :param sigmas: dictionary(observableId: sigma value or (existing) parameter name) :param noise_distributions: dictionary(observableId: noise type) See :py:func:`sbml2amici`. """ if sigmas is None or observables is None: sigmas = {} else: # Ensure no non-existing observableIds have been specified # (no problem here, but usually an upstream bug) unknown_ids = set(sigmas.keys()) - set(observables.keys()) if unknown_ids: raise ValueError( f"Sigma provided for unknown observableIds: " f"{unknown_ids}.") if noise_distributions is None or observables is None: noise_distributions = {} else: # Ensure no non-existing observableIds have been specified # (no problem here, but usually an upstream bug) unknown_ids = set(noise_distributions.keys()) - \ set(observables.keys()) if unknown_ids: raise ValueError( f"Noise distribution provided for unknown observableIds: " f"{unknown_ids}.") # add user-provided observables or make all species, and compartments # with assignment rules, observable if observables: self.symbols[SymbolId.OBSERVABLE] = { symbol_with_assumptions(obs): { 'name': definition.get('name', f'y{iobs}'), # Replace logX(.) by log(., X) since sympy cannot parse the # former. 'value': self._sympy_from_sbml_math( definition['formula'] ) } for iobs, (obs, definition) in enumerate(observables.items()) } elif observables is None: self._generate_default_observables() self._process_log_likelihood(sigmas, noise_distributions) def _generate_default_observables(self): """ Generate default observables from species, compartments and (initial) assignment rules. """ self.symbols[SymbolId.OBSERVABLE] = { symbol_with_assumptions(f'y{species_id}'): { 'name': specie['name'], 'value': species_id } for ix, (species_id, specie) in enumerate(self.symbols[SymbolId.SPECIES].items()) } for variable, formula in itt.chain( self.parameter_assignment_rules.items(), self.initial_assignments.items(), self.compartment_assignment_rules.items(), self.species_assignment_rules.items(), self.compartments.items() ): symbol = symbol_with_assumptions(f'y{variable}') # Assignment rules take precedence over compartment volume # definitions, so they need to be evaluated first. # Species assignment rules always overwrite. if symbol in self.symbols[SymbolId.OBSERVABLE] \ and variable not in self.species_assignment_rules: continue self.symbols[SymbolId.OBSERVABLE][symbol] = { 'name': str(variable), 'value': formula } def _process_log_likelihood(self, sigmas: Dict[str, Union[str, float]], noise_distributions: Dict[str, str]): """ Perform symbolic computations required for objective function evaluation. :param sigmas: See :py:func:`SBMLImporter._process_observables` :param noise_distributions: See :py:func:`SBMLImporter._process_observables` """ for obs_id, obs in self.symbols[SymbolId.OBSERVABLE].items(): obs['measurement_symbol'] = generate_measurement_symbol(obs_id) self.symbols[SymbolId.SIGMAY] = { symbol_with_assumptions(f'sigma_{obs_id}'): { 'name': f'sigma_{obs["name"]}', 'value': self._sympy_from_sbml_math( sigmas.get(str(obs_id), '1.0') ) } for obs_id, obs in self.symbols[SymbolId.OBSERVABLE].items() } self.symbols[SymbolId.LLHY] = {} for (obs_id, obs), (sigma_id, sigma) in zip( self.symbols[SymbolId.OBSERVABLE].items(), self.symbols[SymbolId.SIGMAY].items() ): symbol = symbol_with_assumptions(f'J{obs_id}') dist = noise_distributions.get(str(obs_id), 'normal') cost_fun = noise_distribution_to_cost_function(dist)(obs_id) value = sp.sympify(cost_fun, locals=dict(zip( _get_str_symbol_identifiers(obs_id), (obs_id, obs['measurement_symbol'], sigma_id) ))) self.symbols[SymbolId.LLHY][symbol] = { 'name': f'J{obs["name"]}', 'value': value, 'dist': dist, } @log_execution_time('processing SBML initial assignments', logger) def _process_initial_assignments(self): """ Accounts for initial assignments of parameters and species references. Initial assignments for species and compartments are processed in :py:func:`amici.SBMLImporter._process_initial_species` and :py:func:`amici.SBMLImporter._process_compartments` respectively. """ for ia in self.sbml.getListOfInitialAssignments(): identifier = _get_identifier_symbol(ia) if identifier in itt.chain(self.symbols[SymbolId.SPECIES], self.compartments): continue sym_math = self._get_element_initial_assignment(ia.getId()) if sym_math is None: continue sym_math = self._make_initial(smart_subs_dict( sym_math, self.symbols[SymbolId.EXPRESSION], 'value' )) self.initial_assignments[_get_identifier_symbol(ia)] = sym_math # sort and flatten self.initial_assignments = toposort_symbols(self.initial_assignments) for ia_id, ia in self.initial_assignments.items(): self.initial_assignments[ia_id] = smart_subs_dict( ia, self.initial_assignments ) for identifier, sym_math in list(self.initial_assignments.items()): self._replace_in_all_expressions(identifier, sym_math) @log_execution_time('processing SBML species references', logger) def _process_species_references(self): """ Replaces species references that define anything but stoichiometries. Species references for stoichiometries are processed in :py:func:`amici.SBMLImporter._process_reactions`. """ # doesnt look like there is a better way to get hold of those lists: species_references = _get_list_of_species_references(self.sbml) for species_reference in species_references: if hasattr(species_reference, 'getStoichiometryMath') and \ species_reference.getStoichiometryMath() is not None: raise SBMLException('StoichiometryMath is currently not ' 'supported for species references.') if species_reference.getId() == '': continue stoich = self._get_element_stoichiometry(species_reference) self._replace_in_all_expressions( _get_identifier_symbol(species_reference), self._sympy_from_sbml_math(stoich) ) def _make_initial(self, sym_math: Union[sp.Expr, None, float] ) -> Union[sp.Expr, None, float]: """ Transforms an expression to its value at the initial time point by replacing species by their initial values. :param sym_math: symbolic expression :return: transformed expression """ if not isinstance(sym_math, sp.Expr): return sym_math for species_id, species in self.symbols[SymbolId.SPECIES].items(): if 'init' in species: sym_math = smart_subs(sym_math, species_id, species['init']) sym_math = smart_subs(sym_math, self._local_symbols['time'], sp.Float(0)) return sym_math
[docs] def process_conservation_laws(self, ode_model, volume_updates) -> List: """ Find conservation laws in reactions and species. :param ode_model: ODEModel object with basic definitions :param volume_updates: List with updates for the stoichiometric matrix accounting for compartment volumes :returns volume_updates_solver: List (according to reduced stoichiometry) with updates for the stoichiometric matrix accounting for compartment volumes """ conservation_laws = [] # So far, only conservation laws for constant species are supported species_solver = _add_conservation_for_constant_species( ode_model, conservation_laws ) # Check, whether species_solver is empty now. As currently, AMICI # cannot handle ODEs without species, CLs must switched in this case if len(species_solver) == 0: conservation_laws = [] species_solver = list(range(ode_model.num_states_rdata())) # prune out species from stoichiometry and volume_updates_solver = self._reduce_stoichiometry(species_solver, volume_updates) # add the found CLs to the ode_model for cl in conservation_laws: ode_model.add_conservation_law(**cl) return volume_updates_solver
def _reduce_stoichiometry(self, species_solver, volume_updates) -> List: """ Reduces the stoichiometry with respect to conserved quantities :param species_solver: List of species indices which remain later in the ODE solver :param volume_updates: List with updates for the stoichiometric matrix accounting for compartment volumes :returns volume_updates_solver: List (according to reduced stoichiometry) with updates for the stoichiometric matrix accounting for compartment volumes """ # prune out constant species from stoichiometric matrix self.stoichiometric_matrix = \ self.stoichiometric_matrix[species_solver, :] # updates of stoichiometry (later dxdotdw in ode_exporter) must be # corrected for conserved quantities: volume_updates_solver = [(species_solver.index(ix), iw, val) for (ix, iw, val) in volume_updates if ix in species_solver] return volume_updates_solver def _replace_compartments_with_volumes(self): """ Replaces compartment symbols in expressions with their respective (possibly variable) volumes. """ for comp, vol in self.compartments.items(): if comp in self.symbols[SymbolId.SPECIES]: # for comps with rate rules volume is only initial for species in self.symbols[SymbolId.SPECIES].values(): if isinstance(species['init'], sp.Expr): species['init'] = smart_subs(species['init'], comp, vol) continue self._replace_in_all_expressions(comp, vol) def _replace_in_all_expressions(self, old: sp.Symbol, new: sp.Expr, replace_identifiers=False) -> None: """ Replace 'old' by 'new' in all symbolic expressions. :param old: symbolic variables to be replaced :param new: replacement symbolic variables """ fields = [ 'stoichiometric_matrix', 'flux_vector', ] for field in fields: if field in dir(self): self.__setattr__(field, smart_subs( self.__getattribute__(field), old, new )) dictfields = [ 'compartment_assignment_rules', 'parameter_assignment_rules', 'initial_assignments' ] for dictfield in dictfields: d = getattr(self, dictfield) # replace identifiers if old in d and replace_identifiers: d[new] = d[old] del d[old] if dictfield == 'initial_assignments': tmp_new = self._make_initial(new) else: tmp_new = new # replace values for k in d: d[k] = smart_subs(d[k], old, tmp_new) # replace in identifiers if replace_identifiers: for symbol in [SymbolId.EXPRESSION, SymbolId.SPECIES]: # completely recreate the dict to keep ordering consistent if old not in self.symbols[symbol]: continue self.symbols[symbol] = { smart_subs(k, old, new): v for k, v in self.symbols[symbol].items() } for symbol in [SymbolId.OBSERVABLE, SymbolId.LLHY, SymbolId.SIGMAY]: if old not in self.symbols[symbol]: continue self.symbols[symbol][new] = self.symbols[symbol][old] del self.symbols[symbol][old] # replace in values for symbol in [SymbolId.OBSERVABLE, SymbolId.LLHY, SymbolId.SIGMAY, SymbolId.EXPRESSION]: if not self.symbols.get(symbol, None): continue for element in self.symbols[symbol].values(): element['value'] = smart_subs(element['value'], old, new) if SymbolId.SPECIES in self.symbols: for species in self.symbols[SymbolId.SPECIES].values(): species['init'] = smart_subs(species['init'], old, self._make_initial(new)) fields = ['dt'] if replace_identifiers: fields.append('compartment') for field in ['dt']: if field in species: species[field] = smart_subs(species[field], old, new) # Initial compartment volume may also be specified with an assignment # rule (at the end of the _process_species method), hence needs to be # processed here too. self.compartments = {smart_subs(c, old, new) if replace_identifiers else c: smart_subs(v, old, self._make_initial(new)) for c, v in self.compartments.items()} def _clean_reserved_symbols(self) -> None: """ Remove all reserved symbols from self.symbols """ reserved_symbols = ['x', 'k', 'p', 'y', 'w', 'h', 't'] for sym in reserved_symbols: old_symbol = symbol_with_assumptions(sym) new_symbol = symbol_with_assumptions(f'amici_{sym}') self._replace_in_all_expressions(old_symbol, new_symbol, replace_identifiers=True) for symbols_ids, symbols in self.symbols.items(): if old_symbol in symbols: # reconstitute the whole dict in order to keep the ordering self.symbols[symbols_ids] = { new_symbol if k is old_symbol else k: v for k, v in symbols.items() } def _sympy_from_sbml_math(self, var_or_math: [sbml.SBase, str] ) -> Union[sp.Expr, float, None]: """ Sympify Math of SBML variables with all sanity checks and transformations :param var_or_math: SBML variable that has a getMath() function or math string :return: sympfified symbolic expression """ if isinstance(var_or_math, sbml.SBase): math_string = sbml.formulaToL3StringWithSettings( var_or_math.getMath(), self.sbml_parser_settings ) ele_name = var_or_math.element_name else: math_string = var_or_math ele_name = 'string' math_string = replace_logx(math_string) try: formula = sp.sympify(_parse_logical_operators( math_string ), locals=self._local_symbols) except (sp.SympifyError, TypeError, ZeroDivisionError) as err: raise SBMLException(f'{ele_name} "{math_string}" ' 'contains an unsupported expression: ' f'{err}.') if isinstance(formula, sp.Expr): formula = _parse_special_functions(formula) _check_unsupported_functions(formula, expression_type=ele_name) return formula def _get_element_initial_assignment(self, element_id: str) -> Union[sp.Expr, None]: """ Extract value of sbml variable according to its initial assignment :param element_id: sbml variable name :return: """ assignment = self.sbml.getInitialAssignment( element_id ) if assignment is None: return None sym = self._sympy_from_sbml_math(assignment) # this is an initial assignment so we need to use # initial conditions sym = self._make_initial(sym) return sym def _get_element_stoichiometry(self, ele: sbml.SBase) -> sp.Expr: """ Computes the stoichiometry of a reactant or product of a reaction :param ele: reactant or product :return: symbolic variable that defines stoichiometry """ if ele.isSetId(): sym = self._get_element_initial_assignment(ele.getId()) if sym is not None: return sym if self.is_assignment_rule_target(ele): return _get_identifier_symbol(ele) if ele.isSetStoichiometry(): return sp.Float(ele.getStoichiometry()) return sp.Float(1.0)
[docs] def is_assignment_rule_target(self, element: sbml.SBase) -> bool: """ Checks if an element has a valid assignment rule in the specified model. :param element: SBML variable :return: boolean indicating truth of function name """ a = self.sbml.getAssignmentRuleByVariable(element.getId()) if a is None or self._sympy_from_sbml_math(a) is None: return False return True
def _check_lib_sbml_errors(sbml_doc: sbml.SBMLDocument, show_warnings: bool = False) -> None: """ Checks the error log in the current self.sbml_doc. :param sbml_doc: SBML document :param show_warnings: display SBML warnings """ num_warning = sbml_doc.getNumErrors(sbml.LIBSBML_SEV_WARNING) num_error = sbml_doc.getNumErrors(sbml.LIBSBML_SEV_ERROR) num_fatal = sbml_doc.getNumErrors(sbml.LIBSBML_SEV_FATAL) if num_warning + num_error + num_fatal: for i_error in range(0, sbml_doc.getNumErrors()): error = sbml_doc.getError(i_error) # we ignore any info messages for now if error.getSeverity() >= sbml.LIBSBML_SEV_ERROR \ or (show_warnings and error.getSeverity() >= sbml.LIBSBML_SEV_WARNING): logger.error(f'libSBML {error.getCategoryAsString()} ' f'({error.getSeverityAsString()}):' f' {error.getMessage()}') if num_error + num_fatal: raise SBMLException( 'SBML Document failed to load (see error messages above)' ) def _check_unsupported_functions(sym: sp.Expr, expression_type: str, full_sym: Optional[sp.Expr] = None): """ Recursively checks the symbolic expression for unsupported symbolic functions :param sym: symbolic expressions :param expression_type: type of expression, only used when throwing errors :param full sym: outermost symbolic expression in recursive checks, only used for errors """ if full_sym is None: full_sym = sym unsupported_functions = ( sp.functions.factorial, sp.functions.ceiling, sp.functions.floor, sp.core.function.UndefinedFunction ) if isinstance(sym.func, unsupported_functions): raise SBMLException(f'Encountered unsupported expression ' f'"{sym.func}" of type ' f'"{type(sym.func)}" as part of a ' f'{expression_type}: "{full_sym}"!') for fun in list(sym._args) + [sym]: if isinstance(fun, unsupported_functions): raise SBMLException(f'Encountered unsupported expression ' f'"{fun}" of type ' f'"{type(fun)}" as part of a ' f'{expression_type}: "{full_sym}"!') if fun is not sym: _check_unsupported_functions(fun, expression_type) def _parse_special_functions(sym: sp.Expr, toplevel: bool = True) -> sp.Expr: """ Recursively checks the symbolic expression for functions which have be to parsed in a special way, such as piecewise functions :param sym: symbolic expressions :param toplevel: as this is called recursively, are we in the top level expression? """ args = tuple(_parse_special_functions(arg, False) for arg in sym._args) if sym.__class__.__name__ == 'abs': return sp.Abs(sym._args[0]) elif sym.__class__.__name__ == 'xor': return sp.Xor(*sym.args) elif sym.__class__.__name__ == 'piecewise': # how many condition-expression pairs will we have? return sp.Piecewise(*(( piece[0], sp.sympify(bool(piece[1])) if isinstance(piece[1], sp.Basic) and piece[1].is_number else piece[1] ) for piece in grouper(args, 2, True))) elif isinstance(sym, (sp.Function, sp.Mul, sp.Add)): sym._args = args elif toplevel and isinstance(sym, BooleanAtom): # Replace boolean constants by numbers so they can be differentiated # must not replace in Piecewise function. Therefore, we only replace # it the complete expression consists only of a Boolean value. sym = sp.Float(int(bool(sym))) return sym def _parse_logical_operators(math_str: Union[str, float, None] ) -> Union[str, float, None]: """ Parses a math string in order to replace logical operators by a form parsable for sympy :param math_str: str with mathematical expression :param math_str: parsed math_str """ if not isinstance(math_str, str): return math_str if ' xor(' in math_str or ' Xor(' in math_str: raise SBMLException('Xor is currently not supported as logical ' 'operation.') return (math_str.replace('&&', '&')).replace('||', '|')
[docs]def grouper(iterable: Iterable, n: int, fillvalue: Any = None) -> Iterable[Tuple[Any]]: """ Collect data into fixed-length chunks or blocks grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" :param iterable: any iterable :param n: chunk length :param fillvalue: padding for last chunk if length < n :return: itertools.zip_longest of requested chunks """ args = [iter(iterable)] * n return itt.zip_longest(*args, fillvalue=fillvalue)
[docs]def assignmentRules2observables(sbml_model: sbml.Model, filter_function: Callable = lambda *_: True): """ Turn assignment rules into observables. :param sbml_model: Model to operate on :param filter_function: Callback function taking assignment variable as input and returning ``True``/``False`` to indicate if the respective rule should be turned into an observable. :return: A dictionary(observableId:{ 'name': observableName, 'formula': formulaString }) """ observables = {} for p in sbml_model.getListOfParameters(): parameter_id = p.getId() if filter_function(p): observables[parameter_id] = { 'name': p.getName() if p.isSetName() else p.getId(), 'formula': sbml_model.getAssignmentRuleByVariable( parameter_id ).getFormula() } for parameter_id in observables: sbml_model.removeRuleByVariable(parameter_id) sbml_model.removeParameter(parameter_id) return observables
[docs]def noise_distribution_to_cost_function( noise_distribution: str ) -> Callable[[str], str]: """ Parse noise distribution string to a cost function definition amici can work with. The noise distributions listed in the following are supported. :math:`m` denotes the measurement, :math:`y` the simulation, and :math:`\\sigma` a distribution scale parameter (currently, AMICI only supports a single distribution parameter). - `'normal'`, `'lin-normal'`: A normal distribution: .. math:: \\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma}\\ exp\\left(-\\frac{(m-y)^2}{2\\sigma^2}\\right) - `'log-normal'`: A log-normal distribution (i.e. log(m) is normally distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma m}\\ exp\\left(-\\frac{(\\log m - \\log y)^2}{2\\sigma^2}\\right) - `'log10-normal'`: A log10-normal distribution (i.e. log10(m) is normally distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma m \\log(10)}\\ exp\\left(-\\frac{(\\log_{10} m - \\log_{10} y)^2}{2\\sigma^2}\\right) - `'laplace'`, `'lin-laplace'`: A laplace distribution: .. math:: \\pi(m|y,\\sigma) = \\frac{1}{2\\sigma} \\exp\\left(-\\frac{|m-y|}{\\sigma}\\right) - `'log-laplace'`: A log-Laplace distribution (i.e. log(m) is Laplace distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{2\\sigma m} \\exp\\left(-\\frac{|\\log m - \\log y|}{\\sigma}\\right) - `'log10-laplace'`: A log10-Laplace distribution (i.e. log10(m) is Laplace distributed): .. math:: \\pi(m|y,\\sigma) = \\frac{1}{2\\sigma m \\log(10)} \\exp\\left(-\\frac{|\\log_{10} m - \\log_{10} y|}{\\sigma}\\right) - `'binomial'`, `'lin-binomial'`: A (continuation of a discrete) binomial distribution, parameterized via the success probability :math:`p=\\sigma`: .. math:: \\pi(m|y,\\sigma) = \\operatorname{Heaviside}(y-m) \\cdot \\frac{\\Gamma(y+1)}{\\Gamma(m+1) \\Gamma(y-m+1)} \\sigma^m (1-\\sigma)^{(y-m)} - `'negative-binomial'`, `'lin-negative-binomial'`: A (continuation of a discrete) negative binomial distribution, with with `mean = y`, parameterized via success probability `p`: .. math:: \\pi(m|y,\\sigma) = \\frac{\\Gamma(m+r)}{\\Gamma(m+1) \\Gamma(r)} (1-\\sigma)^m \\sigma^r where .. math:: r = \\frac{1-\\sigma}{\\sigma} y The distributions above are for a single data point. For a collection :math:`D=\\{m_i\\}_i` of data points and corresponding simulations :math:`Y=\\{y_i\\}_i` and noise parameters :math:`\\Sigma=\\{\\sigma_i\\}_i`, AMICI assumes independence, i.e. the full distributions is .. math:: \\pi(D|Y,\\Sigma) = \\prod_i\\pi(m_i|y_i,\\sigma_i) AMICI uses the logarithm :math:`\\log(\\pi(m|y,\\sigma)`. In addition to the above mentioned distributions, it is also possible to pass a function taking a symbol string and returning a log-distribution string with variables '{str_symbol}', 'm{str_symbol}', 'sigma{str_symbol}' for y, m, sigma, respectively. :param noise_distribution: An identifier specifying a noise model. Possible values are {`'normal'`, `'lin-normal'`, `'log-normal'`, `'log10-normal'`, `'laplace'`, `'lin-laplace'`, `'log-laplace'`, `'log10-laplace'`, `'binomial'`, `'lin-binomial'`, `'negative-binomial'`, `'lin-negative-binomial'`, `<Callable>`} For the meaning of the values see above. :return: A function that takes a strSymbol and then creates a cost function string (negative log-likelihood) from it, which can be sympified. """ if isinstance(noise_distribution, Callable): return noise_distribution if noise_distribution in ['normal', 'lin-normal']: y_string = '0.5*log(2*pi*{sigma}**2) + 0.5*(({y} - {m}) / {sigma})**2' elif noise_distribution == 'log-normal': y_string = '0.5*log(2*pi*{sigma}**2*{m}**2) ' \ '+ 0.5*((log({y}) - log({m})) / {sigma})**2' elif noise_distribution == 'log10-normal': y_string = '0.5*log(2*pi*{sigma}**2*{m}**2*log(10)**2) ' \ '+ 0.5*((log({y}, 10) - log({m}, 10)) / {sigma})**2' elif noise_distribution in ['laplace', 'lin-laplace']: y_string = 'log(2*{sigma}) + Abs({y} - {m}) / {sigma}' elif noise_distribution == 'log-laplace': y_string = 'log(2*{sigma}*{m}) + Abs(log({y}) - log({m})) / {sigma}' elif noise_distribution == 'log10-laplace': y_string = 'log(2*{sigma}*{m}*log(10)) ' \ '+ Abs(log({y}, 10) - log({m}, 10)) / {sigma}' elif noise_distribution in ['binomial', 'lin-binomial']: # Binomial noise model parameterized via success probability p y_string = '- log(Heaviside({y} - {m})) - loggamma({y}+1) ' \ '+ loggamma({m}+1) + loggamma({y}-{m}+1) ' \ '- {m} * log({sigma}) - ({y} - {m}) * log(1-{sigma})' elif noise_distribution in ['negative-binomial', 'lin-negative-binomial']: # Negative binomial noise model of the number of successes m # (data) before r=(1-sigma)/sigma * y failures occur, # with mean number of successes y (simulation), # parameterized via success probability p = sigma. r = '{y} * (1-{sigma}) / {sigma}' y_string = f'- loggamma({{m}}+{r}) + loggamma({{m}}+1) ' \ f'+ loggamma({r}) - {r} * log(1-{{sigma}}) ' \ f'- {{m}} * log({{sigma}})' else: raise ValueError( f"Cost identifier {noise_distribution} not recognized.") def nllh_y_string(str_symbol): y, m, sigma = _get_str_symbol_identifiers(str_symbol) return y_string.format(y=y, m=m, sigma=sigma) return nllh_y_string
def _get_str_symbol_identifiers(str_symbol: str) -> tuple: """Get identifiers for simulation, measurement, and sigma.""" y, m, sigma = f"{str_symbol}", f"m{str_symbol}", f"sigma{str_symbol}" return y, m, sigma def _add_conservation_for_constant_species( ode_model: ODEModel, conservation_laws: List[ConservationLaw] ) -> List[int]: """ Adds constant species to conservations laws :param ode_model: ODEModel object with basic definitions :param conservation_laws: List of already known conservation laws :returns species_solver: List of species indices which remain later in the ODE solver """ # decide which species to keep in stoichiometry species_solver = list(range(ode_model.num_states_rdata())) # iterate over species, find constant ones for ix in reversed(range(ode_model.num_states_rdata())): if ode_model.state_is_constant(ix): # dont use sym('x') here since conservation laws need to be # added before symbols are generated target_state = ode_model._states[ix].get_id() total_abundance = symbol_with_assumptions(f'tcl_{target_state}') conservation_laws.append({ 'state': target_state, 'total_abundance': total_abundance, 'state_expr': total_abundance, 'abundance_expr': target_state, }) # mark species to delete from stoichiometric matrix species_solver.pop(ix) return species_solver def _get_species_compartment_symbol(species: sbml.Species) -> sp.Symbol: """ Generate compartment symbol for the compartment of a specific species. This function will always return the same unique python object for a given species name. :param species: sbml species :return: compartment symbol """ return symbol_with_assumptions(species.getCompartment()) def _get_identifier_symbol(var: sbml.SBase) -> sp.Symbol: """ Generate identifier symbol for a sbml variable. This function will always return the same unique python object for a given entity. :param var: sbml variable :return: identifier symbol """ return symbol_with_assumptions(var.getId()) def _get_species_initial(species: sbml.Species) -> sp.Expr: """ Extract the initial concentration from a given species :param species: species index :return: initial species concentration """ if species.isSetInitialConcentration(): conc = species.getInitialConcentration() if species.getHasOnlySubstanceUnits(): return sp.Float(conc) * _get_species_compartment_symbol(species) else: return sp.Float(conc) if species.isSetInitialAmount(): amt = species.getInitialAmount() if math.isnan(amt): return sp.Float(0.0) if species.getHasOnlySubstanceUnits(): return sp.Float(amt) else: return sp.Float(amt) / _get_species_compartment_symbol(species) return sp.Float(0.0) def _get_list_of_species_references(sbml_model: sbml.Model) \ -> List[sbml.SpeciesReference]: """ Extracts list of species references as SBML doesn't provide a native function for this. :param sbml_model: SBML model instance :return: ListOfSpeciesReferences """ return [ reference for element in sbml_model.all_elements if isinstance(element, sbml.ListOfSpeciesReferences) for reference in element ]
[docs]def replace_logx(math_str: Union[str, float, None]) -> Union[str, float, None]: """ Replace logX(.) by log(., X) since sympy cannot parse the former :param math_str: string for sympification :return: sympifiable string """ if not isinstance(math_str, str): return math_str return re.sub( r'(^|\W)log(\d+)\(', r'\g<1>1/ln(\2)*ln(', math_str )
[docs]def smart_subs(element: sp.Expr, old: sp.Symbol, new: sp.Expr) -> sp.Expr: """ Optimized substitution that checks whether anything needs to be done first :param element: substitution target :param old: to be substituted :param new: subsitution value :return: substituted expression """ if old in element.free_symbols: return element.subs(old, new) return element
[docs]def toposort_symbols(symbols: SymbolDef, field: Optional[str] = None) -> SymbolDef: """ Topologically sort symbol definitions according to their interdependency :param symbols: symbol definitions :param field: field of definition.values() that is used to compute interdependency :return: ordered symbol definitions """ sorted_symbols = toposort({ identifier: { s for s in ( definition[field] if field is not None else definition ).free_symbols if s in symbols } for identifier, definition in symbols.items() }) return { s: symbols[s] for symbol_group in sorted_symbols for s in symbol_group }