"""
PEtab Objective
---------------
Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem
"""
import copy
import logging
import numbers
from typing import (Any, Collection, Dict, Iterator, List, Optional, Sequence,
Tuple, Union)
import libsbml
import numpy as np
import pandas as pd
import petab
import sympy as sp
from petab.C import * # noqa: F403
from sympy.abc import _clash
import amici
from amici.sbml_import import get_species_initial
from . import AmiciExpData, AmiciModel
from .logging import get_logger, log_execution_time
from .parameter_mapping import (ParameterMapping, ParameterMappingForCondition,
fill_in_parameters)
from .petab_import import PREEQ_INDICATOR_ID, element_is_state
from .parameter_mapping import (
fill_in_parameters,
ParameterMappingForCondition,
ParameterMapping,
)
logger = get_logger(__name__)
# string constant definitions
LLH = 'llh'
SLLH = 'sllh'
FIM = 'fim'
S2LLH = 's2llh'
RES = 'res'
SRES = 'sres'
RDATAS = 'rdatas'
EDATAS = 'edatas'
[docs]@log_execution_time('Simulating PEtab model', logger)
def simulate_petab(
petab_problem: petab.Problem,
amici_model: AmiciModel,
solver: Optional[amici.Solver] = None,
problem_parameters: Optional[Dict[str, float]] = None,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
edatas: List[AmiciExpData] = None,
parameter_mapping: ParameterMapping = None,
scaled_parameters: Optional[bool] = False,
log_level: int = logging.WARNING,
num_threads: int = 1,
failfast: bool = True,
scaled_gradients: bool = False,
) -> Dict[str, Any]:
"""Simulate PEtab model.
.. note::
Regardless of `scaled_parameters`, unscaled sensitivities are returned,
unless `scaled_gradients=True`.
:param petab_problem:
PEtab problem to work on.
:param amici_model:
AMICI Model assumed to be compatible with ``petab_problem``.
:param solver:
An AMICI solver. Will use default options if None.
:param problem_parameters:
Run simulation with these parameters. If ``None``, PEtab
``nominalValues`` will be used. To be provided as dict, mapping PEtab
problem parameters to SBML IDs.
:param simulation_conditions:
Result of :py:func:`petab.get_simulation_conditions`. Can be provided
to save time if this has be obtained before.
Not required if ``edatas`` and ``parameter_mapping`` are provided.
:param edatas:
Experimental data. Parameters are inserted in-place for simulation.
:param parameter_mapping:
Optional precomputed PEtab parameter mapping for efficiency, as
generated by :py:func:`create_parameter_mapping`.
:param scaled_parameters:
If ``True``, ``problem_parameters`` are assumed to be on the scale
provided in the PEtab parameter table and will be unscaled.
If ``False``, they are assumed to be in linear scale.
:param log_level:
Log level, see :mod:`amici.logging` module.
:param num_threads:
Number of threads to use for simulating multiple conditions
(only used if compiled with OpenMP).
:param failfast:
Returns as soon as an integration failure is encountered, skipping
any remaining simulations.
:param scaled_gradients:
Whether to compute gradients on parameter scale (``True``) or not
(``False``).
:return:
Dictionary of
* cost function value (``LLH``),
* list of :class:`amici.amici.ReturnData` (``RDATAS``),
* list of :class:`amici.amici.ExpData` (``EDATAS``),
corresponding to the different simulation conditions.
For ordering of simulation conditions, see
:meth:`petab.Problem.get_simulation_conditions_from_measurement_df`.
"""
logger.setLevel(log_level)
if solver is None:
solver = amici_model.getSolver()
# Switch to scaled parameters.
problem_parameters = _default_scaled_parameters(
petab_problem=petab_problem,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
)
scaled_parameters = True
# number of amici simulations will be number of unique
# (preequilibrationConditionId, simulationConditionId) pairs.
# Can be optimized by checking for identical condition vectors.
if simulation_conditions is None and parameter_mapping is None \
and edatas is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
# Get parameter mapping
if parameter_mapping is None:
parameter_mapping = create_parameter_mapping(
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
scaled_parameters=scaled_parameters,
amici_model=amici_model)
# Get edatas
if edatas is None:
# Generate ExpData with all condition-specific information
edatas = create_edatas(
amici_model=amici_model,
petab_problem=petab_problem,
simulation_conditions=simulation_conditions)
# Fill parameters in ExpDatas (in-place)
fill_in_parameters(
edatas=edatas,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
parameter_mapping=parameter_mapping,
amici_model=amici_model)
# Simulate
rdatas = amici.runAmiciSimulations(
amici_model, solver, edata_list=edatas,
num_threads=num_threads, failfast=failfast)
# Compute total llh
llh = sum(rdata['llh'] for rdata in rdatas)
# Compute total sllh
sllh = None
if solver.getSensitivityOrder() != amici.SensitivityOrder.none:
sllh = aggregate_sllh(
amici_model=amici_model,
rdatas=rdatas,
parameter_mapping=parameter_mapping,
petab_scale=scaled_parameters,
petab_problem=petab_problem,
edatas=edatas,
)
if not scaled_gradients and sllh is not None:
sllh = {
parameter_id: rescale_sensitivity(
sensitivity=sensitivity,
parameter_value=problem_parameters[parameter_id],
old_scale=petab_problem.parameter_df.loc[
parameter_id, PARAMETER_SCALE
],
new_scale=LIN,
)
for parameter_id, sensitivity in sllh.items()
}
# Log results
sim_cond = petab_problem.get_simulation_conditions_from_measurement_df()
for i, rdata in enumerate(rdatas):
sim_cond_id = "N/A" if sim_cond.empty else sim_cond.iloc[i, :].values
logger.debug(
f"Condition: {sim_cond_id}, status: {rdata['status']}, "
f"llh: {rdata['llh']}"
)
return {
LLH: llh,
SLLH: sllh,
RDATAS: rdatas,
EDATAS: edatas,
}
[docs]def aggregate_sllh(
amici_model: AmiciModel,
rdatas: Sequence[amici.ReturnDataView],
parameter_mapping: Optional[ParameterMapping],
edatas: List[AmiciExpData],
petab_scale: bool = True,
petab_problem: petab.Problem = None,
) -> Union[None, Dict[str, float]]:
"""
Aggregate likelihood gradient for all conditions, according to PEtab
parameter mapping.
:param amici_model:
AMICI model from which ``rdatas`` were obtained.
:param rdatas:
Simulation results.
:param parameter_mapping:
PEtab parameter mapping to condition-specific simulation parameters.
:param edatas:
Experimental data used for simulation.
:param petab_scale:
Whether to check that sensitivities were computed with parameters on
the scales provided in the PEtab parameters table.
:param petab_problem:
The PEtab problem that defines the parameter scales.
:return:
Aggregated likelihood sensitivities.
"""
accumulated_sllh = {}
model_parameter_ids = amici_model.getParameterIds()
if petab_scale and petab_problem is None:
raise ValueError(
'Please provide the PEtab problem, when using '
'`petab_scale=True`.'
)
# Check for issues in all condition simulation results.
for rdata in rdatas:
# Condition failed during simulation.
if rdata.status != amici.AMICI_SUCCESS:
return None
# Condition simulation result does not provide SLLH.
if rdata.sllh is None:
raise ValueError(
'The sensitivities of the likelihood for a condition were '
'not computed.'
)
for condition_parameter_mapping, edata, rdata in \
zip(parameter_mapping, edatas, rdatas):
for sllh_parameter_index, condition_parameter_sllh in \
enumerate(rdata.sllh):
# Get PEtab parameter ID
# Use ExpData if it provides a parameter list, else default to
# Model.
if edata.plist:
model_parameter_index = edata.plist[sllh_parameter_index]
else:
model_parameter_index = amici_model.plist(sllh_parameter_index)
model_parameter_id = model_parameter_ids[model_parameter_index]
petab_parameter_id = (
condition_parameter_mapping
.map_sim_var[model_parameter_id]
)
# Initialize
if petab_parameter_id not in accumulated_sllh:
accumulated_sllh[petab_parameter_id] = 0
# Check that the scale is consistent
if petab_scale:
# `ParameterMappingForCondition` objects provide the scale in
# terms of `petab.C` constants already, not AMICI equivalents.
model_parameter_scale = (
condition_parameter_mapping
.scale_map_sim_var[model_parameter_id]
)
petab_parameter_scale = (
petab_problem
.parameter_df
.loc[petab_parameter_id, PARAMETER_SCALE]
)
if model_parameter_scale != petab_parameter_scale:
raise ValueError(
f'The scale of the parameter `{petab_parameter_id}` '
'differs between the AMICI model '
f'({model_parameter_scale}) and the PEtab problem '
f'({petab_parameter_scale}).'
)
# Accumulate
accumulated_sllh[petab_parameter_id] += condition_parameter_sllh
return accumulated_sllh
[docs]def rescale_sensitivity(
sensitivity: float,
parameter_value: float,
old_scale: str,
new_scale: str,
) -> float:
"""Rescale a sensitivity between parameter scales.
:param sensitivity:
The sensitivity corresponding to the parameter value.
:param parameter_value:
The parameter vector element, on ``old_scale``.
:param old_scale:
The scale of the parameter value.
:param new_scale:
The parameter scale on which to rescale the sensitivity.
:return:
The rescaled sensitivity.
"""
LOG_E_10 = np.log(10)
if old_scale == new_scale:
return sensitivity
unscaled_parameter_value = petab.parameters.unscale(
parameter=parameter_value,
scale_str=old_scale,
)
scale = {
(LIN, LOG): lambda s: s * unscaled_parameter_value,
(LOG, LIN): lambda s: s / unscaled_parameter_value,
(LIN, LOG10): lambda s: s * (unscaled_parameter_value * LOG_E_10),
(LOG10, LIN): lambda s: s / (unscaled_parameter_value * LOG_E_10),
}
scale[(LOG, LOG10)] = lambda s: scale[(LIN, LOG10)](scale[(LOG, LIN)](s))
scale[(LOG10, LOG)] = lambda s: scale[(LIN, LOG)](scale[(LOG10, LIN)](s))
if (old_scale, new_scale) not in scale:
raise NotImplementedError(
f"Old scale: {old_scale}. New scale: {new_scale}."
)
return scale[(old_scale, new_scale)](sensitivity)
[docs]def create_parameterized_edatas(
amici_model: AmiciModel,
petab_problem: petab.Problem,
problem_parameters: Dict[str, numbers.Number],
scaled_parameters: bool = False,
parameter_mapping: ParameterMapping = None,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
) -> List[amici.ExpData]:
"""Create list of :class:amici.ExpData objects with parameters filled in.
:param amici_model:
AMICI Model assumed to be compatible with ``petab_problem``.
:param petab_problem:
PEtab problem to work on.
:param problem_parameters:
Run simulation with these parameters. If ``None``, PEtab
``nominalValues`` will be used. To be provided as dict, mapping PEtab
problem parameters to SBML IDs.
:param scaled_parameters:
If ``True``, ``problem_parameters`` are assumed to be on the scale
provided in the PEtab parameter table and will be unscaled.
If ``False``, they are assumed to be in linear scale.
:param parameter_mapping:
Optional precomputed PEtab parameter mapping for efficiency, as
generated by :func:`create_parameter_mapping`.
:param simulation_conditions:
Result of :func:`petab.get_simulation_conditions`. Can be provided to
save time if this has been obtained before.
:return:
List with one :class:`amici.amici.ExpData` per simulation condition,
with filled in timepoints, data and parameters.
"""
# number of amici simulations will be number of unique
# (preequilibrationConditionId, simulationConditionId) pairs.
# Can be optimized by checking for identical condition vectors.
if simulation_conditions is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
# Get parameter mapping
if parameter_mapping is None:
parameter_mapping = create_parameter_mapping(
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
scaled_parameters=scaled_parameters,
amici_model=amici_model)
# Generate ExpData with all condition-specific information
edatas = create_edatas(
amici_model=amici_model,
petab_problem=petab_problem,
simulation_conditions=simulation_conditions)
# Fill parameters in ExpDatas (in-place)
fill_in_parameters(
edatas=edatas,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
parameter_mapping=parameter_mapping,
amici_model=amici_model)
return edatas
[docs]def create_parameter_mapping(
petab_problem: petab.Problem,
simulation_conditions: Union[pd.DataFrame, List[Dict]],
scaled_parameters: bool,
amici_model: AmiciModel,
**parameter_mapping_kwargs,
) -> ParameterMapping:
"""Generate AMICI specific parameter mapping.
:param petab_problem:
PEtab problem
:param simulation_conditions:
Result of :func:`petab.get_simulation_conditions`. Can be provided to
save time if this has been obtained before.
:param scaled_parameters:
If ``True``, problem_parameters are assumed to be on the scale provided
in the PEtab parameter table and will be unscaled. If ``False``, they
are assumed to be in linear scale.
:param amici_model:
AMICI model.
:param parameter_mapping_kwargs:
Optional keyword arguments passed to
:func:`petab.get_optimization_to_simulation_parameter_mapping`.
To allow changing fixed PEtab problem parameters (``estimate=0``),
use ``fill_fixed_parameters=False``.
:return:
List of the parameter mappings.
"""
if simulation_conditions is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
if isinstance(simulation_conditions, list):
simulation_conditions = pd.DataFrame(data=simulation_conditions)
# Because AMICI globalizes all local parameters during model import,
# we need to do that here as well to prevent parameter mapping errors
# (PEtab does currently not care about SBML LocalParameters)
if petab_problem.sbml_document:
converter_config = libsbml.SBMLLocalParameterConverter() \
.getDefaultProperties()
petab_problem.sbml_document.convert(converter_config)
else:
logger.debug("No petab_problem.sbml_document is set. Cannot convert "
"SBML LocalParameters. If the model contains "
"LocalParameters, parameter mapping will fail.")
default_parameter_mapping_kwargs = {
"warn_unmapped": False,
"scaled_parameters": scaled_parameters,
"allow_timepoint_specific_numeric_noise_parameters":
not petab.lint.observable_table_has_nontrivial_noise_formula(
petab_problem.observable_df),
}
if parameter_mapping_kwargs is None:
parameter_mapping_kwargs = {}
prelim_parameter_mapping = \
petab.get_optimization_to_simulation_parameter_mapping(
condition_df=petab_problem.condition_df,
measurement_df=petab_problem.measurement_df,
parameter_df=petab_problem.parameter_df,
observable_df=petab_problem.observable_df,
model=petab_problem.model,
**dict(default_parameter_mapping_kwargs,
**parameter_mapping_kwargs)
)
parameter_mapping = ParameterMapping()
for (_, condition), prelim_mapping_for_condition in \
zip(simulation_conditions.iterrows(), prelim_parameter_mapping):
mapping_for_condition = create_parameter_mapping_for_condition(
prelim_mapping_for_condition, condition, petab_problem,
amici_model)
parameter_mapping.append(mapping_for_condition)
return parameter_mapping
[docs]def create_parameter_mapping_for_condition(
parameter_mapping_for_condition: petab.ParMappingDictQuadruple,
condition: Union[pd.Series, Dict],
petab_problem: petab.Problem,
amici_model: AmiciModel
) -> ParameterMappingForCondition:
"""Generate AMICI specific parameter mapping for condition.
:param parameter_mapping_for_condition:
Preliminary parameter mapping for condition.
:param condition:
:class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and
``simulationConditionId``.
:param petab_problem:
Underlying PEtab problem.
:param amici_model:
AMICI model.
:return:
The parameter and parameter scale mappings, for fixed
preequilibration, fixed simulation, and variable simulation
parameters, and then the respective scalings.
"""
(condition_map_preeq, condition_map_sim, condition_scale_map_preeq,
condition_scale_map_sim) = parameter_mapping_for_condition
logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}")
if len(condition_map_preeq) != len(condition_scale_map_preeq) \
or len(condition_map_sim) != len(condition_scale_map_sim):
raise AssertionError("Number of parameters and number of parameter "
"scales do not match.")
if len(condition_map_preeq) \
and len(condition_map_preeq) != len(condition_map_sim):
logger.debug(f"Preequilibration parameter map: {condition_map_preeq}")
logger.debug(f"Simulation parameter map: {condition_map_sim}")
raise AssertionError("Number of parameters for preequilbration "
"and simulation do not match.")
##########################################################################
# initial states
# Initial states have been set during model import based on the SBML model.
# If initial states were overwritten in the PEtab condition table, they are
# applied here.
# During model generation, parameters for initial concentrations and
# respective initial assignments have been created for the
# relevant species, here we add these parameters to the parameter mapping.
# In absence of preequilibration this could also be handled via
# ExpData.x0, but in the case of preequilibration this would not allow for
# resetting initial states.
states_in_condition_table = [
col for col in petab_problem.condition_df
if element_is_state(petab_problem.sbml_model, col)
]
if states_in_condition_table:
# set indicator fixed parameter for preeq
# (we expect here, that this parameter was added during import and
# that it was not added by the user with a different meaning...)
if condition_map_preeq:
condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0
condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN
condition_map_sim[PREEQ_INDICATOR_ID] = 0.0
condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN
def _set_initial_state(condition_id, element_id, init_par_id,
par_map, scale_map):
value = petab.to_float_if_float(
petab_problem.condition_df.loc[condition_id, element_id])
if pd.isna(value):
element = petab_problem.sbml_model.getElementBySId(element_id)
type_code = element.getTypeCode()
initial_assignment = petab_problem.sbml_model\
.getInitialAssignmentBySymbol(element_id)
if initial_assignment:
initial_assignment = sp.sympify(
libsbml.formulaToL3String(initial_assignment.getMath()),
locals=_clash
)
if type_code == libsbml.SBML_SPECIES:
value = get_species_initial(element) \
if initial_assignment is None else initial_assignment
elif type_code == libsbml.SBML_PARAMETER:
value = element.getValue()\
if initial_assignment is None else initial_assignment
elif type_code == libsbml.SBML_COMPARTMENT:
value = element.getSize()\
if initial_assignment is None else initial_assignment
else:
raise NotImplementedError(
f"Don't know what how to handle {element_id} in "
"condition table.")
try:
value = float(value)
except (ValueError, TypeError):
if sp.nsimplify(value).is_Atom:
# Get rid of multiplication with one
value = sp.nsimplify(value)
else:
raise NotImplementedError(
"Cannot handle non-trivial initial state "
f"expression for {element_id}: {value}")
# this should be a parameter ID
value = str(value)
logger.debug(f'The species {element_id} has no initial value '
f'defined for the condition {condition_id} in '
'the PEtab conditions table. The initial value is '
f'now set to {value}, which is the initial value '
'defined in the SBML model.')
par_map[init_par_id] = value
if isinstance(value, float):
# numeric initial state
scale_map[init_par_id] = petab.LIN
else:
# parametric initial state
scale_map[init_par_id] = \
petab_problem.parameter_df[PARAMETER_SCALE]\
.get(value, petab.LIN)
for element_id in states_in_condition_table:
# for preequilibration
init_par_id = f'initial_{element_id}_preeq'
if condition.get(PREEQUILIBRATION_CONDITION_ID):
condition_id = condition[PREEQUILIBRATION_CONDITION_ID]
_set_initial_state(
condition_id, element_id, init_par_id, condition_map_preeq,
condition_scale_map_preeq)
else:
# need to set dummy value for preeq parameter anyways, as it
# is expected below (set to 0, not nan, because will be
# multiplied with indicator variable in initial assignment)
condition_map_sim[init_par_id] = 0.0
condition_scale_map_sim[init_par_id] = LIN
# for simulation
condition_id = condition[SIMULATION_CONDITION_ID]
init_par_id = f'initial_{element_id}_sim'
_set_initial_state(
condition_id, element_id, init_par_id, condition_map_sim,
condition_scale_map_sim)
##########################################################################
# separate fixed and variable AMICI parameters, because we may have
# different fixed parameters for preeq and sim condition, but we cannot
# have different variable parameters. without splitting,
# merge_preeq_and_sim_pars_condition below may fail.
# TODO: This can be done already in parameter mapping creation.
variable_par_ids = amici_model.getParameterIds()
fixed_par_ids = amici_model.getFixedParameterIds()
condition_map_preeq_var, condition_map_preeq_fix = \
_subset_dict(condition_map_preeq, variable_par_ids, fixed_par_ids)
condition_scale_map_preeq_var, condition_scale_map_preeq_fix = \
_subset_dict(condition_scale_map_preeq, variable_par_ids, fixed_par_ids)
condition_map_sim_var, condition_map_sim_fix = \
_subset_dict(condition_map_sim, variable_par_ids, fixed_par_ids)
condition_scale_map_sim_var, condition_scale_map_sim_fix = \
_subset_dict(condition_scale_map_sim, variable_par_ids, fixed_par_ids)
logger.debug("Fixed parameters preequilibration: "
f"{condition_map_preeq_fix}")
logger.debug("Fixed parameters simulation: "
f"{condition_map_sim_fix}")
logger.debug("Variable parameters preequilibration: "
f"{condition_map_preeq_var}")
logger.debug("Variable parameters simulation: "
f"{condition_map_sim_var}")
petab.merge_preeq_and_sim_pars_condition(
condition_map_preeq_var, condition_map_sim_var,
condition_scale_map_preeq_var, condition_scale_map_sim_var,
condition)
logger.debug(f"Merged: {condition_map_sim_var}")
parameter_mapping_for_condition = ParameterMappingForCondition(
map_preeq_fix=condition_map_preeq_fix,
map_sim_fix=condition_map_sim_fix,
map_sim_var=condition_map_sim_var,
scale_map_preeq_fix=condition_scale_map_preeq_fix,
scale_map_sim_fix=condition_scale_map_sim_fix,
scale_map_sim_var=condition_scale_map_sim_var
)
return parameter_mapping_for_condition
[docs]def create_edatas(
amici_model: AmiciModel,
petab_problem: petab.Problem,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
) -> List[amici.ExpData]:
"""Create list of :class:`amici.amici.ExpData` objects for PEtab problem.
:param amici_model:
AMICI model.
:param petab_problem:
Underlying PEtab problem.
:param simulation_conditions:
Result of :func:`petab.get_simulation_conditions`. Can be provided to
save time if this has be obtained before.
:return:
List with one :class:`amici.amici.ExpData` per simulation condition,
with filled in timepoints and data.
"""
if simulation_conditions is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
observable_ids = amici_model.getObservableIds()
measurement_dfs = dict(list(
petab_problem.measurement_df.groupby(
[petab.SIMULATION_CONDITION_ID,
petab.PREEQUILIBRATION_CONDITION_ID]
if petab.PREEQUILIBRATION_CONDITION_ID in simulation_conditions
else petab.SIMULATION_CONDITION_ID
)
))
edatas = []
for _, condition in simulation_conditions.iterrows():
# Create amici.ExpData for each simulation
if petab.PREEQUILIBRATION_CONDITION_ID in condition:
measurement_index = (
condition.get(petab.SIMULATION_CONDITION_ID),
condition.get(petab.PREEQUILIBRATION_CONDITION_ID)
)
else:
measurement_index = condition.get(petab.SIMULATION_CONDITION_ID)
edata = create_edata_for_condition(
condition=condition,
amici_model=amici_model,
measurement_df=measurement_dfs[measurement_index],
petab_problem=petab_problem,
observable_ids=observable_ids,
)
edatas.append(edata)
return edatas
[docs]def create_edata_for_condition(
condition: Union[Dict, pd.Series],
measurement_df: pd.DataFrame,
amici_model: AmiciModel,
petab_problem: petab.Problem,
observable_ids: List[str],
) -> amici.ExpData:
"""Get :class:`amici.amici.ExpData` for the given PEtab condition.
Sets timepoints, observed data and sigmas.
:param condition:
:class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and
``simulationConditionId``.
:param measurement_df:
:class:`pandas.DataFrame` with measurements for the given condition.
:param amici_model:
AMICI model
:param petab_problem:
Underlying PEtab problem
:param observable_ids:
List of observable IDs
:return:
ExpData instance.
"""
if amici_model.nytrue != len(observable_ids):
raise AssertionError("Number of AMICI model observables does not "
"match number of PEtab observables.")
# create an ExpData object
edata = amici.ExpData(amici_model)
edata.id = condition[SIMULATION_CONDITION_ID]
if condition.get(PREEQUILIBRATION_CONDITION_ID):
edata.id += "+" + condition.get(PREEQUILIBRATION_CONDITION_ID)
##########################################################################
# enable initial parameters reinitialization
states_in_condition_table = [
col for col in petab_problem.condition_df
if not pd.isna(petab_problem.condition_df.loc[
condition[SIMULATION_CONDITION_ID], col])
and element_is_state(petab_problem.sbml_model, col)
]
if condition.get(PREEQUILIBRATION_CONDITION_ID) \
and states_in_condition_table:
state_ids = amici_model.getStateIds()
state_idx_reinitalization = [state_ids.index(s)
for s in states_in_condition_table]
edata.reinitialization_state_idxs_sim = state_idx_reinitalization
logger.debug("Enabling state reinitialization for condition "
f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - "
f"{condition.get(SIMULATION_CONDITION_ID)} "
f"{states_in_condition_table}")
##########################################################################
# timepoints
# find replicate numbers of time points
timepoints_w_reps = _get_timepoints_with_replicates(
df_for_condition=measurement_df)
edata.setTimepoints(timepoints_w_reps)
##########################################################################
# measurements and sigmas
y, sigma_y = _get_measurements_and_sigmas(
df_for_condition=measurement_df, timepoints_w_reps=timepoints_w_reps,
observable_ids=observable_ids)
edata.setObservedData(y.flatten())
edata.setObservedDataStdDev(sigma_y.flatten())
return edata
def _subset_dict(full: Dict[Any, Any],
*args: Collection[Any]) -> Iterator[Dict[Any, Any]]:
"""Get subset of dictionary based on provided keys
:param full:
Dictionary to subset
:param args:
Collections of keys to be contained in the different subsets
:return:
subsetted dictionary
"""
for keys in args:
yield {key: val for (key, val) in full.items() if key in keys}
def _get_timepoints_with_replicates(
df_for_condition: pd.DataFrame) -> List[numbers.Number]:
"""
Get list of timepoints including replicate measurements
:param df_for_condition:
PEtab measurement table subset for a single condition.
:return:
Sorted list of timepoints, including multiple timepoints accounting
for replicate measurements.
"""
# create sorted list of all timepoints for which measurements exist
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
# find replicate numbers of time points
timepoints_w_reps = []
for time in timepoints:
# subselect for time
df_for_time = df_for_condition[
df_for_condition.time.astype(float) == time
]
# rep number is maximum over rep numbers for observables
n_reps = max(df_for_time.groupby(
[OBSERVABLE_ID, TIME]).size())
# append time point n_rep times
timepoints_w_reps.extend([time] * n_reps)
return timepoints_w_reps
def _get_measurements_and_sigmas(
df_for_condition: pd.DataFrame,
timepoints_w_reps: Sequence[numbers.Number],
observable_ids: Sequence[str],
) -> Tuple[np.array, np.array]:
"""
Get measurements and sigmas
Generate arrays with measurements and sigmas in AMICI format from a
PEtab measurement table subset for a single condition.
:param df_for_condition:
Subset of PEtab measurement table for one condition
:param timepoints_w_reps:
Timepoints for which there exist measurements, including replicates
:param observable_ids:
List of observable IDs for mapping IDs to indices.
:return:
arrays for measurement and sigmas
"""
# prepare measurement matrix
y = np.full(shape=(len(timepoints_w_reps), len(observable_ids)),
fill_value=np.nan)
# prepare sigma matrix
sigma_y = y.copy()
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
for time in timepoints:
# subselect for time
df_for_time = df_for_condition[df_for_condition[TIME] == time]
time_ix_0 = timepoints_w_reps.index(time)
# remember used time indices for each observable
time_ix_for_obs_ix = {}
# iterate over measurements
for _, measurement in df_for_time.iterrows():
# extract observable index
observable_ix = observable_ids.index(measurement[OBSERVABLE_ID])
# update time index for observable
if observable_ix in time_ix_for_obs_ix:
time_ix_for_obs_ix[observable_ix] += 1
else:
time_ix_for_obs_ix[observable_ix] = time_ix_0
# fill observable and possibly noise parameter
y[time_ix_for_obs_ix[observable_ix],
observable_ix] = measurement[MEASUREMENT]
if isinstance(measurement.get(NOISE_PARAMETERS, None),
numbers.Number):
sigma_y[time_ix_for_obs_ix[observable_ix],
observable_ix] = measurement[NOISE_PARAMETERS]
return y, sigma_y
[docs]def rdatas_to_measurement_df(
rdatas: Sequence[amici.ReturnData],
model: AmiciModel,
measurement_df: pd.DataFrame) -> pd.DataFrame:
"""
Create a measurement dataframe in the PEtab format from the passed
``rdatas`` and own information.
:param rdatas:
A sequence of rdatas with the ordering of
:func:`petab.get_simulation_conditions`.
:param model:
AMICI model used to generate ``rdatas``.
:param measurement_df:
PEtab measurement table used to generate ``rdatas``.
:return:
A dataframe built from the rdatas in the format of ``measurement_df``.
"""
simulation_conditions = petab.get_simulation_conditions(
measurement_df)
observable_ids = model.getObservableIds()
rows = []
# iterate over conditions
for (_, condition), rdata in zip(simulation_conditions.iterrows(), rdatas):
# current simulation matrix
y = rdata.y
# time array used in rdata
t = list(rdata.ts)
# extract rows for condition
cur_measurement_df = petab.get_rows_for_condition(
measurement_df, condition)
# iterate over entries for the given condition
# note: this way we only generate a dataframe entry for every
# row that existed in the original dataframe. if we want to
# e.g. have also timepoints non-existent in the original file,
# we need to instead iterate over the rdata['y'] entries
for _, row in cur_measurement_df.iterrows():
# copy row
row_sim = copy.deepcopy(row)
# extract simulated measurement value
timepoint_idx = t.index(row[TIME])
observable_idx = observable_ids.index(row[OBSERVABLE_ID])
measurement_sim = y[timepoint_idx, observable_idx]
# change measurement entry
row_sim[MEASUREMENT] = measurement_sim
rows.append(row_sim)
return pd.DataFrame(rows)
[docs]def rdatas_to_simulation_df(
rdatas: Sequence[amici.ReturnData],
model: AmiciModel,
measurement_df: pd.DataFrame) -> pd.DataFrame:
"""Create a PEtab simulation dataframe from
:class:`amici.amici.ReturnData` s.
See :func:`rdatas_to_measurement_df` for details, only that model outputs
will appear in column ``simulation`` instead of ``measurement``."""
df = rdatas_to_measurement_df(rdatas=rdatas, model=model,
measurement_df=measurement_df)
return df.rename(columns={MEASUREMENT: SIMULATION})
def _default_scaled_parameters(
petab_problem: petab.Problem,
problem_parameters: Optional[Dict[str, float]] = None,
scaled_parameters: bool = False,
) -> Optional[Dict[str, float]]:
"""
Helper method to handle an unscaled or unspecified parameter vector.
The parameter vector defaults to the nominal values in the PEtab
parameter table.
Unscaled parameter values are scaled.
:param petab_problem:
The PEtab problem.
:param problem_parameters:
Keys are PEtab parameter IDs, values are parameter values on the scale
defined in the PEtab parameter table. Defaults to the nominal values in
the PEtab parameter table.
:param scaled_parameters:
Whether `problem_parameters` are on the scale defined in the PEtab
parameter table.
:return:
The scaled parameter vector.
"""
if problem_parameters is None:
problem_parameters = dict(zip(
petab_problem.x_ids,
petab_problem.x_nominal_scaled,
))
elif not scaled_parameters:
problem_parameters = petab_problem.scale_parameters(problem_parameters)
return problem_parameters