Source code for amici.pysb_import

"""
PySB Import
------------
This module provides all necessary functionality to import a model specified
in the :class:`pysb.core.Model` format.
"""

from .ode_export import (
    ODEExporter, ODEModel, State, Constant, Parameter, Observable, SigmaY,
    Expression, LogLikelihood, generate_measurement_symbol
)

from .import_utils import (
    noise_distribution_to_cost_function, _get_str_symbol_identifiers
)
import logging
from .logging import get_logger, log_execution_time, set_log_level

import sympy as sp
import numpy as np
import itertools
import os
import sys

from typing import (
    List, Union, Dict, Tuple, Set, Iterable, Any, Callable, Optional
)

CL_Prototype = Dict[str, Dict[str, Any]]
ConservationLaw = Dict[str, Union[str, sp.Basic]]

import pysb.bng
import pysb
import pysb.pattern

logger = get_logger(__name__, logging.ERROR)


[docs]def pysb2amici( model: pysb.Model, output_dir: str = None, observables: List[str] = None, constant_parameters: List[str] = None, sigmas: Dict[str, str] = None, noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None, verbose: Union[int, bool] = False, assume_pow_positivity: bool = False, compiler: str = None, compute_conservation_laws: bool = True, compile: bool = True, simplify: Callable = lambda x: sp.powsimp(x, deep=True), generate_sensitivity_code: bool = True, ): r""" Generate AMICI C++ files for the provided model. .. warning:: **PySB models with Compartments** When importing a PySB model with ``pysb.Compartment``\ s, BioNetGen scales reaction fluxes with the compartment size. Instead of using the respective symbols, the compartment size Parameter or Expression is evaluated when generating equations. This may lead to unexpected results if the compartment size parameter is changed for AMICI simulations. :param model: pysb model, :attr:`pysb.Model.name` will determine the name of the generated module :param output_dir: see :meth:`amici.ode_export.ODEExporter.set_paths` :param observables: list of :class:`pysb.core.Expression` or :class:`pysb.core.Observable` names in the provided model that should be mapped to observables :param sigmas: dict of :class:`pysb.core.Expression` names that should be mapped to sigmas :param noise_distributions: dict with names of observable Expressions as keys and a noise type identifier, or a callable generating a custom noise formula string (see :py:func:`amici.import_utils.noise_distribution_to_cost_function` ). If nothing is passed for some observable id, a normal model is assumed as default. :param constant_parameters: list of :class:`pysb.core.Parameter` names that should be mapped as fixed parameters :param verbose: verbosity level for logging, True/False default to :attr:`logging.DEBUG`/:attr:`logging.ERROR` :param assume_pow_positivity: if set to ``True``, a special pow function is used to avoid problems with state variables that may become negative due to numerical errors :param compiler: distutils/setuptools compiler selection to build the python extension :param compute_conservation_laws: if set to ``True``, conservation laws are automatically computed and applied such that the state-jacobian of the ODE right-hand-side has full rank. This option should be set to ``True`` when using the Newton algorithm to compute steadystates :param compile: If ``True``, build the python module for the generated model. If false, just generate the source code. :param simplify: see :attr:`amici.ODEModel._simplify` :param generate_sensitivity_code: if set to ``False``, code for sensitivity computation will not be generated """ if observables is None: observables = [] if constant_parameters is None: constant_parameters = [] if sigmas is None: sigmas = {} set_log_level(logger, verbose) ode_model = ode_model_from_pysb_importer( model, constant_parameters=constant_parameters, observables=observables, sigmas=sigmas, noise_distributions=noise_distributions, compute_conservation_laws=compute_conservation_laws, simplify=simplify, verbose=verbose, ) exporter = ODEExporter( ode_model, outdir=output_dir, verbose=verbose, assume_pow_positivity=assume_pow_positivity, compiler=compiler, generate_sensitivity_code=generate_sensitivity_code ) exporter.set_name(model.name) exporter.set_paths(output_dir) exporter.generate_model_code() if compile: exporter.compile_model()
[docs]@log_execution_time('creating ODE model', logger) def ode_model_from_pysb_importer( model: pysb.Model, constant_parameters: List[str] = None, observables: List[str] = None, sigmas: Dict[str, str] = None, noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None, compute_conservation_laws: bool = True, simplify: Callable = sp.powsimp, verbose: Union[int, bool] = False, ) -> ODEModel: """ Creates an :class:`amici.ODEModel` instance from a :class:`pysb.Model` instance. :param model: see :func:`amici.pysb_import.pysb2amici` :param constant_parameters: see :func:`amici.pysb_import.pysb2amici` :param observables: see :func:`amici.pysb_import.pysb2amici` :param sigmas: dict with names of observable Expressions as keys and names of sigma Expressions as value sigma :param noise_distributions: see :func:`amici.pysb_import.pysb2amici` :param compute_conservation_laws: see :func:`amici.pysb_import.pysb2amici` :param simplify: see :attr:`amici.ODEModel._simplify` :param verbose: verbosity level for logging, True/False default to :attr:`logging.DEBUG`/:attr:`logging.ERROR` :return: New ODEModel instance according to pysbModel """ ode = ODEModel(verbose=verbose, simplify=simplify) if constant_parameters is None: constant_parameters = [] if observables is None: observables = [] if sigmas is None: sigmas = {} pysb.bng.generate_equations(model, verbose=verbose) _process_pysb_species(model, ode) _process_pysb_parameters(model, ode, constant_parameters) if compute_conservation_laws: _process_pysb_conservation_laws(model, ode) _process_pysb_observables(model, ode, observables, sigmas, noise_distributions) _process_pysb_expressions(model, ode, observables, sigmas, noise_distributions) ode._has_quadratic_nllh = not noise_distributions or all( noise_distr in ['normal', 'lin-normal'] for noise_distr in noise_distributions.values() ) ode.generate_basic_variables() return ode
@log_execution_time('processing PySB species', logger) def _process_pysb_species(pysb_model: pysb.Model, ode_model: ODEModel) -> None: """ Converts pysb Species into States and adds them to the ODEModel instance :param pysb_model: pysb model instance :param ode_model: ODEModel instance """ xdot = sp.Matrix(pysb_model.odes) for ix, specie in enumerate(pysb_model.species): init = sp.sympify('0.0') for ic in pysb_model.odes.model.initial_conditions: if pysb.pattern.match_complex_pattern(ic[0], specie, exact=True): # we don't want to allow expressions in initial conditions if ic[1] in pysb_model.expressions: init = pysb_model.expressions[ic[1].name].expand_expr() else: init = ic[1] ode_model.add_component( State( sp.Symbol(f'__s{ix}'), f'{specie}', init, xdot[ix] ) ) logger.debug(f'Finished Processing PySB species ') @log_execution_time('processing PySB parameters', logger) def _process_pysb_parameters(pysb_model: pysb.Model, ode_model: ODEModel, constant_parameters: List[str]) -> None: """ Converts pysb parameters into Parameters or Constants and adds them to the ODEModel instance :param pysb_model: pysb model :param constant_parameters: list of Parameters that should be constants :param ode_model: ODEModel instance """ for par in pysb_model.parameters: if par.name in constant_parameters: comp = Constant else: comp = Parameter ode_model.add_component( comp(par, f'{par.name}', par.value) ) @log_execution_time('processing PySB expressions', logger) def _process_pysb_expressions( pysb_model: pysb.Model, ode_model: ODEModel, observables: List[str], sigmas: Dict[str, str], noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None, ) -> None: r""" Converts pysb expressions/observables into Observables (with corresponding standard deviation SigmaY and LogLikelihood) or Expressions and adds them to the ODEModel instance :param pysb_model: pysb model :param observables: list of names of :class`pysb.Expression`\ s or :class:`pysb.Observable`\ s that are to be mapped to ODEModel observables :param sigmas: dict with names of observable pysb.Expressions/pysb.Observables names as keys and names of sigma pysb.Expressions as values :param noise_distributions: see :func:`amici.pysb_import.pysb2amici` :param ode_model: ODEModel instance """ # we no longer expand expressions here. pysb/bng guarantees that # they are ordered according to their dependency and we can # evaluate them sequentially without reordering. Important to make # sure that observables are processed first though. for expr in pysb_model.expressions: _add_expression(expr, expr.name, expr.expr, pysb_model, ode_model, observables, sigmas, noise_distributions) def _add_expression( sym: sp.Symbol, name: str, expr: sp.Basic, pysb_model: pysb.Model, ode_model: ODEModel, observables: List[str], sigmas: Dict[str, str], noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None, ): """ Adds expressions to the ODE model given and adds observables/sigmas if appropriate :param sym: symbol how the expression is referenced in the model :param name: name of the expression :param expr: symbolic expression that the symbol refers to :param pysb_model: see :py:func:`_process_pysb_expressions` :param observables: see :py:func:`_process_pysb_expressions` :param sigmas: see :py:func:`_process_pysb_expressions` :param noise_distributions: see :py:func:`amici.pysb_import.pysb2amici` :param ode_model: see :py:func:`_process_pysb_expressions` """ ode_model.add_component( Expression(sym, name, expr) ) if name in observables: y = sp.Symbol(f'{name}') obs = Observable(y, name, sym) ode_model.add_component(obs) sigma_name, sigma_value = _get_sigma_name_and_value( pysb_model, name, sigmas ) sigma = sp.Symbol(sigma_name) ode_model.add_component(SigmaY(sigma, f'{sigma_name}', sigma_value)) noise_dist = noise_distributions.get(name, 'normal') \ if noise_distributions else 'normal' cost_fun_str = noise_distribution_to_cost_function(noise_dist)(name) my = generate_measurement_symbol(obs.get_id()) cost_fun_expr = sp.sympify(cost_fun_str, locals=dict(zip( _get_str_symbol_identifiers(name), (y, my, sigma)))) ode_model.add_component( LogLikelihood( sp.Symbol(f'llh_{name}'), f'llh_{name}', cost_fun_expr ) ) def _get_sigma_name_and_value( pysb_model: pysb.Model, obs_name: str, sigmas: Dict[str, str]) -> Tuple[str, sp.Basic]: """ Tries to extract standard deviation symbolic identifier and formula for a given observable name from the pysb model and if no specification is available sets default values :param pysb_model: pysb model :param obs_name: name of the observable :param sigmas: dict of :class:`pysb.core.Expression` names that should be mapped to sigmas :return: tuple containing symbolic identifier and formula for the specified observable """ if obs_name in sigmas: sigma_name = sigmas[obs_name] try: # find corresponding Expression instance sigma_expr = next(x for x in pysb_model.expressions if x.name == sigma_name) except StopIteration: raise ValueError(f'value of sigma {obs_name} is not a ' f'valid expression.') sigma_value = sigma_expr.expand_expr() else: sigma_name = f'sigma_{obs_name}' sigma_value = sp.sympify(1.0) return sigma_name, sigma_value @log_execution_time('processing PySB observables', logger) def _process_pysb_observables( pysb_model: pysb.Model, ode_model: ODEModel, observables: List[str], sigmas: Dict[str, str], noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None, ) -> None: """ Converts :class:`pysb.core.Observable` into :class:`ODEModel.Expressions` and adds them to the ODEModel instance :param pysb_model: pysb model :param ode_model: ODEModel instance :param observables: list of names of pysb.Expressions or pysb.Observables that are to be mapped to ODEModel observables :param sigmas: dict with names of observable pysb.Expressions/pysb.Observables names as keys and names of sigma pysb.Expressions as values :param noise_distributions: see :func:`amici.pysb_import.pysb2amici` """ # only add those pysb observables that occur in the added # Observables as expressions for obs in pysb_model.observables: _add_expression(obs, obs.name, obs.expand_obs(), pysb_model, ode_model, observables, sigmas, noise_distributions) @log_execution_time('computing PySB conservation laws', logger) def _process_pysb_conservation_laws(pysb_model: pysb.Model, ode_model: ODEModel) -> None: """ Removes species according to conservation laws to ensure that the jacobian has full rank :param pysb_model: pysb model :param ode_model: ODEModel instance """ monomers_without_conservation_law = set() for rule in pysb_model.rules: monomers_without_conservation_law |= \ _get_unconserved_monomers(rule, pysb_model) monomers_without_conservation_law |= \ _compute_monomers_with_fixed_initial_conditions(pysb_model) cl_prototypes = _generate_cl_prototypes( monomers_without_conservation_law, pysb_model, ode_model ) conservation_laws = _construct_conservation_from_prototypes( cl_prototypes, pysb_model ) _add_conservation_for_constant_species(ode_model, conservation_laws) _flatten_conservation_laws(conservation_laws) for cl in conservation_laws: ode_model.add_conservation_law(**cl) def _compute_monomers_with_fixed_initial_conditions( pysb_model: pysb.Model) -> Set[str]: """ Computes the set of monomers in a model with species that have fixed initial conditions :param pysb_model: pysb model :return: set of monomer names with fixed initial conditions """ monomers_with_fixed_initial_conditions = set() for monomer in pysb_model.monomers: # check if monomer has an initial condition that is fixed (means # that corresponding state is constant and all conservation # laws are broken) if any([ ic.fixed # true or false for ic in pysb_model.initials if monomer.name in extract_monomers(ic.pattern) ]): monomers_with_fixed_initial_conditions |= {monomer.name} return monomers_with_fixed_initial_conditions def _generate_cl_prototypes(excluded_monomers: Iterable[str], pysb_model: pysb.Model, ode_model: ODEModel) -> CL_Prototype: """ Constructs a dict that contains preprocessed information for the construction of conservation laws :param excluded_monomers: list of monomer names for which no prototypes should be computed :param pysb_model: pysb model :param ode_model: ODEModel instance :return: dict('monomer.name':{'possible_indices': ..., 'target_indices': ...} """ cl_prototypes = dict() _compute_possible_indices(cl_prototypes, pysb_model, ode_model, excluded_monomers) _compute_dependency_idx(cl_prototypes) _compute_target_index(cl_prototypes, ode_model) return cl_prototypes def _compute_possible_indices(cl_prototypes: CL_Prototype, pysb_model: pysb.Model, ode_model: ODEModel, excluded_monomers: Iterable[str]) -> None: """ Computes viable choices for target_index, ie species that could be removed and replaced by an algebraic expression according to the conservation law :param cl_prototypes: dict in which possible indices will be written :param pysb_model: pysb model :param ode_model: ODEModel instance :param excluded_monomers: monomers for which no conservation laws will be computed """ for monomer in pysb_model.monomers: if monomer.name not in excluded_monomers: compartments = [ str(mp.compartment) # string based comparison as # compartments are not hashable for cp in pysb_model.species for mp in cp.monomer_patterns if mp.monomer.name == monomer.name ] if len(set(compartments)) > 1: raise ValueError('Conservation laws involving species in ' 'multiple compartments are currently not ' 'supported! Please run pysb2amici with ' 'compute_conservation_laws=False') # TODO: implement this, multiply species by the volume of # their respective compartment and allow total_cl to depend # on parameters + constants and update the respective symbolic # derivative accordingly prototype = dict() prototype['possible_indices'] = [ ix for ix, specie in enumerate(pysb_model.species) if monomer.name in extract_monomers(specie) and not ode_model.state_is_constant(ix) ] prototype['species_count'] = len( prototype['possible_indices'] ) if prototype['possible_indices']: cl_prototypes[monomer.name] = prototype def _compute_dependency_idx(cl_prototypes: CL_Prototype) -> None: """ Compute connecting species, this allows us to efficiently compute whether the respective conservation law would induce a cyclic dependency. Adds a 'dependency_idx' field to the prototype dict that itself is a dict where keys correspond to indexes that, when used as target index yield dependencies on conservation laws of monomers in the respective values :param cl_prototypes: dict in which possible indices will be written """ # for monomer_i, prototype_i in cl_prototypes.items(): if 'dependency_idx' not in prototype_i: prototype_i['dependency_idx'] = dict() for monomer_j, prototype_j in cl_prototypes.items(): if monomer_i == monomer_j: continue if 'dependency_idx' not in prototype_j: prototype_j['dependency_idx'] = dict() idx_overlap = set(prototype_i['possible_indices']).intersection( set(prototype_j['possible_indices']) ) if len(idx_overlap) == 0: continue for idx in idx_overlap: if idx not in prototype_i['dependency_idx']: prototype_i['dependency_idx'][idx] = set() if idx not in prototype_j['dependency_idx']: prototype_j['dependency_idx'][idx] = set() prototype_i['dependency_idx'][idx] |= {monomer_j} prototype_j['dependency_idx'][idx] |= {monomer_i} def _compute_target_index(cl_prototypes: CL_Prototype, ode_model: ODEModel) -> None: """ Computes the target index for every monomer :param cl_prototypes: dict that contains possible indices for every monomer :param ode_model: ODEModel instance """ possible_indices = list(set(list(itertools.chain(*[ cl_prototypes[monomer]['possible_indices'] for monomer in cl_prototypes ])))) # Note: currently this function is supposed to also count appearances in # expressions. However, expressions are currently still empty as they # are also populated from conservation laws. In case there are many # state heavy expressions in the model (should not be the case for mass # action kinetics). This may lead to suboptimal results and could improved. # As this would require substantial code shuffling, this will only be # fixed if this becomes an actual problem appearance_counts = ode_model.get_appearance_counts(possible_indices) # in this initial guess we ignore the cost of having cyclic dependencies # between conservation laws for monomer in cl_prototypes: prototype = cl_prototypes[monomer] # extract monomer specific appearance counts prototype['appearance_counts'] = \ [ appearance_counts[possible_indices.index(idx)] for idx in prototype['possible_indices'] ] # select target index as possible index with minimal appearance count if len(prototype['appearance_counts']) == 0: raise RuntimeError(f'Failed to compute conservation law for ' f'monomer {monomer}') idx = np.argmin(prototype['appearance_counts']) # remove entries from possible indices and appearance counts so we # do not consider them again in later iterations prototype['target_index'] = prototype['possible_indices'].pop(idx) prototype['appearance_count'] = prototype['appearance_counts'].pop(idx) # this is only an approximation as the effective species count # of other conservation laws may also be affected by the chosen # target index. As long as the number of unique monomers in # multimers has a low upper bound and the species count does not # vary too much across conservation laws, this approximation # should be fine prototype['fillin'] = \ prototype['appearance_count'] * prototype['species_count'] # we might end up with the same index for multiple monomers, so loop until # we have a set of unique target indices while not _cl_prototypes_are_valid(cl_prototypes): _greedy_target_index_update(cl_prototypes) def _cl_prototypes_are_valid(cl_prototypes: CL_Prototype) -> bool: """ Checks consistency of cl_prototypes by asserting that target indices are unique and there are no cyclic dependencies :param cl_prototypes: dict that contains dependency and target indexes for every monomer """ # target indices are unique if len(cl_prototypes) != len(set(_get_target_indices(cl_prototypes))): return False # conservation law dependencies are cycle free if any( _cl_has_cycle(monomer, cl_prototypes) for monomer in cl_prototypes ): return False return True def _cl_has_cycle(monomer: str, cl_prototypes: CL_Prototype) -> bool: """ Checks whether monomer has a conservation law that is part of a cyclic dependency :param monomer: name of monomer for which conservation law is to be checked :param cl_prototypes: dict that contains dependency and target indexes for every monomer :return: boolean indicating whether the conservation law is cyclic """ prototype = cl_prototypes[monomer] if prototype['target_index'] not in prototype['dependency_idx']: return False visited = [monomer] root = monomer return any( _is_in_cycle( connecting_monomer, cl_prototypes, visited, root ) for connecting_monomer in prototype['dependency_idx'][ prototype['target_index'] ] ) def _is_in_cycle(monomer: str, cl_prototypes: CL_Prototype, visited: List[str], root: str) -> bool: """ Recursively checks for cycles in conservation law dependencies via Depth First Search :param monomer: current location in cl dependency graph :param cl_prototypes: dict that contains dependency and target indexes for every monomer :param visited: history of visited monomers with conservation laws :param root: monomer at which the cycle search was started :return: boolean indicating whether the specified monomer is part of a cyclic conservation law """ if monomer == root: return True # we found a cycle and root is part of it if monomer in visited: return False # we found a cycle but root is not part of it visited.append(monomer) prototype = cl_prototypes[monomer] if prototype['target_index'] not in prototype['dependency_idx']: return False return any( _is_in_cycle( connecting_monomer, cl_prototypes, visited, root ) for connecting_monomer in prototype['dependency_idx'][ prototype['target_index'] ] ) def _greedy_target_index_update(cl_prototypes: CL_Prototype) -> None: """ Computes unique target indices for conservation laws from possible indices such that expected fill in in symbolic derivatives is minimized :param cl_prototypes: dict that contains possible indices and non-unique target indices for every monomer """ target_indices = _get_target_indices(cl_prototypes) for monomer, prototype in cl_prototypes.items(): if target_indices.count(prototype['target_index']) > 1 or \ _cl_has_cycle(monomer, cl_prototypes): # compute how much fillin the next best target_index would yield # we exclude already existing target indices to avoid that # updating the target index removes uniqueness from already unique # target indices, this may slightly reduce chances of finding a # solution but prevents infinite loops for target_index in list(set(target_indices)): try: local_idx = prototype['possible_indices'].index( target_index ) except ValueError: local_idx = None if local_idx: del prototype['possible_indices'][local_idx] del prototype['appearance_counts'][local_idx] if len(prototype['possible_indices']) == 0: prototype['diff_fillin'] = -1 continue idx = np.argmin(prototype['appearance_counts']) prototype['local_index'] = idx prototype['alternate_target_index'] = \ prototype['possible_indices'][idx] prototype['alternate_appearance_count'] = \ prototype['appearance_counts'][idx] prototype['alternate_fillin'] = \ prototype['alternate_appearance_count'] \ * prototype['species_count'] prototype['diff_fillin'] = \ prototype['alternate_fillin'] - prototype['fillin'] else: prototype['diff_fillin'] = -1 if all( prototype['diff_fillin'] == -1 for prototype in cl_prototypes.values() ): raise RuntimeError('Could not compute a valid set of conservation ' 'laws for this model!') # this puts prototypes with high diff_fillin last cl_prototypes = sorted( cl_prototypes.items(), key=lambda kv: kv[1]['diff_fillin'] ) cl_prototypes = { proto[0]: proto[1] for proto in cl_prototypes } for monomer in cl_prototypes: prototype = cl_prototypes[monomer] # we check that we # A) have an alternative index computed, i.e. that # that monomer originally had a non-unique target_index # B) that the target_index still is not unique or part of a cyclic # dependency. due to the sorting, this will always be the monomer # with the highest diff_fillin (note that the target index counts # are recomputed on the fly) if prototype['diff_fillin'] > -1 \ and ( _get_target_indices(cl_prototypes).count( prototype['target_index'] ) > 1 or _cl_has_cycle(monomer, cl_prototypes) ): prototype['fillin'] = prototype['alternate_fillin'] prototype['target_index'] = prototype['alternate_target_index'] prototype['appearance_count'] = \ prototype['alternate_appearance_count'] del prototype['possible_indices'][prototype['local_index']] del prototype['appearance_counts'][prototype['local_index']] def _get_target_indices( cl_prototypes: CL_Prototype) -> List[List[int]]: """ Computes the list target indices for the current conservation law prototype :param cl_prototypes: dict that contains target indices for every monomer :return: List of lists of target indices """ return [ prototype['target_index'] for prototype in cl_prototypes.values() ] def _construct_conservation_from_prototypes( cl_prototypes: CL_Prototype, pysb_model: pysb.Model ) -> List[ConservationLaw]: """ Computes the algebraic expression for the total amount of a given monomer :param cl_prototypes: see return of :func:`_generate_cl_prototypes` :param pysb_model: pysb model :return: list of dicts describing conservation laws """ conservation_laws = [] for monomer_name in cl_prototypes: target_index = cl_prototypes[monomer_name]['target_index'] # T = sum_i(a_i * x_i) # x_j = (T - sum_i≠j(a_i * x_i))/a_j # law: sum_i≠j(a_i * x_i))/a_j # state: x_j target_expression = sum( sp.Symbol(f'__s{ix}') * extract_monomers(specie).count(monomer_name) for ix, specie in enumerate(pysb_model.species) if ix != target_index ) / extract_monomers(pysb_model.species[ target_index ]).count(monomer_name) # normalize by the stoichiometry of the target species target_state = sp.Symbol(f'__s{target_index}') # = x_j total_abundance = sp.Symbol(f'tcl__s{target_index}') # = T/a_j state_expr = total_abundance - target_expression # x_j = T/a_j - sum_i≠j(a_i * x_i)/a_j abundance_expr = target_expression + target_state # T/a_j = sum_i≠j(a_i * x_i)/a_j + x_j conservation_laws.append({ 'state': target_state, 'total_abundance': total_abundance, 'state_expr': state_expr, 'abundance_expr': abundance_expr, }) return conservation_laws def _add_conservation_for_constant_species( ode_model: ODEModel, conservation_laws: List[ConservationLaw] ) -> None: """ Computes the algebraic expression for the total amount of a given monomer :param ode_model: ODEModel instance to which the conservation laws will be added :param conservation_laws: see return of :func:`_construct_conservation_from_prototypes` """ for ix in range(ode_model.num_states_rdata()): if ode_model.state_is_constant(ix): target_state = sp.Symbol(f'__s{ix}') total_abundance = sp.Symbol(f'tcl__s{ix}') conservation_laws.append({ 'state': target_state, 'total_abundance': total_abundance, 'state_expr': total_abundance, 'abundance_expr': target_state, }) def _flatten_conservation_laws( conservation_laws: List[ConservationLaw]) -> None: """ Flatten the conservation laws such that the state_expr not longer depend on any states that are replaced by conservation laws :param conservation_laws: see return of :func:`_construct_conservation_from_prototypes` """ conservation_law_subs = \ _get_conservation_law_subs(conservation_laws) while len(conservation_law_subs): for cl in conservation_laws: if _sub_matches_cl( conservation_law_subs, cl['state_expr'], cl['state'] ): # this optimization is done by subs anyways, but we dont # want to recompute the subs if we did not change anything valid_subs = _select_valid_cls( conservation_law_subs, cl['state'] ) if len(valid_subs) > 0: cl['state_expr'] = cl['state_expr'].subs(valid_subs) conservation_law_subs = \ _get_conservation_law_subs(conservation_laws) def _select_valid_cls(subs: Iterable[Tuple[sp.Symbol, sp.Basic]], state: sp.Symbol) -> List[Tuple[sp.Symbol, sp.Basic]]: """ Subselect substitutions such that we do not end up with conservation laws that are self-referential :param subs: substitutions in tuple format :param state: target symbolic state to which substitutions will be applied :return: list of valid substitutions """ return [ sub for sub in subs if str(state) not in [str(symbol) for symbol in sub[1].free_symbols] ] def _sub_matches_cl(subs: Iterable[Tuple[sp.Symbol, sp.Basic]], state_expr: sp.Basic, state: sp.Basic) -> bool: """ Checks whether any of the substitutions in subs will be applied to state_expr :param subs: substitutions in tuple format :param state_expr: target symbolic expressions in which substitutions will be applied :param state: target symbolic state to which substitutions will be applied :return: boolean indicating positive match """ sub_symbols = set( sub[0] for sub in subs if str(state) not in [ str(symbol) for symbol in sub[1].free_symbols ] ) return len(sub_symbols.intersection(state_expr.free_symbols)) > 0 def _get_conservation_law_subs( conservation_laws: List[ConservationLaw] ) -> List[Tuple[sp.Symbol, sp.Basic]]: """ Computes a list of (state, law) tuples for conservation laws that still appear in other conservation laws :param conservation_laws: see return of :func:`_flatten_conservation_laws` :return: list of tuples containing substitution rules to be used with sympy subs """ free_symbols_cl = _conservation_law_variables(conservation_laws) return [ (cl['state'], cl['state_expr']) for cl in conservation_laws if cl['state'] in free_symbols_cl ] def _conservation_law_variables( conservation_laws: List[ConservationLaw]) -> Set[sp.Symbol]: """ Construct the set of all free variables from a list of conservation laws :param conservation_laws: list of conservation laws :return: free variables in conservation laws """ variables = set() for cl in conservation_laws: variables |= cl['state_expr'].free_symbols return variables
[docs]def has_fixed_parameter_ic(specie: pysb.core.ComplexPattern, pysb_model: pysb.Model, ode_model: ODEModel) -> bool: """ Wrapper to interface :meth:`ode_export.ODEModel.state_has_fixed_parameter_initial_condition` from a pysb specie/model arguments :param specie: pysb species :param pysb_model: pysb model :param ode_model: ODE model :return: ``False`` if the species does not have an initial condition at all. Otherwise the return value of :meth:`ode_export.ODEModel.state_has_fixed_parameter_initial_condition` """ # ComplexPatterns are not hashable, so we have to compare by string ic_index = next( ( ic for ic, condition in enumerate(pysb_model.initials) if pysb.pattern.match_complex_pattern(condition[0], specie, exact=True) ), None ) if ic_index is None: return False else: return ode_model.state_has_fixed_parameter_initial_condition( ic_index )
[docs]def extract_monomers( complex_patterns: Union[pysb.ComplexPattern, List[pysb.ComplexPattern]] ) -> List[str]: """ Constructs a list of monomer names contained in complex patterns. Multiplicity of names corresponds to the stoichiometry in the complex. :param complex_patterns: (list of) complex pattern(s) :return: list of monomer names """ if not isinstance(complex_patterns, list): complex_patterns = [complex_patterns] return [ mp.monomer.name for cp in complex_patterns if cp is not None for mp in cp.monomer_patterns ]
def _get_unconserved_monomers(rule: pysb.Rule, pysb_model: pysb.Model) -> Set[str]: """ Constructs the set of monomer names for which the specified rule changes the stoichiometry of the monomer in the specified model. :param rule: the pysb rule :param pysb_model: pysb model :return: set of monomer names for which the stoichiometry is not conserved """ unconserved_monomers = set() if not rule.delete_molecules \ and len(rule.product_pattern.complex_patterns) == 0: # if delete_molecules is not True but we have a degradation rule, # we have to actually go through the reactions that are created by # the rule for reaction in [r for r in pysb_model.reactions if rule.name in r['rule']]: unconserved_monomers |= _get_changed_stoichiometries( [pysb_model.species[ix] for ix in reaction['reactants']], [pysb_model.species[ix] for ix in reaction['products']] ) else: # otherwise we can simply extract all information for the rule # itself, which is computationally much more efficient unconserved_monomers |= _get_changed_stoichiometries( rule.reactant_pattern.complex_patterns, rule.product_pattern.complex_patterns ) return unconserved_monomers def _get_changed_stoichiometries( reactants: Union[pysb.ComplexPattern, List[pysb.ComplexPattern]], products: Union[pysb.ComplexPattern, List[pysb.ComplexPattern]] ) -> Set[str]: """ Constructs the set of monomer names which have different stoichiometries in reactants and products. :param reactants: (list of) complex pattern(s) :param products: (list of) complex pattern(s) :returns: set of monomer name for which the stoichiometry changed """ changed_stoichiometries = set() reactant_monomers = extract_monomers( reactants ) product_monomers = extract_monomers( products ) for monomer in set(reactant_monomers + product_monomers): if reactant_monomers.count(monomer) != product_monomers.count(monomer): changed_stoichiometries.add(monomer) return changed_stoichiometries
[docs]def pysb_model_from_path(pysb_model_file: str) -> pysb.Model: """Load a pysb model module and return the :class:`pysb.Model` instance :param pysb_model_file: Full or relative path to the PySB model module :return: The pysb Model instance """ pysb_model_module_name = \ os.path.splitext(os.path.split(pysb_model_file)[-1])[0] import importlib.util spec = importlib.util.spec_from_file_location( pysb_model_module_name, pysb_model_file) module = importlib.util.module_from_spec(spec) sys.modules[pysb_model_module_name] = module spec.loader.exec_module(module) return module.model