"""
PySB Import
------------
This module provides all necessary functionality to import a model specified
in the :class:`pysb.core.Model` format.
"""
from .ode_export import (
ODEExporter, ODEModel, State, Constant, Parameter, Observable, SigmaY,
Expression, LogLikelihood, generate_measurement_symbol
)
from .import_utils import (
noise_distribution_to_cost_function, _get_str_symbol_identifiers
)
import logging
from .logging import get_logger, log_execution_time, set_log_level
import sympy as sp
import numpy as np
import itertools
import os
import sys
from typing import (
List, Union, Dict, Tuple, Set, Iterable, Any, Callable, Optional
)
CL_Prototype = Dict[str, Dict[str, Any]]
ConservationLaw = Dict[str, Union[str, sp.Basic]]
import pysb.bng
import pysb
import pysb.pattern
logger = get_logger(__name__, logging.ERROR)
[docs]def pysb2amici(
model: pysb.Model,
output_dir: str = None,
observables: List[str] = None,
constant_parameters: List[str] = None,
sigmas: Dict[str, str] = None,
noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None,
verbose: Union[int, bool] = False,
assume_pow_positivity: bool = False,
compiler: str = None,
compute_conservation_laws: bool = True,
compile: bool = True,
simplify: Callable = lambda x: sp.powsimp(x, deep=True),
):
"""
Generate AMICI C++ files for the provided model.
:param model:
pysb model, :attr:`pysb.Model.name` will determine the name of the
generated module
:param output_dir:
see :meth:`amici.ode_export.ODEExporter.set_paths`
:param observables:
list of :class:`pysb.core.Expression` or :class:`pysb.core.Observable`
names in the provided model that should be mapped to observables
:param sigmas:
dict of :class:`pysb.core.Expression` names that should be mapped to
sigmas
:param noise_distributions:
dict with names of observable Expressions as keys and a noise type
identifier, or a callable generating a custom noise formula string
(see :py:func:`amici.import_utils.noise_distribution_to_cost_function`
). If nothing is passed for some observable id, a normal model is
assumed as default.
:param constant_parameters:
list of :class:`pysb.core.Parameter` names that should be mapped as
fixed parameters
:param verbose: verbosity level for logging, True/False default to
:attr:`logging.DEBUG`/:attr:`logging.ERROR`
:param assume_pow_positivity:
if set to ``true``, a special pow function is used to avoid problems
with state variables that may become negative due to numerical
errors
:param compiler:
distutils/setuptools compiler selection to build the python
extension
:param compute_conservation_laws:
if set to ``true``, conservation laws are automatically computed and
applied such that the state-jacobian of the ODE right-hand-side has
full rank. This option should be set to ``true`` when using the Newton
algorithm to compute steadystates
:param compile:
If ``true``, build the python module for the generated model. If false,
just generate the source code.
:param simplify:
see :attr:`amici.ODEModel._simplify`
"""
if observables is None:
observables = []
if constant_parameters is None:
constant_parameters = []
if sigmas is None:
sigmas = {}
set_log_level(logger, verbose)
ode_model = ode_model_from_pysb_importer(
model, constant_parameters=constant_parameters,
observables=observables, sigmas=sigmas,
noise_distributions=noise_distributions,
compute_conservation_laws=compute_conservation_laws,
simplify=simplify,
verbose=verbose,
)
exporter = ODEExporter(
ode_model,
outdir=output_dir,
verbose=verbose,
assume_pow_positivity=assume_pow_positivity,
compiler=compiler,
)
exporter.set_name(model.name)
exporter.set_paths(output_dir)
exporter.generate_model_code()
if compile:
exporter.compile_model()
[docs]@log_execution_time('creating ODE model', logger)
def ode_model_from_pysb_importer(
model: pysb.Model,
constant_parameters: List[str] = None,
observables: List[str] = None,
sigmas: Dict[str, str] = None,
noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None,
compute_conservation_laws: bool = True,
simplify: Callable = sp.powsimp,
verbose: Union[int, bool] = False,
) -> ODEModel:
"""
Creates an :class:`amici.ODEModel` instance from a :class:`pysb.Model`
instance.
:param model:
see :func:`amici.pysb_import.pysb2amici`
:param constant_parameters:
see :func:`amici.pysb_import.pysb2amici`
:param observables:
see :func:`amici.pysb_import.pysb2amici`
:param sigmas:
dict with names of observable Expressions as keys and names of sigma
Expressions as value sigma
:param noise_distributions:
see :func:`amici.pysb_import.pysb2amici`
:param compute_conservation_laws:
see :func:`amici.pysb_import.pysb2amici`
:param simplify:
see :attr:`amici.ODEModel._simplify`
:param verbose: verbosity level for logging, True/False default to
:attr:`logging.DEBUG`/:attr:`logging.ERROR`
:return:
New ODEModel instance according to pysbModel
"""
ode = ODEModel(verbose=verbose, simplify=simplify)
if constant_parameters is None:
constant_parameters = []
if observables is None:
observables = []
if sigmas is None:
sigmas = {}
pysb.bng.generate_equations(model, verbose=verbose)
_process_pysb_species(model, ode)
_process_pysb_parameters(model, ode, constant_parameters)
if compute_conservation_laws:
_process_pysb_conservation_laws(model, ode)
_process_pysb_observables(model, ode, observables, sigmas,
noise_distributions)
_process_pysb_expressions(model, ode, observables, sigmas,
noise_distributions)
ode._has_quadratic_nllh = not noise_distributions or all(
noise_distr in ['normal', 'lin-normal']
for noise_distr in noise_distributions.values()
)
ode.generate_basic_variables()
return ode
@log_execution_time('processing PySB species', logger)
def _process_pysb_species(pysb_model: pysb.Model,
ode_model: ODEModel) -> None:
"""
Converts pysb Species into States and adds them to the ODEModel instance
:param pysb_model:
pysb model instance
:param ode_model:
ODEModel instance
"""
xdot = sp.Matrix(pysb_model.odes)
for ix, specie in enumerate(pysb_model.species):
init = sp.sympify('0.0')
for ic in pysb_model.odes.model.initial_conditions:
if pysb.pattern.match_complex_pattern(ic[0], specie, exact=True):
# we don't want to allow expressions in initial conditions
if ic[1] in pysb_model.expressions:
init = pysb_model.expressions[ic[1].name].expand_expr()
else:
init = ic[1]
ode_model.add_component(
State(
sp.Symbol(f'__s{ix}'),
f'{specie}',
init,
xdot[ix]
)
)
logger.debug(f'Finished Processing PySB species ')
@log_execution_time('processing PySB parameters', logger)
def _process_pysb_parameters(pysb_model: pysb.Model,
ode_model: ODEModel,
constant_parameters: List[str]) -> None:
"""
Converts pysb parameters into Parameters or Constants and adds them to
the ODEModel instance
:param pysb_model:
pysb model
:param constant_parameters:
list of Parameters that should be constants
:param ode_model:
ODEModel instance
"""
for par in pysb_model.parameters:
if par.name in constant_parameters:
comp = Constant
else:
comp = Parameter
ode_model.add_component(
comp(par, f'{par.name}', par.value)
)
@log_execution_time('processing PySB expressions', logger)
def _process_pysb_expressions(
pysb_model: pysb.Model,
ode_model: ODEModel,
observables: List[str],
sigmas: Dict[str, str],
noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None,
) -> None:
"""
Converts pysb expressions/observables into Observables (with
corresponding standard deviation SigmaY and LogLikelihood) or
Expressions and adds them to the ODEModel instance
:param pysb_model:
pysb model
:param observables:
list of names of :class`pysb.Expression`\\ s or
:class:`pysb.Observable`\\ s that are to be mapped to ODEModel
observables
:param sigmas:
dict with names of observable pysb.Expressions/pysb.Observables
names as keys and names of sigma pysb.Expressions as values
:param noise_distributions:
see :func:`amici.pysb_import.pysb2amici`
:param ode_model:
ODEModel instance
"""
# we no longer expand expressions here. pysb/bng guarantees that
# they are ordered according to their dependency and we can
# evaluate them sequentially without reordering. Important to make
# sure that observables are processed first though.
for expr in pysb_model.expressions:
_add_expression(expr, expr.name, expr.expr,
pysb_model, ode_model, observables, sigmas,
noise_distributions)
def _add_expression(
sym: sp.Symbol,
name: str,
expr: sp.Basic,
pysb_model: pysb.Model,
ode_model: ODEModel,
observables: List[str],
sigmas: Dict[str, str],
noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None,
):
"""
Adds expressions to the ODE model given and adds observables/sigmas if
appropriate
:param sym:
symbol how the expression is referenced in the model
:param name:
name of the expression
:param expr:
symbolic expression that the symbol refers to
:param pysb_model:
see :py:func:`_process_pysb_expressions`
:param observables:
see :py:func:`_process_pysb_expressions`
:param sigmas:
see :py:func:`_process_pysb_expressions`
:param noise_distributions:
see :py:func:`amici.pysb_import.pysb2amici`
:param ode_model:
see :py:func:`_process_pysb_expressions`
"""
ode_model.add_component(
Expression(sym, name, expr)
)
if name in observables:
y = sp.Symbol(f'{name}')
obs = Observable(y, name, sym)
ode_model.add_component(obs)
sigma_name, sigma_value = _get_sigma_name_and_value(
pysb_model, name, sigmas
)
sigma = sp.Symbol(sigma_name)
ode_model.add_component(SigmaY(sigma, f'{sigma_name}', sigma_value))
noise_dist = noise_distributions.get(name, 'normal') \
if noise_distributions else 'normal'
cost_fun_str = noise_distribution_to_cost_function(noise_dist)(name)
my = generate_measurement_symbol(obs.get_id())
cost_fun_expr = sp.sympify(cost_fun_str,
locals=dict(zip(
_get_str_symbol_identifiers(name),
(y, my, sigma))))
ode_model.add_component(
LogLikelihood(
sp.Symbol(f'llh_{name}'),
f'llh_{name}',
cost_fun_expr
)
)
def _get_sigma_name_and_value(
pysb_model: pysb.Model,
obs_name: str,
sigmas: Dict[str, str]) -> Tuple[str, sp.Basic]:
"""
Tries to extract standard deviation symbolic identifier and formula
for a given observable name from the pysb model and if no specification is
available sets default values
:param pysb_model:
pysb model
:param obs_name:
name of the observable
:param sigmas:
dict of :class:`pysb.core.Expression` names that should be mapped to
sigmas
:return:
tuple containing symbolic identifier and formula for the specified
observable
"""
if obs_name in sigmas:
sigma_name = sigmas[obs_name]
try:
# find corresponding Expression instance
sigma_expr = next(x for x in pysb_model.expressions
if x.name == sigma_name)
except StopIteration:
raise ValueError(f'value of sigma {obs_name} is not a '
f'valid expression.')
sigma_value = sigma_expr.expand_expr()
else:
sigma_name = f'sigma_{obs_name}'
sigma_value = sp.sympify(1.0)
return sigma_name, sigma_value
@log_execution_time('processing PySB observables', logger)
def _process_pysb_observables(
pysb_model: pysb.Model,
ode_model: ODEModel,
observables: List[str],
sigmas: Dict[str, str],
noise_distributions: Optional[Dict[str, Union[str, Callable]]] = None,
) -> None:
"""
Converts :class:`pysb.core.Observable` into
:class:`ODEModel.Expressions` and adds them to the ODEModel instance
:param pysb_model:
pysb model
:param ode_model:
ODEModel instance
:param observables:
list of names of pysb.Expressions or pysb.Observables that are to be
mapped to ODEModel observables
:param sigmas:
dict with names of observable pysb.Expressions/pysb.Observables
names as keys and names of sigma pysb.Expressions as values
:param noise_distributions:
see :func:`amici.pysb_import.pysb2amici`
"""
# only add those pysb observables that occur in the added
# Observables as expressions
for obs in pysb_model.observables:
_add_expression(obs, obs.name, obs.expand_obs(),
pysb_model, ode_model, observables, sigmas,
noise_distributions)
@log_execution_time('computing PySB conservation laws', logger)
def _process_pysb_conservation_laws(pysb_model: pysb.Model,
ode_model: ODEModel) -> None:
"""
Removes species according to conservation laws to ensure that the
jacobian has full rank
:param pysb_model:
pysb model
:param ode_model:
ODEModel instance
"""
monomers_without_conservation_law = set()
for rule in pysb_model.rules:
monomers_without_conservation_law |= \
_get_unconserved_monomers(rule, pysb_model)
monomers_without_conservation_law |= \
_compute_monomers_with_fixed_initial_conditions(pysb_model)
cl_prototypes = _generate_cl_prototypes(
monomers_without_conservation_law, pysb_model, ode_model
)
conservation_laws = _construct_conservation_from_prototypes(
cl_prototypes, pysb_model
)
_add_conservation_for_constant_species(ode_model, conservation_laws)
_flatten_conservation_laws(conservation_laws)
for cl in conservation_laws:
ode_model.add_conservation_law(**cl)
def _compute_monomers_with_fixed_initial_conditions(
pysb_model: pysb.Model) -> Set[str]:
"""
Computes the set of monomers in a model with species that have fixed
initial conditions
:param pysb_model: pysb model
:return:
set of monomer names with fixed initial conditions
"""
monomers_with_fixed_initial_conditions = set()
for monomer in pysb_model.monomers:
# check if monomer has an initial condition that is fixed (means
# that corresponding state is constant and all conservation
# laws are broken)
if any([
ic.fixed # true or false
for ic in pysb_model.initials
if monomer.name in extract_monomers(ic.pattern)
]):
monomers_with_fixed_initial_conditions |= {monomer.name}
return monomers_with_fixed_initial_conditions
def _generate_cl_prototypes(excluded_monomers: Iterable[str],
pysb_model: pysb.Model,
ode_model: ODEModel) -> CL_Prototype:
"""
Constructs a dict that contains preprocessed information for the
construction of conservation laws
:param excluded_monomers:
list of monomer names for which no prototypes
should be computed
:param pysb_model:
pysb model
:param ode_model:
ODEModel instance
:return:
dict('monomer.name':{'possible_indices': ..., 'target_indices': ...}
"""
cl_prototypes = dict()
_compute_possible_indices(cl_prototypes, pysb_model, ode_model,
excluded_monomers)
_compute_dependency_idx(cl_prototypes)
_compute_target_index(cl_prototypes, ode_model)
return cl_prototypes
def _compute_possible_indices(cl_prototypes: CL_Prototype,
pysb_model: pysb.Model,
ode_model: ODEModel,
excluded_monomers: Iterable[str]) -> None:
"""
Computes viable choices for target_index, ie species that could be
removed and replaced by an algebraic expression according to the
conservation law
:param cl_prototypes:
dict in which possible indices will be written
:param pysb_model:
pysb model
:param ode_model:
ODEModel instance
:param excluded_monomers:
monomers for which no conservation laws will be
computed
"""
for monomer in pysb_model.monomers:
if monomer.name not in excluded_monomers:
compartments = [
str(mp.compartment) # string based comparison as
# compartments are not hashable
for cp in pysb_model.species
for mp in cp.monomer_patterns
if mp.monomer.name == monomer.name
]
if len(set(compartments)) > 1:
raise ValueError('Conservation laws involving species in '
'multiple compartments are currently not '
'supported! Please run pysb2amici with '
'compute_conservation_laws=False')
# TODO: implement this, multiply species by the volume of
# their respective compartment and allow total_cl to depend
# on parameters + constants and update the respective symbolic
# derivative accordingly
prototype = dict()
prototype['possible_indices'] = [
ix
for ix, specie in enumerate(pysb_model.species)
if monomer.name in extract_monomers(specie)
and not ode_model.state_is_constant(ix)
]
prototype['species_count'] = len(
prototype['possible_indices']
)
if prototype['possible_indices']:
cl_prototypes[monomer.name] = prototype
def _compute_dependency_idx(cl_prototypes: CL_Prototype) -> None:
"""
Compute connecting species, this allows us to efficiently compute
whether the respective conservation law would induce a cyclic dependency.
Adds a 'dependency_idx' field to the prototype dict that
itself is a dict where keys correspond to indexes that, when used as
target index yield dependencies on conservation laws of monomers in
the respective values
:param cl_prototypes:
dict in which possible indices will be written
"""
#
for monomer_i, prototype_i in cl_prototypes.items():
if 'dependency_idx' not in prototype_i:
prototype_i['dependency_idx'] = dict()
for monomer_j, prototype_j in cl_prototypes.items():
if monomer_i == monomer_j:
continue
if 'dependency_idx' not in prototype_j:
prototype_j['dependency_idx'] = dict()
idx_overlap = set(prototype_i['possible_indices']).intersection(
set(prototype_j['possible_indices'])
)
if len(idx_overlap) == 0:
continue
for idx in idx_overlap:
if idx not in prototype_i['dependency_idx']:
prototype_i['dependency_idx'][idx] = set()
if idx not in prototype_j['dependency_idx']:
prototype_j['dependency_idx'][idx] = set()
prototype_i['dependency_idx'][idx] |= {monomer_j}
prototype_j['dependency_idx'][idx] |= {monomer_i}
def _compute_target_index(cl_prototypes: CL_Prototype,
ode_model: ODEModel) -> None:
"""
Computes the target index for every monomer
:param cl_prototypes:
dict that contains possible indices for every monomer
:param ode_model:
ODEModel instance
"""
possible_indices = list(set(list(itertools.chain(*[
cl_prototypes[monomer]['possible_indices']
for monomer in cl_prototypes
]))))
# Note: currently this function is supposed to also count appearances in
# expressions. However, expressions are currently still empty as they
# are also populated from conservation laws. In case there are many
# state heavy expressions in the model (should not be the case for mass
# action kinetics). This may lead to suboptimal results and could improved.
# As this would require substantial code shuffling, this will only be
# fixed if this becomes an actual problem
appearance_counts = ode_model.get_appearance_counts(possible_indices)
# in this initial guess we ignore the cost of having cyclic dependencies
# between conservation laws
for monomer in cl_prototypes:
prototype = cl_prototypes[monomer]
# extract monomer specific appearance counts
prototype['appearance_counts'] = \
[
appearance_counts[possible_indices.index(idx)]
for idx in prototype['possible_indices']
]
# select target index as possible index with minimal appearance count
if len(prototype['appearance_counts']) == 0:
raise RuntimeError(f'Failed to compute conservation law for '
f'monomer {monomer}')
idx = np.argmin(prototype['appearance_counts'])
# remove entries from possible indices and appearance counts so we
# do not consider them again in later iterations
prototype['target_index'] = prototype['possible_indices'].pop(idx)
prototype['appearance_count'] = prototype['appearance_counts'].pop(idx)
# this is only an approximation as the effective species count
# of other conservation laws may also be affected by the chosen
# target index. As long as the number of unique monomers in
# multimers has a low upper bound and the species count does not
# vary too much across conservation laws, this approximation
# should be fine
prototype['fillin'] = \
prototype['appearance_count'] * prototype['species_count']
# we might end up with the same index for multiple monomers, so loop until
# we have a set of unique target indices
while not _cl_prototypes_are_valid(cl_prototypes):
_greedy_target_index_update(cl_prototypes)
def _cl_prototypes_are_valid(cl_prototypes: CL_Prototype) -> bool:
"""
Checks consistency of cl_prototypes by asserting that target indices
are unique and there are no cyclic dependencies
:param cl_prototypes:
dict that contains dependency and target indexes for
every monomer
"""
# target indices are unique
if len(cl_prototypes) != len(set(_get_target_indices(cl_prototypes))):
return False
# conservation law dependencies are cycle free
if any(
_cl_has_cycle(monomer, cl_prototypes)
for monomer in cl_prototypes
):
return False
return True
def _cl_has_cycle(monomer: str, cl_prototypes: CL_Prototype) -> bool:
"""
Checks whether monomer has a conservation law that is part of a
cyclic dependency
:param monomer:
name of monomer for which conservation law is to be checked
:param cl_prototypes:
dict that contains dependency and target indexes for every monomer
:return:
boolean indicating whether the conservation law is cyclic
"""
prototype = cl_prototypes[monomer]
if prototype['target_index'] not in prototype['dependency_idx']:
return False
visited = [monomer]
root = monomer
return any(
_is_in_cycle(
connecting_monomer,
cl_prototypes,
visited,
root
)
for connecting_monomer in prototype['dependency_idx'][
prototype['target_index']
]
)
def _is_in_cycle(monomer: str,
cl_prototypes: CL_Prototype,
visited: List[str],
root: str) -> bool:
"""
Recursively checks for cycles in conservation law dependencies via
Depth First Search
:param monomer:
current location in cl dependency graph
:param cl_prototypes:
dict that contains dependency and target indexes for
every monomer
:param visited:
history of visited monomers with conservation laws
:param root:
monomer at which the cycle search was started
:return:
boolean indicating whether the specified monomer is part of a cyclic
conservation law
"""
if monomer == root:
return True # we found a cycle and root is part of it
if monomer in visited:
return False # we found a cycle but root is not part of it
visited.append(monomer)
prototype = cl_prototypes[monomer]
if prototype['target_index'] not in prototype['dependency_idx']:
return False
return any(
_is_in_cycle(
connecting_monomer,
cl_prototypes,
visited,
root
)
for connecting_monomer in prototype['dependency_idx'][
prototype['target_index']
]
)
def _greedy_target_index_update(cl_prototypes: CL_Prototype) -> None:
"""
Computes unique target indices for conservation laws from possible
indices such that expected fill in in symbolic derivatives is minimized
:param cl_prototypes:
dict that contains possible indices and non-unique target indices
for every monomer
"""
target_indices = _get_target_indices(cl_prototypes)
for monomer, prototype in cl_prototypes.items():
if target_indices.count(prototype['target_index']) > 1 or \
_cl_has_cycle(monomer, cl_prototypes):
# compute how much fillin the next best target_index would yield
# we exclude already existing target indices to avoid that
# updating the target index removes uniqueness from already unique
# target indices, this may slightly reduce chances of finding a
# solution but prevents infinite loops
for target_index in list(set(target_indices)):
try:
local_idx = prototype['possible_indices'].index(
target_index
)
except ValueError:
local_idx = None
if local_idx:
del prototype['possible_indices'][local_idx]
del prototype['appearance_counts'][local_idx]
if len(prototype['possible_indices']) == 0:
prototype['diff_fillin'] = -1
continue
idx = np.argmin(prototype['appearance_counts'])
prototype['local_index'] = idx
prototype['alternate_target_index'] = \
prototype['possible_indices'][idx]
prototype['alternate_appearance_count'] = \
prototype['appearance_counts'][idx]
prototype['alternate_fillin'] = \
prototype['alternate_appearance_count'] \
* prototype['species_count']
prototype['diff_fillin'] = \
prototype['alternate_fillin'] - prototype['fillin']
else:
prototype['diff_fillin'] = -1
if all(
prototype['diff_fillin'] == -1
for prototype in cl_prototypes.values()
):
raise RuntimeError('Could not compute a valid set of conservation '
'laws for this model!')
# this puts prototypes with high diff_fillin last
cl_prototypes = sorted(
cl_prototypes.items(), key=lambda kv: kv[1]['diff_fillin']
)
cl_prototypes = {
proto[0]: proto[1]
for proto in cl_prototypes
}
for monomer in cl_prototypes:
prototype = cl_prototypes[monomer]
# we check that we
# A) have an alternative index computed, i.e. that
# that monomer originally had a non-unique target_index
# B) that the target_index still is not unique or part of a cyclic
# dependency. due to the sorting, this will always be the monomer
# with the highest diff_fillin (note that the target index counts
# are recomputed on the fly)
if prototype['diff_fillin'] > -1 \
and (
_get_target_indices(cl_prototypes).count(
prototype['target_index']
) > 1
or _cl_has_cycle(monomer, cl_prototypes)
):
prototype['fillin'] = prototype['alternate_fillin']
prototype['target_index'] = prototype['alternate_target_index']
prototype['appearance_count'] = \
prototype['alternate_appearance_count']
del prototype['possible_indices'][prototype['local_index']]
del prototype['appearance_counts'][prototype['local_index']]
def _get_target_indices(
cl_prototypes: CL_Prototype) -> List[List[int]]:
"""
Computes the list target indices for the current
conservation law prototype
:param cl_prototypes:
dict that contains target indices for every monomer
:return:
List of lists of target indices
"""
return [
prototype['target_index'] for prototype in cl_prototypes.values()
]
def _construct_conservation_from_prototypes(
cl_prototypes: CL_Prototype,
pysb_model: pysb.Model
) -> List[ConservationLaw]:
"""
Computes the algebraic expression for the total amount of a given
monomer
:param cl_prototypes:
see return of :func:`_generate_cl_prototypes`
:param pysb_model:
pysb model
:return:
list of dicts describing conservation laws
"""
conservation_laws = []
for monomer_name in cl_prototypes:
target_index = cl_prototypes[monomer_name]['target_index']
# T = sum_i(a_i * x_i)
# x_j = (T - sum_i≠j(a_i * x_i))/a_j
# law: sum_i≠j(a_i * x_i))/a_j
# state: x_j
target_expression = sum(
sp.Symbol(f'__s{ix}')
* extract_monomers(specie).count(monomer_name)
for ix, specie in enumerate(pysb_model.species)
if ix != target_index
) / extract_monomers(pysb_model.species[
target_index
]).count(monomer_name)
# normalize by the stoichiometry of the target species
target_state = sp.Symbol(f'__s{target_index}')
# = x_j
total_abundance = sp.Symbol(f'tcl__s{target_index}')
# = T/a_j
state_expr = total_abundance - target_expression
# x_j = T/a_j - sum_i≠j(a_i * x_i)/a_j
abundance_expr = target_expression + target_state
# T/a_j = sum_i≠j(a_i * x_i)/a_j + x_j
conservation_laws.append({
'state': target_state,
'total_abundance': total_abundance,
'state_expr': state_expr,
'abundance_expr': abundance_expr,
})
return conservation_laws
def _add_conservation_for_constant_species(
ode_model: ODEModel,
conservation_laws: List[ConservationLaw]
) -> None:
"""
Computes the algebraic expression for the total amount of a given
monomer
:param ode_model:
ODEModel instance to which the conservation laws will be added
:param conservation_laws:
see return of :func:`_construct_conservation_from_prototypes`
"""
for ix in range(ode_model.num_states_rdata()):
if ode_model.state_is_constant(ix):
target_state = sp.Symbol(f'__s{ix}')
total_abundance = sp.Symbol(f'tcl__s{ix}')
conservation_laws.append({
'state': target_state,
'total_abundance': total_abundance,
'state_expr': total_abundance,
'abundance_expr': target_state,
})
def _flatten_conservation_laws(
conservation_laws: List[ConservationLaw]) -> None:
"""
Flatten the conservation laws such that the state_expr not longer
depend on any states that are replaced by conservation laws
:param conservation_laws:
see return of :func:`_construct_conservation_from_prototypes`
"""
conservation_law_subs = \
_get_conservation_law_subs(conservation_laws)
while len(conservation_law_subs):
for cl in conservation_laws:
if _sub_matches_cl(
conservation_law_subs,
cl['state_expr'],
cl['state']
):
# this optimization is done by subs anyways, but we dont
# want to recompute the subs if we did not change anything
valid_subs = _select_valid_cls(
conservation_law_subs,
cl['state']
)
if len(valid_subs) > 0:
cl['state_expr'] = cl['state_expr'].subs(valid_subs)
conservation_law_subs = \
_get_conservation_law_subs(conservation_laws)
def _select_valid_cls(subs: Iterable[Tuple[sp.Symbol, sp.Basic]],
state: sp.Symbol) -> List[Tuple[sp.Symbol, sp.Basic]]:
"""
Subselect substitutions such that we do not end up with conservation
laws that are self-referential
:param subs:
substitutions in tuple format
:param state:
target symbolic state to which substitutions will be applied
:return:
list of valid substitutions
"""
return [
sub
for sub in subs
if str(state) not in [str(symbol) for symbol in sub[1].free_symbols]
]
def _sub_matches_cl(subs: Iterable[Tuple[sp.Symbol, sp.Basic]],
state_expr: sp.Basic,
state: sp.Basic) -> bool:
"""
Checks whether any of the substitutions in subs will be applied to
state_expr
:param subs:
substitutions in tuple format
:param state_expr:
target symbolic expressions in which substitutions will be applied
:param state: target symbolic state to which substitutions will
be applied
:return:
boolean indicating positive match
"""
sub_symbols = set(
sub[0]
for sub in subs
if str(state) not in [
str(symbol) for symbol in sub[1].free_symbols
]
)
return len(sub_symbols.intersection(state_expr.free_symbols)) > 0
def _get_conservation_law_subs(
conservation_laws: List[ConservationLaw]
) -> List[Tuple[sp.Symbol, sp.Basic]]:
"""
Computes a list of (state, law) tuples for conservation laws that still
appear in other conservation laws
:param conservation_laws:
see return of :func:`_flatten_conservation_laws`
:return:
list of tuples containing substitution rules to be used with sympy
subs
"""
free_symbols_cl = _conservation_law_variables(conservation_laws)
return [
(cl['state'], cl['state_expr']) for cl in conservation_laws
if cl['state'] in free_symbols_cl
]
def _conservation_law_variables(
conservation_laws: List[ConservationLaw]) -> Set[sp.Symbol]:
"""
Construct the set of all free variables from a list of conservation laws
:param conservation_laws:
list of conservation laws
:return:
free variables in conservation laws
"""
variables = set()
for cl in conservation_laws:
variables |= cl['state_expr'].free_symbols
return variables
[docs]def has_fixed_parameter_ic(specie: pysb.core.ComplexPattern,
pysb_model: pysb.Model,
ode_model: ODEModel) -> bool:
"""
Wrapper to interface
:meth:`ode_export.ODEModel.state_has_fixed_parameter_initial_condition`
from a pysb specie/model arguments
:param specie:
pysb species
:param pysb_model:
pysb model
:param ode_model:
ODE model
:return:
``False`` if the species does not have an initial condition at all.
Otherwise the return value of
:meth:`ode_export.ODEModel.state_has_fixed_parameter_initial_condition`
"""
# ComplexPatterns are not hashable, so we have to compare by string
ic_index = next(
(
ic
for ic, condition in enumerate(pysb_model.initials)
if pysb.pattern.match_complex_pattern(condition[0],
specie, exact=True)
),
None
)
if ic_index is None:
return False
else:
return ode_model.state_has_fixed_parameter_initial_condition(
ic_index
)
def _get_unconserved_monomers(rule: pysb.Rule,
pysb_model: pysb.Model) -> Set[str]:
"""
Constructs the set of monomer names for which the specified rule changes
the stoichiometry of the monomer in the specified model.
:param rule:
the pysb rule
:param pysb_model:
pysb model
:return:
set of monomer names for which the stoichiometry is not conserved
"""
unconserved_monomers = set()
if not rule.delete_molecules \
and len(rule.product_pattern.complex_patterns) == 0:
# if delete_molecules is not True but we have a degradation rule,
# we have to actually go through the reactions that are created by
# the rule
for reaction in [r for r in pysb_model.reactions
if rule.name in r['rule']]:
unconserved_monomers |= _get_changed_stoichiometries(
[pysb_model.species[ix] for ix in reaction['reactants']],
[pysb_model.species[ix] for ix in reaction['products']]
)
else:
# otherwise we can simply extract all information for the rule
# itself, which is computationally much more efficient
unconserved_monomers |= _get_changed_stoichiometries(
rule.reactant_pattern.complex_patterns,
rule.product_pattern.complex_patterns
)
return unconserved_monomers
def _get_changed_stoichiometries(
reactants: Union[pysb.ComplexPattern, List[pysb.ComplexPattern]],
products: Union[pysb.ComplexPattern, List[pysb.ComplexPattern]]
) -> Set[str]:
"""
Constructs the set of monomer names which have different
stoichiometries in reactants and products.
:param reactants:
(list of) complex pattern(s)
:param products:
(list of) complex pattern(s)
:returns:
set of monomer name for which the stoichiometry changed
"""
changed_stoichiometries = set()
reactant_monomers = extract_monomers(
reactants
)
product_monomers = extract_monomers(
products
)
for monomer in set(reactant_monomers + product_monomers):
if reactant_monomers.count(monomer) != product_monomers.count(monomer):
changed_stoichiometries.add(monomer)
return changed_stoichiometries
[docs]def pysb_model_from_path(pysb_model_file: str) -> pysb.Model:
"""Load a pysb model module and return the :class:`pysb.Model` instance
:param pysb_model_file: Full or relative path to the PySB model module
:return: The pysb Model instance
"""
pysb_model_module_name = \
os.path.splitext(os.path.split(pysb_model_file)[-1])[0]
import importlib.util
spec = importlib.util.spec_from_file_location(
pysb_model_module_name, pysb_model_file)
module = importlib.util.module_from_spec(spec)
sys.modules[pysb_model_module_name] = module
spec.loader.exec_module(module)
return module.model