"""
PEtab Objective
---------------
Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem
"""
import copy
import logging
import numbers
import re
from typing import (
Any,
Collection,
Dict,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)
import amici
import libsbml
import numpy as np
import pandas as pd
import petab
import sympy as sp
from amici.sbml_import import get_species_initial
from petab.C import * # noqa: F403
from petab.models import MODEL_TYPE_PYSB, MODEL_TYPE_SBML
from sympy.abc import _clash
from . import AmiciExpData, AmiciModel
from .logging import get_logger, log_execution_time
from .parameter_mapping import (
ParameterMapping,
ParameterMappingForCondition,
fill_in_parameters,
)
from .petab_import import PREEQ_INDICATOR_ID
from .petab_util import get_states_in_condition_table
try:
import pysb
except ImportError:
pysb = None
logger = get_logger(__name__)
# string constant definitions
LLH = "llh"
SLLH = "sllh"
FIM = "fim"
S2LLH = "s2llh"
RES = "res"
SRES = "sres"
RDATAS = "rdatas"
EDATAS = "edatas"
[docs]@log_execution_time("Simulating PEtab model", logger)
def simulate_petab(
petab_problem: petab.Problem,
amici_model: AmiciModel,
solver: Optional[amici.Solver] = None,
problem_parameters: Optional[Dict[str, float]] = None,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
edatas: List[AmiciExpData] = None,
parameter_mapping: ParameterMapping = None,
scaled_parameters: Optional[bool] = False,
log_level: int = logging.WARNING,
num_threads: int = 1,
failfast: bool = True,
scaled_gradients: bool = False,
) -> Dict[str, Any]:
"""Simulate PEtab model.
.. note::
Regardless of `scaled_parameters`, unscaled sensitivities are returned,
unless `scaled_gradients=True`.
:param petab_problem:
PEtab problem to work on.
:param amici_model:
AMICI Model assumed to be compatible with ``petab_problem``.
:param solver:
An AMICI solver. Will use default options if None.
:param problem_parameters:
Run simulation with these parameters. If ``None``, PEtab
``nominalValues`` will be used. To be provided as dict, mapping PEtab
problem parameters to SBML IDs.
:param simulation_conditions:
Result of :py:func:`petab.get_simulation_conditions`. Can be provided
to save time if this has be obtained before.
Not required if ``edatas`` and ``parameter_mapping`` are provided.
:param edatas:
Experimental data. Parameters are inserted in-place for simulation.
:param parameter_mapping:
Optional precomputed PEtab parameter mapping for efficiency, as
generated by :py:func:`create_parameter_mapping`.
:param scaled_parameters:
If ``True``, ``problem_parameters`` are assumed to be on the scale
provided in the PEtab parameter table and will be unscaled.
If ``False``, they are assumed to be in linear scale.
:param log_level:
Log level, see :mod:`amici.logging` module.
:param num_threads:
Number of threads to use for simulating multiple conditions
(only used if compiled with OpenMP).
:param failfast:
Returns as soon as an integration failure is encountered, skipping
any remaining simulations.
:param scaled_gradients:
Whether to compute gradients on parameter scale (``True``) or not
(``False``).
:return:
Dictionary of
* cost function value (``LLH``),
* list of :class:`amici.amici.ReturnData` (``RDATAS``),
* list of :class:`amici.amici.ExpData` (``EDATAS``),
corresponding to the different simulation conditions.
For ordering of simulation conditions, see
:meth:`petab.Problem.get_simulation_conditions_from_measurement_df`.
"""
logger.setLevel(log_level)
if solver is None:
solver = amici_model.getSolver()
# Switch to scaled parameters.
problem_parameters = _default_scaled_parameters(
petab_problem=petab_problem,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
)
scaled_parameters = True
# number of amici simulations will be number of unique
# (preequilibrationConditionId, simulationConditionId) pairs.
# Can be optimized by checking for identical condition vectors.
if simulation_conditions is None and parameter_mapping is None and edatas is None:
simulation_conditions = (
petab_problem.get_simulation_conditions_from_measurement_df()
)
# Get parameter mapping
if parameter_mapping is None:
parameter_mapping = create_parameter_mapping(
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
scaled_parameters=scaled_parameters,
amici_model=amici_model,
)
# Get edatas
if edatas is None:
# Generate ExpData with all condition-specific information
edatas = create_edatas(
amici_model=amici_model,
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
)
# Fill parameters in ExpDatas (in-place)
fill_in_parameters(
edatas=edatas,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
parameter_mapping=parameter_mapping,
amici_model=amici_model,
)
# Simulate
rdatas = amici.runAmiciSimulations(
amici_model,
solver,
edata_list=edatas,
num_threads=num_threads,
failfast=failfast,
)
# Compute total llh
llh = sum(rdata["llh"] for rdata in rdatas)
# Compute total sllh
sllh = None
if solver.getSensitivityOrder() != amici.SensitivityOrder.none:
sllh = aggregate_sllh(
amici_model=amici_model,
rdatas=rdatas,
parameter_mapping=parameter_mapping,
petab_scale=scaled_parameters,
petab_problem=petab_problem,
edatas=edatas,
)
if not scaled_gradients and sllh is not None:
sllh = {
parameter_id: rescale_sensitivity(
sensitivity=sensitivity,
parameter_value=problem_parameters[parameter_id],
old_scale=petab_problem.parameter_df.loc[
parameter_id, PARAMETER_SCALE
],
new_scale=LIN,
)
for parameter_id, sensitivity in sllh.items()
}
# Log results
sim_cond = petab_problem.get_simulation_conditions_from_measurement_df()
for i, rdata in enumerate(rdatas):
sim_cond_id = "N/A" if sim_cond.empty else sim_cond.iloc[i, :].values
logger.debug(
f"Condition: {sim_cond_id}, status: {rdata['status']}, "
f"llh: {rdata['llh']}"
)
return {
LLH: llh,
SLLH: sllh,
RDATAS: rdatas,
EDATAS: edatas,
}
[docs]def aggregate_sllh(
amici_model: AmiciModel,
rdatas: Sequence[amici.ReturnDataView],
parameter_mapping: Optional[ParameterMapping],
edatas: List[AmiciExpData],
petab_scale: bool = True,
petab_problem: petab.Problem = None,
) -> Union[None, Dict[str, float]]:
"""
Aggregate likelihood gradient for all conditions, according to PEtab
parameter mapping.
:param amici_model:
AMICI model from which ``rdatas`` were obtained.
:param rdatas:
Simulation results.
:param parameter_mapping:
PEtab parameter mapping to condition-specific simulation parameters.
:param edatas:
Experimental data used for simulation.
:param petab_scale:
Whether to check that sensitivities were computed with parameters on
the scales provided in the PEtab parameters table.
:param petab_problem:
The PEtab problem that defines the parameter scales.
:return:
Aggregated likelihood sensitivities.
"""
accumulated_sllh = {}
model_parameter_ids = amici_model.getParameterIds()
if petab_scale and petab_problem is None:
raise ValueError(
"Please provide the PEtab problem, when using " "`petab_scale=True`."
)
# Check for issues in all condition simulation results.
for rdata in rdatas:
# Condition failed during simulation.
if rdata.status != amici.AMICI_SUCCESS:
return None
# Condition simulation result does not provide SLLH.
if rdata.sllh is None:
raise ValueError(
"The sensitivities of the likelihood for a condition were "
"not computed."
)
for condition_parameter_mapping, edata, rdata in zip(
parameter_mapping, edatas, rdatas
):
for sllh_parameter_index, condition_parameter_sllh in enumerate(rdata.sllh):
# Get PEtab parameter ID
# Use ExpData if it provides a parameter list, else default to
# Model.
if edata.plist:
model_parameter_index = edata.plist[sllh_parameter_index]
else:
model_parameter_index = amici_model.plist(sllh_parameter_index)
model_parameter_id = model_parameter_ids[model_parameter_index]
petab_parameter_id = condition_parameter_mapping.map_sim_var[
model_parameter_id
]
# Initialize
if petab_parameter_id not in accumulated_sllh:
accumulated_sllh[petab_parameter_id] = 0
# Check that the scale is consistent
if petab_scale:
# `ParameterMappingForCondition` objects provide the scale in
# terms of `petab.C` constants already, not AMICI equivalents.
model_parameter_scale = condition_parameter_mapping.scale_map_sim_var[
model_parameter_id
]
petab_parameter_scale = petab_problem.parameter_df.loc[
petab_parameter_id, PARAMETER_SCALE
]
if model_parameter_scale != petab_parameter_scale:
raise ValueError(
f"The scale of the parameter `{petab_parameter_id}` "
"differs between the AMICI model "
f"({model_parameter_scale}) and the PEtab problem "
f"({petab_parameter_scale})."
)
# Accumulate
accumulated_sllh[petab_parameter_id] += condition_parameter_sllh
return accumulated_sllh
[docs]def rescale_sensitivity(
sensitivity: float,
parameter_value: float,
old_scale: str,
new_scale: str,
) -> float:
"""Rescale a sensitivity between parameter scales.
:param sensitivity:
The sensitivity corresponding to the parameter value.
:param parameter_value:
The parameter vector element, on ``old_scale``.
:param old_scale:
The scale of the parameter value.
:param new_scale:
The parameter scale on which to rescale the sensitivity.
:return:
The rescaled sensitivity.
"""
LOG_E_10 = np.log(10)
if old_scale == new_scale:
return sensitivity
unscaled_parameter_value = petab.parameters.unscale(
parameter=parameter_value,
scale_str=old_scale,
)
scale = {
(LIN, LOG): lambda s: s * unscaled_parameter_value,
(LOG, LIN): lambda s: s / unscaled_parameter_value,
(LIN, LOG10): lambda s: s * (unscaled_parameter_value * LOG_E_10),
(LOG10, LIN): lambda s: s / (unscaled_parameter_value * LOG_E_10),
}
scale[(LOG, LOG10)] = lambda s: scale[(LIN, LOG10)](scale[(LOG, LIN)](s))
scale[(LOG10, LOG)] = lambda s: scale[(LIN, LOG)](scale[(LOG10, LIN)](s))
if (old_scale, new_scale) not in scale:
raise NotImplementedError(f"Old scale: {old_scale}. New scale: {new_scale}.")
return scale[(old_scale, new_scale)](sensitivity)
[docs]def create_parameterized_edatas(
amici_model: AmiciModel,
petab_problem: petab.Problem,
problem_parameters: Dict[str, numbers.Number],
scaled_parameters: bool = False,
parameter_mapping: ParameterMapping = None,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
) -> List[amici.ExpData]:
"""Create list of :class:amici.ExpData objects with parameters filled in.
:param amici_model:
AMICI Model assumed to be compatible with ``petab_problem``.
:param petab_problem:
PEtab problem to work on.
:param problem_parameters:
Run simulation with these parameters. If ``None``, PEtab
``nominalValues`` will be used. To be provided as dict, mapping PEtab
problem parameters to SBML IDs.
:param scaled_parameters:
If ``True``, ``problem_parameters`` are assumed to be on the scale
provided in the PEtab parameter table and will be unscaled.
If ``False``, they are assumed to be in linear scale.
:param parameter_mapping:
Optional precomputed PEtab parameter mapping for efficiency, as
generated by :func:`create_parameter_mapping`.
:param simulation_conditions:
Result of :func:`petab.get_simulation_conditions`. Can be provided to
save time if this has been obtained before.
:return:
List with one :class:`amici.amici.ExpData` per simulation condition,
with filled in timepoints, data and parameters.
"""
# number of amici simulations will be number of unique
# (preequilibrationConditionId, simulationConditionId) pairs.
# Can be optimized by checking for identical condition vectors.
if simulation_conditions is None:
simulation_conditions = (
petab_problem.get_simulation_conditions_from_measurement_df()
)
# Get parameter mapping
if parameter_mapping is None:
parameter_mapping = create_parameter_mapping(
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
scaled_parameters=scaled_parameters,
amici_model=amici_model,
)
# Generate ExpData with all condition-specific information
edatas = create_edatas(
amici_model=amici_model,
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
)
# Fill parameters in ExpDatas (in-place)
fill_in_parameters(
edatas=edatas,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
parameter_mapping=parameter_mapping,
amici_model=amici_model,
)
return edatas
[docs]def create_parameter_mapping(
petab_problem: petab.Problem,
simulation_conditions: Union[pd.DataFrame, List[Dict]],
scaled_parameters: bool,
amici_model: AmiciModel,
**parameter_mapping_kwargs,
) -> ParameterMapping:
"""Generate AMICI specific parameter mapping.
:param petab_problem:
PEtab problem
:param simulation_conditions:
Result of :func:`petab.get_simulation_conditions`. Can be provided to
save time if this has been obtained before.
:param scaled_parameters:
If ``True``, problem_parameters are assumed to be on the scale provided
in the PEtab parameter table and will be unscaled. If ``False``, they
are assumed to be in linear scale.
:param amici_model:
AMICI model.
:param parameter_mapping_kwargs:
Optional keyword arguments passed to
:func:`petab.get_optimization_to_simulation_parameter_mapping`.
To allow changing fixed PEtab problem parameters (``estimate=0``),
use ``fill_fixed_parameters=False``.
:return:
List of the parameter mappings.
"""
if simulation_conditions is None:
simulation_conditions = (
petab_problem.get_simulation_conditions_from_measurement_df()
)
if isinstance(simulation_conditions, list):
simulation_conditions = pd.DataFrame(data=simulation_conditions)
# Because AMICI globalizes all local parameters during model import,
# we need to do that here as well to prevent parameter mapping errors
# (PEtab does currently not care about SBML LocalParameters)
if petab_problem.model.type_id == MODEL_TYPE_SBML:
if petab_problem.sbml_document:
converter_config = (
libsbml.SBMLLocalParameterConverter().getDefaultProperties()
)
petab_problem.sbml_document.convert(converter_config)
else:
logger.debug(
"No petab_problem.sbml_document is set. Cannot "
"convert SBML LocalParameters. If the model contains "
"LocalParameters, parameter mapping will fail."
)
default_parameter_mapping_kwargs = {
"warn_unmapped": False,
"scaled_parameters": scaled_parameters,
"allow_timepoint_specific_numeric_noise_parameters": not petab.lint.observable_table_has_nontrivial_noise_formula(
petab_problem.observable_df
),
}
if parameter_mapping_kwargs is None:
parameter_mapping_kwargs = {}
prelim_parameter_mapping = petab.get_optimization_to_simulation_parameter_mapping(
condition_df=petab_problem.condition_df,
measurement_df=petab_problem.measurement_df,
parameter_df=petab_problem.parameter_df,
observable_df=petab_problem.observable_df,
mapping_df=petab_problem.mapping_df,
model=petab_problem.model,
**dict(default_parameter_mapping_kwargs, **parameter_mapping_kwargs),
)
parameter_mapping = ParameterMapping()
for (_, condition), prelim_mapping_for_condition in zip(
simulation_conditions.iterrows(), prelim_parameter_mapping
):
mapping_for_condition = create_parameter_mapping_for_condition(
prelim_mapping_for_condition, condition, petab_problem, amici_model
)
parameter_mapping.append(mapping_for_condition)
return parameter_mapping
def _get_initial_state_sbml(
petab_problem: petab.Problem, element_id: str
) -> Union[float, sp.Basic]:
element = petab_problem.sbml_model.getElementBySId(element_id)
type_code = element.getTypeCode()
initial_assignment = petab_problem.sbml_model.getInitialAssignmentBySymbol(
element_id
)
if initial_assignment:
initial_assignment = sp.sympify(
libsbml.formulaToL3String(initial_assignment.getMath()), locals=_clash
)
if type_code == libsbml.SBML_SPECIES:
value = (
get_species_initial(element)
if initial_assignment is None
else initial_assignment
)
elif type_code == libsbml.SBML_PARAMETER:
value = element.getValue() if initial_assignment is None else initial_assignment
elif type_code == libsbml.SBML_COMPARTMENT:
value = element.getSize() if initial_assignment is None else initial_assignment
else:
raise NotImplementedError(
f"Don't know what how to handle {element_id} in " "condition table."
)
return value
def _get_initial_state_pysb(
petab_problem: petab.Problem, element_id: str
) -> Union[float, sp.Symbol]:
species_idx = int(re.match(r"__s(\d+)$", element_id)[1])
species_pattern = petab_problem.model.model.species[species_idx]
from pysb.pattern import match_complex_pattern
value = next(
(
initial.value
for initial in petab_problem.model.model.initials
if match_complex_pattern(initial.pattern, species_pattern, exact=True)
),
0.0,
)
if isinstance(value, pysb.Parameter):
if value.name in petab_problem.parameter_df.index:
value = value.name
else:
value = value.value
return value
def _set_initial_state(
petab_problem,
condition_id,
element_id,
init_par_id,
par_map,
scale_map,
value,
):
value = petab.to_float_if_float(value)
if pd.isna(value):
if petab_problem.model.type_id == MODEL_TYPE_SBML:
value = _get_initial_state_sbml(petab_problem, element_id)
elif petab_problem.model.type_id == MODEL_TYPE_PYSB:
value = _get_initial_state_pysb(petab_problem, element_id)
try:
value = float(value)
except (ValueError, TypeError):
if sp.nsimplify(value).is_Atom and (
pysb is None or not isinstance(value, pysb.Component)
):
# Get rid of multiplication with one
value = sp.nsimplify(value)
else:
raise NotImplementedError(
"Cannot handle non-trivial initial state "
f"expression for {element_id}: {value}"
)
# this should be a parameter ID
value = str(value)
logger.debug(
f"The species {element_id} has no initial value "
f"defined for the condition {condition_id} in "
"the PEtab conditions table. The initial value is "
f"now set to {value}, which is the initial value "
"defined in the SBML model."
)
par_map[init_par_id] = value
if isinstance(value, float):
# numeric initial state
scale_map[init_par_id] = petab.LIN
else:
# parametric initial state
scale_map[init_par_id] = petab_problem.parameter_df[PARAMETER_SCALE].get(
value, petab.LIN
)
[docs]def create_parameter_mapping_for_condition(
parameter_mapping_for_condition: petab.ParMappingDictQuadruple,
condition: Union[pd.Series, Dict],
petab_problem: petab.Problem,
amici_model: AmiciModel,
) -> ParameterMappingForCondition:
"""Generate AMICI specific parameter mapping for condition.
:param parameter_mapping_for_condition:
Preliminary parameter mapping for condition.
:param condition:
:class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and
``simulationConditionId``.
:param petab_problem:
Underlying PEtab problem.
:param amici_model:
AMICI model.
:return:
The parameter and parameter scale mappings, for fixed
preequilibration, fixed simulation, and variable simulation
parameters, and then the respective scalings.
"""
(
condition_map_preeq,
condition_map_sim,
condition_scale_map_preeq,
condition_scale_map_sim,
) = parameter_mapping_for_condition
logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}")
if len(condition_map_preeq) != len(condition_scale_map_preeq) or len(
condition_map_sim
) != len(condition_scale_map_sim):
raise AssertionError(
"Number of parameters and number of parameter " "scales do not match."
)
if len(condition_map_preeq) and len(condition_map_preeq) != len(condition_map_sim):
logger.debug(f"Preequilibration parameter map: {condition_map_preeq}")
logger.debug(f"Simulation parameter map: {condition_map_sim}")
raise AssertionError(
"Number of parameters for preequilbration " "and simulation do not match."
)
##########################################################################
# initial states
# Initial states have been set during model import based on the SBML model.
# If initial states were overwritten in the PEtab condition table, they are
# applied here.
# During model generation, parameters for initial concentrations and
# respective initial assignments have been created for the
# relevant species, here we add these parameters to the parameter mapping.
# In absence of preequilibration this could also be handled via
# ExpData.x0, but in the case of preequilibration this would not allow for
# resetting initial states.
if states_in_condition_table := get_states_in_condition_table(
petab_problem, condition
):
# set indicator fixed parameter for preeq
# (we expect here, that this parameter was added during import and
# that it was not added by the user with a different meaning...)
if condition_map_preeq:
condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0
condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN
condition_map_sim[PREEQ_INDICATOR_ID] = 0.0
condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN
for element_id, (value, preeq_value) in states_in_condition_table.items():
# for preequilibration
init_par_id = f"initial_{element_id}_preeq"
if (
condition_id := condition.get(PREEQUILIBRATION_CONDITION_ID)
) is not None:
_set_initial_state(
petab_problem,
condition_id,
element_id,
init_par_id,
condition_map_preeq,
condition_scale_map_preeq,
preeq_value,
)
else:
# need to set dummy value for preeq parameter anyways, as it
# is expected below (set to 0, not nan, because will be
# multiplied with indicator variable in initial assignment)
condition_map_sim[init_par_id] = 0.0
condition_scale_map_sim[init_par_id] = LIN
# for simulation
condition_id = condition[SIMULATION_CONDITION_ID]
init_par_id = f"initial_{element_id}_sim"
_set_initial_state(
petab_problem,
condition_id,
element_id,
init_par_id,
condition_map_sim,
condition_scale_map_sim,
value,
)
##########################################################################
# separate fixed and variable AMICI parameters, because we may have
# different fixed parameters for preeq and sim condition, but we cannot
# have different variable parameters. without splitting,
# merge_preeq_and_sim_pars_condition below may fail.
# TODO: This can be done already in parameter mapping creation.
variable_par_ids = amici_model.getParameterIds()
fixed_par_ids = amici_model.getFixedParameterIds()
condition_map_preeq_var, condition_map_preeq_fix = _subset_dict(
condition_map_preeq, variable_par_ids, fixed_par_ids
)
condition_scale_map_preeq_var, condition_scale_map_preeq_fix = _subset_dict(
condition_scale_map_preeq, variable_par_ids, fixed_par_ids
)
condition_map_sim_var, condition_map_sim_fix = _subset_dict(
condition_map_sim, variable_par_ids, fixed_par_ids
)
condition_scale_map_sim_var, condition_scale_map_sim_fix = _subset_dict(
condition_scale_map_sim, variable_par_ids, fixed_par_ids
)
logger.debug("Fixed parameters preequilibration: " f"{condition_map_preeq_fix}")
logger.debug("Fixed parameters simulation: " f"{condition_map_sim_fix}")
logger.debug("Variable parameters preequilibration: " f"{condition_map_preeq_var}")
logger.debug("Variable parameters simulation: " f"{condition_map_sim_var}")
petab.merge_preeq_and_sim_pars_condition(
condition_map_preeq_var,
condition_map_sim_var,
condition_scale_map_preeq_var,
condition_scale_map_sim_var,
condition,
)
logger.debug(f"Merged: {condition_map_sim_var}")
parameter_mapping_for_condition = ParameterMappingForCondition(
map_preeq_fix=condition_map_preeq_fix,
map_sim_fix=condition_map_sim_fix,
map_sim_var=condition_map_sim_var,
scale_map_preeq_fix=condition_scale_map_preeq_fix,
scale_map_sim_fix=condition_scale_map_sim_fix,
scale_map_sim_var=condition_scale_map_sim_var,
)
return parameter_mapping_for_condition
[docs]def create_edatas(
amici_model: AmiciModel,
petab_problem: petab.Problem,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
) -> List[amici.ExpData]:
"""Create list of :class:`amici.amici.ExpData` objects for PEtab problem.
:param amici_model:
AMICI model.
:param petab_problem:
Underlying PEtab problem.
:param simulation_conditions:
Result of :func:`petab.get_simulation_conditions`. Can be provided to
save time if this has be obtained before.
:return:
List with one :class:`amici.amici.ExpData` per simulation condition,
with filled in timepoints and data.
"""
if simulation_conditions is None:
simulation_conditions = (
petab_problem.get_simulation_conditions_from_measurement_df()
)
observable_ids = amici_model.getObservableIds()
measurement_groupvar = [SIMULATION_CONDITION_ID]
if PREEQUILIBRATION_CONDITION_ID in simulation_conditions:
measurement_groupvar.append(petab.PREEQUILIBRATION_CONDITION_ID)
measurement_dfs = dict(
list(petab_problem.measurement_df.groupby(measurement_groupvar))
)
edatas = []
for _, condition in simulation_conditions.iterrows():
# Create amici.ExpData for each simulation
if PREEQUILIBRATION_CONDITION_ID in condition:
measurement_index = (
condition.get(SIMULATION_CONDITION_ID),
condition.get(PREEQUILIBRATION_CONDITION_ID),
)
else:
measurement_index = (condition.get(SIMULATION_CONDITION_ID),)
edata = create_edata_for_condition(
condition=condition,
amici_model=amici_model,
measurement_df=measurement_dfs[measurement_index],
petab_problem=petab_problem,
observable_ids=observable_ids,
)
edatas.append(edata)
return edatas
[docs]def create_edata_for_condition(
condition: Union[Dict, pd.Series],
measurement_df: pd.DataFrame,
amici_model: AmiciModel,
petab_problem: petab.Problem,
observable_ids: List[str],
) -> amici.ExpData:
"""Get :class:`amici.amici.ExpData` for the given PEtab condition.
Sets timepoints, observed data and sigmas.
:param condition:
:class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and
``simulationConditionId``.
:param measurement_df:
:class:`pandas.DataFrame` with measurements for the given condition.
:param amici_model:
AMICI model
:param petab_problem:
Underlying PEtab problem
:param observable_ids:
List of observable IDs
:return:
ExpData instance.
"""
if amici_model.nytrue != len(observable_ids):
raise AssertionError(
"Number of AMICI model observables does not "
"match number of PEtab observables."
)
# create an ExpData object
edata = amici.ExpData(amici_model)
edata.id = condition[SIMULATION_CONDITION_ID]
if condition.get(PREEQUILIBRATION_CONDITION_ID):
edata.id += "+" + condition.get(PREEQUILIBRATION_CONDITION_ID)
##########################################################################
# enable initial parameters reinitialization
states_in_condition_table = get_states_in_condition_table(
petab_problem, condition=condition
)
if condition.get(PREEQUILIBRATION_CONDITION_ID) and states_in_condition_table:
state_ids = amici_model.getStateIds()
state_idx_reinitalization = [
state_ids.index(s)
for s, (v, v_preeq) in states_in_condition_table.items()
if not np.isnan(v)
]
edata.reinitialization_state_idxs_sim = state_idx_reinitalization
logger.debug(
"Enabling state reinitialization for condition "
f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - "
f"{condition.get(SIMULATION_CONDITION_ID)} "
f"{states_in_condition_table}"
)
##########################################################################
# timepoints
# find replicate numbers of time points
timepoints_w_reps = _get_timepoints_with_replicates(df_for_condition=measurement_df)
edata.setTimepoints(timepoints_w_reps)
##########################################################################
# measurements and sigmas
y, sigma_y = _get_measurements_and_sigmas(
df_for_condition=measurement_df,
timepoints_w_reps=timepoints_w_reps,
observable_ids=observable_ids,
)
edata.setObservedData(y.flatten())
edata.setObservedDataStdDev(sigma_y.flatten())
return edata
def _subset_dict(
full: Dict[Any, Any], *args: Collection[Any]
) -> Iterator[Dict[Any, Any]]:
"""Get subset of dictionary based on provided keys
:param full:
Dictionary to subset
:param args:
Collections of keys to be contained in the different subsets
:return:
subsetted dictionary
"""
for keys in args:
yield {key: val for (key, val) in full.items() if key in keys}
def _get_timepoints_with_replicates(
df_for_condition: pd.DataFrame,
) -> List[numbers.Number]:
"""
Get list of timepoints including replicate measurements
:param df_for_condition:
PEtab measurement table subset for a single condition.
:return:
Sorted list of timepoints, including multiple timepoints accounting
for replicate measurements.
"""
# create sorted list of all timepoints for which measurements exist
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
# find replicate numbers of time points
timepoints_w_reps = []
for time in timepoints:
# subselect for time
df_for_time = df_for_condition[df_for_condition.time.astype(float) == time]
# rep number is maximum over rep numbers for observables
n_reps = max(df_for_time.groupby([OBSERVABLE_ID, TIME]).size())
# append time point n_rep times
timepoints_w_reps.extend([time] * n_reps)
return timepoints_w_reps
def _get_measurements_and_sigmas(
df_for_condition: pd.DataFrame,
timepoints_w_reps: Sequence[numbers.Number],
observable_ids: Sequence[str],
) -> Tuple[np.array, np.array]:
"""
Get measurements and sigmas
Generate arrays with measurements and sigmas in AMICI format from a
PEtab measurement table subset for a single condition.
:param df_for_condition:
Subset of PEtab measurement table for one condition
:param timepoints_w_reps:
Timepoints for which there exist measurements, including replicates
:param observable_ids:
List of observable IDs for mapping IDs to indices.
:return:
arrays for measurement and sigmas
"""
# prepare measurement matrix
y = np.full(shape=(len(timepoints_w_reps), len(observable_ids)), fill_value=np.nan)
# prepare sigma matrix
sigma_y = y.copy()
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
for time in timepoints:
# subselect for time
df_for_time = df_for_condition[df_for_condition[TIME] == time]
time_ix_0 = timepoints_w_reps.index(time)
# remember used time indices for each observable
time_ix_for_obs_ix = {}
# iterate over measurements
for _, measurement in df_for_time.iterrows():
# extract observable index
observable_ix = observable_ids.index(measurement[OBSERVABLE_ID])
# update time index for observable
if observable_ix in time_ix_for_obs_ix:
time_ix_for_obs_ix[observable_ix] += 1
else:
time_ix_for_obs_ix[observable_ix] = time_ix_0
# fill observable and possibly noise parameter
y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[
MEASUREMENT
]
if isinstance(measurement.get(NOISE_PARAMETERS, None), numbers.Number):
sigma_y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[
NOISE_PARAMETERS
]
return y, sigma_y
[docs]def rdatas_to_measurement_df(
rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame
) -> pd.DataFrame:
"""
Create a measurement dataframe in the PEtab format from the passed
``rdatas`` and own information.
:param rdatas:
A sequence of rdatas with the ordering of
:func:`petab.get_simulation_conditions`.
:param model:
AMICI model used to generate ``rdatas``.
:param measurement_df:
PEtab measurement table used to generate ``rdatas``.
:return:
A dataframe built from the rdatas in the format of ``measurement_df``.
"""
simulation_conditions = petab.get_simulation_conditions(measurement_df)
observable_ids = model.getObservableIds()
rows = []
# iterate over conditions
for (_, condition), rdata in zip(simulation_conditions.iterrows(), rdatas):
# current simulation matrix
y = rdata.y
# time array used in rdata
t = list(rdata.ts)
# extract rows for condition
cur_measurement_df = petab.get_rows_for_condition(measurement_df, condition)
# iterate over entries for the given condition
# note: this way we only generate a dataframe entry for every
# row that existed in the original dataframe. if we want to
# e.g. have also timepoints non-existent in the original file,
# we need to instead iterate over the rdata['y'] entries
for _, row in cur_measurement_df.iterrows():
# copy row
row_sim = copy.deepcopy(row)
# extract simulated measurement value
timepoint_idx = t.index(row[TIME])
observable_idx = observable_ids.index(row[OBSERVABLE_ID])
measurement_sim = y[timepoint_idx, observable_idx]
# change measurement entry
row_sim[MEASUREMENT] = measurement_sim
rows.append(row_sim)
return pd.DataFrame(rows)
[docs]def rdatas_to_simulation_df(
rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame
) -> pd.DataFrame:
"""Create a PEtab simulation dataframe from
:class:`amici.amici.ReturnData` s.
See :func:`rdatas_to_measurement_df` for details, only that model outputs
will appear in column ``simulation`` instead of ``measurement``."""
df = rdatas_to_measurement_df(
rdatas=rdatas, model=model, measurement_df=measurement_df
)
return df.rename(columns={MEASUREMENT: SIMULATION})
def _default_scaled_parameters(
petab_problem: petab.Problem,
problem_parameters: Optional[Dict[str, float]] = None,
scaled_parameters: bool = False,
) -> Optional[Dict[str, float]]:
"""
Helper method to handle an unscaled or unspecified parameter vector.
The parameter vector defaults to the nominal values in the PEtab
parameter table.
Unscaled parameter values are scaled.
:param petab_problem:
The PEtab problem.
:param problem_parameters:
Keys are PEtab parameter IDs, values are parameter values on the scale
defined in the PEtab parameter table. Defaults to the nominal values in
the PEtab parameter table.
:param scaled_parameters:
Whether `problem_parameters` are on the scale defined in the PEtab
parameter table.
:return:
The scaled parameter vector.
"""
if problem_parameters is None:
problem_parameters = dict(
zip(
petab_problem.x_ids,
petab_problem.x_nominal_scaled,
)
)
elif not scaled_parameters:
problem_parameters = petab_problem.scale_parameters(problem_parameters)
return problem_parameters