"""
PEtab Objective
---------------
Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem
"""
import copy
import logging
import numbers
from typing import (List, Sequence, Optional, Dict, Tuple, Union, Any,
Collection, Iterator)
import amici
from amici.sbml_import import get_species_initial
import libsbml
import numpy as np
import pandas as pd
import petab
import sympy as sp
from petab.C import * # noqa: F403
from . import AmiciModel, AmiciExpData
from .logging import get_logger, log_execution_time
from .petab_import import PREEQ_INDICATOR_ID
from .parameter_mapping import (
fill_in_parameters, ParameterMappingForCondition, ParameterMapping)
logger = get_logger(__name__)
# string constant definitions
LLH = 'llh'
SLLH = 'sllh'
FIM = 'fim'
S2LLH = 's2llh'
RES = 'res'
SRES = 'sres'
RDATAS = 'rdatas'
[docs]@log_execution_time('Simulating PEtab model', logger)
def simulate_petab(
petab_problem: petab.Problem,
amici_model: AmiciModel,
solver: Optional[amici.Solver] = None,
problem_parameters: Optional[Dict[str, float]] = None,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
edatas: List[AmiciExpData] = None,
parameter_mapping: ParameterMapping = None,
scaled_parameters: Optional[bool] = False,
log_level: int = logging.WARNING,
num_threads: int = 1,
failfast: bool = True,
) -> Dict[str, Any]:
"""Simulate PEtab model.
:param petab_problem:
PEtab problem to work on.
:param amici_model:
AMICI Model assumed to be compatible with ``petab_problem``.
:param solver:
An AMICI solver. Will use default options if None.
:param problem_parameters:
Run simulation with these parameters. If None, PEtab `nominalValues`
will be used). To be provided as dict, mapping PEtab problem
parameters to SBML IDs.
:param simulation_conditions:
Result of `petab.get_simulation_conditions`. Can be provided to save
time if this has be obtained before.
Not required if `edatas` and `parameter_mapping` are provided.
:param edatas:
Experimental data. Parameters are inserted in-place for simulation.
:param parameter_mapping:
Optional precomputed PEtab parameter mapping for efficiency, as
generated by `create_parameter_mapping`.
:param scaled_parameters:
If True, problem_parameters are assumed to be on the scale provided
in the PEtab parameter table and will be unscaled. If False, they
are assumed to be in linear scale.
:param log_level:
Log level, see :mod:`amici.logging` module.
:param num_threads:
Number of threads to use for simulating multiple conditions
(only used if compiled with OpenMP).
:param failfast:
Returns as soon as an integration failure is encountered, skipping
any remaining simulations.
:return:
Dictionary of
* cost function value (LLH),
* const function sensitivity w.r.t. parameters (SLLH),
(**NOTE**: Sensitivities are computed for the scaled parameters)
* list of `ReturnData` (RDATAS),
corresponding to the different simulation conditions.
For ordering of simulation conditions, see
:meth:`petab.Problem.get_simulation_conditions_from_measurement_df`.
"""
logger.setLevel(log_level)
if solver is None:
solver = amici_model.getSolver()
# Get parameters
if problem_parameters is None:
# Use PEtab nominal values as default
problem_parameters = {t.Index: getattr(t, NOMINAL_VALUE) for t in
petab_problem.parameter_df.itertuples()}
scaled_parameters = False
# number of amici simulations will be number of unique
# (preequilibrationConditionId, simulationConditionId) pairs.
# Can be optimized by checking for identical condition vectors.
if simulation_conditions is None and parameter_mapping is None \
and edatas is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
# Get parameter mapping
if parameter_mapping is None:
parameter_mapping = create_parameter_mapping(
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
scaled_parameters=scaled_parameters,
amici_model=amici_model)
# Get edatas
if edatas is None:
# Generate ExpData with all condition-specific information
edatas = create_edatas(
amici_model=amici_model,
petab_problem=petab_problem,
simulation_conditions=simulation_conditions)
# Fill parameters in ExpDatas (in-place)
fill_in_parameters(
edatas=edatas,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
parameter_mapping=parameter_mapping,
amici_model=amici_model)
# Simulate
rdatas = amici.runAmiciSimulations(
amici_model, solver, edata_list=edatas,
num_threads=num_threads, failfast=failfast)
# Compute total llh
llh = sum(rdata['llh'] for rdata in rdatas)
# Log results
sim_cond = petab_problem.get_simulation_conditions_from_measurement_df()
for i, rdata in enumerate(rdatas):
logger.debug(f"Condition: {sim_cond.iloc[i, :].values}, status: "
f"{rdata['status']}, llh: {rdata['llh']}")
return {
LLH: llh,
RDATAS: rdatas
}
[docs]def create_parameterized_edatas(
amici_model: AmiciModel,
petab_problem: petab.Problem,
problem_parameters: Dict[str, numbers.Number],
scaled_parameters: bool = False,
parameter_mapping: ParameterMapping = None,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
) -> List[amici.ExpData]:
"""Create list of :class:amici.ExpData objects with parameters filled in.
:param amici_model:
AMICI Model assumed to be compatible with ``petab_problem``.
:param petab_problem:
PEtab problem to work on.
:param problem_parameters:
Run simulation with these parameters. If None, PEtab `nominalValues`
will be used). To be provided as dict, mapping PEtab problem
parameters to SBML IDs.
:param scaled_parameters:
If True, problem_parameters are assumed to be on the scale provided
in the PEtab parameter table and will be unscaled. If False, they
are assumed to be in linear scale.
:param parameter_mapping:
Optional precomputed PEtab parameter mapping for efficiency, as
generated by `create_parameter_mapping`.
:param simulation_conditions:
Result of `petab.get_simulation_conditions`. Can be provided to save
time if this has been obtained before.
:return:
List with one :class:`amici.amici.ExpData` per simulation condition,
with filled in timepoints, data and parameters.
"""
# number of amici simulations will be number of unique
# (preequilibrationConditionId, simulationConditionId) pairs.
# Can be optimized by checking for identical condition vectors.
if simulation_conditions is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
# Get parameter mapping
if parameter_mapping is None:
parameter_mapping = create_parameter_mapping(
petab_problem=petab_problem,
simulation_conditions=simulation_conditions,
scaled_parameters=scaled_parameters,
amici_model=amici_model)
# Generate ExpData with all condition-specific information
edatas = create_edatas(
amici_model=amici_model,
petab_problem=petab_problem,
simulation_conditions=simulation_conditions)
# Fill parameters in ExpDatas (in-place)
fill_in_parameters(
edatas=edatas,
problem_parameters=problem_parameters,
scaled_parameters=scaled_parameters,
parameter_mapping=parameter_mapping,
amici_model=amici_model)
return edatas
[docs]def create_parameter_mapping(
petab_problem: petab.Problem,
simulation_conditions: Union[pd.DataFrame, Dict],
scaled_parameters: bool,
amici_model: AmiciModel,
) -> ParameterMapping:
"""Generate AMICI specific parameter mapping.
:param petab_problem:
PEtab problem
:param simulation_conditions:
Result of `petab.get_simulation_conditions`. Can be provided to save
time if this has been obtained before.
:param scaled_parameters:
If True, problem_parameters are assumed to be on the scale provided
in the PEtab parameter table and will be unscaled. If False, they
are assumed to be in linear scale.
:param amici_model:
AMICI model.
:return:
List of the parameter mappings.
"""
if simulation_conditions is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
# Because AMICI globalizes all local parameters during model import,
# we need to do that here as well to prevent parameter mapping errors
# (PEtab does currently not care about SBML LocalParameters)
if petab_problem.sbml_document:
converter_config = libsbml.SBMLLocalParameterConverter() \
.getDefaultProperties()
petab_problem.sbml_document.convert(converter_config)
else:
logger.debug("No petab_problem.sbml_document is set. Cannot convert "
"SBML LocalParameters. If the model contains "
"LocalParameters, parameter mapping will fail.")
prelim_parameter_mapping = \
petab_problem.get_optimization_to_simulation_parameter_mapping(
warn_unmapped=False, scaled_parameters=scaled_parameters,
allow_timepoint_specific_numeric_noise_parameters=
not petab.lint.observable_table_has_nontrivial_noise_formula(
petab_problem.observable_df
)
)
parameter_mapping = ParameterMapping()
for (_, condition), prelim_mapping_for_condition in \
zip(simulation_conditions.iterrows(), prelim_parameter_mapping):
mapping_for_condition = create_parameter_mapping_for_condition(
prelim_mapping_for_condition, condition, petab_problem,
amici_model)
parameter_mapping.append(mapping_for_condition)
return parameter_mapping
[docs]def create_parameter_mapping_for_condition(
parameter_mapping_for_condition: petab.ParMappingDictQuadruple,
condition: Union[pd.Series, Dict],
petab_problem: petab.Problem,
amici_model: AmiciModel
) -> ParameterMappingForCondition:
"""Generate AMICI specific parameter mapping for condition.
:param parameter_mapping_for_condition:
Preliminary parameter mapping for condition.
:param condition:
pandas.DataFrame row with preequilibrationConditionId and
simulationConditionId.
:param petab_problem:
Underlying PEtab problem.
:param amici_model:
AMICI model.
:return:
The parameter and parameter scale mappings, for fixed
preequilibration, fixed simulation, and variable simulation
parameters, and then the respective scalings.
"""
(condition_map_preeq, condition_map_sim, condition_scale_map_preeq,
condition_scale_map_sim) = parameter_mapping_for_condition
logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}")
if len(condition_map_preeq) != len(condition_scale_map_preeq) \
or len(condition_map_sim) != len(condition_scale_map_sim):
raise AssertionError("Number of parameters and number of parameter "
"scales do not match.")
if len(condition_map_preeq) \
and len(condition_map_preeq) != len(condition_map_sim):
logger.debug(f"Preequilibration parameter map: {condition_map_preeq}")
logger.debug(f"Simulation parameter map: {condition_map_sim}")
raise AssertionError("Number of parameters for preequilbration "
"and simulation do not match.")
##########################################################################
# initial states
# Initial states have been set during model import based on the SBML model.
# If initial states were overwritten in the PEtab condition table, they are
# applied here.
# During model generation, parameters for initial concentrations and
# respective initial assignments have been created for the
# relevant species, here we add these parameters to the parameter mapping.
# In absence of preequilibration this could also be handled via
# ExpData.x0, but in the case of preequilibration this would not allow for
# resetting initial states.
species_in_condition_table = [
col for col in petab_problem.condition_df
if petab_problem.sbml_model.getSpecies(col) is not None]
if species_in_condition_table:
# set indicator fixed parameter for preeq
# (we expect here, that this parameter was added during import and
# that it was not added by the user with a different meaning...)
if condition_map_preeq:
condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0
condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN
condition_map_sim[PREEQ_INDICATOR_ID] = 0.0
condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN
def _set_initial_concentration(condition_id, species_id, init_par_id,
par_map, scale_map):
value = petab.to_float_if_float(
petab_problem.condition_df.loc[condition_id, species_id])
if pd.isna(value):
value = get_species_initial(
petab_problem.sbml_model.getSpecies(species_id)
)
try:
value = float(value)
except (ValueError, TypeError):
if sp.nsimplify(value).is_Atom:
# Get rid of multiplication with one
value = sp.nsimplify(value)
else:
raise NotImplementedError(
"Cannot handle non-trivial expressions for "
f"species initial for {species_id}: {value}")
# this should be a parameter ID
value = str(value)
logger.debug(f'The species {species_id} has no initial value '
f'defined for the condition {condition_id} in '
'the PEtab conditions table. The initial value is '
f'now set to {value}, which is the initial value '
'defined in the SBML model.')
par_map[init_par_id] = value
if isinstance(value, float):
# numeric initial state
scale_map[init_par_id] = petab.LIN
else:
# parametric initial state
scale_map[init_par_id] = petab_problem.parameter_df.loc[
value, PARAMETER_SCALE]
for species_id in species_in_condition_table:
# for preequilibration
init_par_id = f'initial_{species_id}_preeq'
if condition.get(PREEQUILIBRATION_CONDITION_ID):
condition_id = condition[PREEQUILIBRATION_CONDITION_ID]
_set_initial_concentration(
condition_id, species_id, init_par_id, condition_map_preeq,
condition_scale_map_preeq)
else:
# need to set dummy value for preeq parameter anyways, as it
# is expected below (set to 0, not nan, because will be
# multiplied with indicator variable in initial assignment)
condition_map_sim[init_par_id] = 0.0
condition_scale_map_sim[init_par_id] = LIN
# for simulation
condition_id = condition[SIMULATION_CONDITION_ID]
init_par_id = f'initial_{species_id}_sim'
_set_initial_concentration(
condition_id, species_id, init_par_id, condition_map_sim,
condition_scale_map_sim)
##########################################################################
# separate fixed and variable AMICI parameters, because we may have
# different fixed parameters for preeq and sim condition, but we cannot
# have different variable parameters. without splitting,
# merge_preeq_and_sim_pars_condition below may fail.
# TODO: This can be done already in parameter mapping creation.
variable_par_ids = amici_model.getParameterIds()
fixed_par_ids = amici_model.getFixedParameterIds()
condition_map_preeq_var, condition_map_preeq_fix = \
subset_dict(condition_map_preeq, variable_par_ids, fixed_par_ids)
condition_scale_map_preeq_var, condition_scale_map_preeq_fix = \
subset_dict(condition_scale_map_preeq, variable_par_ids, fixed_par_ids)
condition_map_sim_var, condition_map_sim_fix = \
subset_dict(condition_map_sim, variable_par_ids, fixed_par_ids)
condition_scale_map_sim_var, condition_scale_map_sim_fix = \
subset_dict(condition_scale_map_sim, variable_par_ids, fixed_par_ids)
logger.debug("Fixed parameters preequilibration: "
f"{condition_map_preeq_fix}")
logger.debug("Fixed parameters simulation: "
f"{condition_map_sim_fix}")
logger.debug("Variable parameters preequilibration: "
f"{condition_map_preeq_var}")
logger.debug("Variable parameters simulation: "
f"{condition_map_sim_var}")
petab.merge_preeq_and_sim_pars_condition(
condition_map_preeq_var, condition_map_sim_var,
condition_scale_map_preeq_var, condition_scale_map_sim_var,
condition)
logger.debug(f"Merged: {condition_map_sim_var}")
parameter_mapping_for_condition = ParameterMappingForCondition(
map_preeq_fix=condition_map_preeq_fix,
map_sim_fix=condition_map_sim_fix,
map_sim_var=condition_map_sim_var,
scale_map_preeq_fix=condition_scale_map_preeq_fix,
scale_map_sim_fix=condition_scale_map_sim_fix,
scale_map_sim_var=condition_scale_map_sim_var
)
return parameter_mapping_for_condition
[docs]def create_edatas(
amici_model: AmiciModel,
petab_problem: petab.Problem,
simulation_conditions: Union[pd.DataFrame, Dict] = None,
) -> List[amici.ExpData]:
"""Create list of :class:`amici.amici.ExpData` objects for PEtab problem.
:param amici_model:
AMICI model.
:param petab_problem:
Underlying PEtab problem.
:param simulation_conditions:
Result of `petab.get_simulation_conditions`. Can be provided to save
time if this has be obtained before.
:return:
List with one :class:`amici.amici.ExpData` per simulation condition,
with filled in timepoints and data.
"""
if simulation_conditions is None:
simulation_conditions = \
petab_problem.get_simulation_conditions_from_measurement_df()
observable_ids = amici_model.getObservableIds()
edatas = []
for _, condition in simulation_conditions.iterrows():
# Create amici.ExpData for each simulation
edata = create_edata_for_condition(
condition=condition,
amici_model=amici_model,
petab_problem=petab_problem,
observable_ids=observable_ids,
)
edatas.append(edata)
return edatas
[docs]def create_edata_for_condition(
condition: Union[Dict, pd.Series],
amici_model: AmiciModel,
petab_problem: petab.Problem,
observable_ids: List[str],
) -> amici.ExpData:
"""Get :class:`amici.amici.ExpData` for the given PEtab condition.
Sets timepoints, observed data and sigmas.
:param condition:
pandas.DataFrame row with preequilibrationConditionId and
simulationConditionId.
:param amici_model:
AMICI model
:param petab_problem:
Underlying PEtab problem
:param observable_ids:
List of observable IDs
:return:
ExpData instance.
"""
# extract measurement table rows for condition
measurement_df = petab.get_rows_for_condition(
measurement_df=petab_problem.measurement_df, condition=condition)
if amici_model.nytrue != len(observable_ids):
raise AssertionError("Number of AMICI model observables does not "
"match number of PEtab observables.")
# create an ExpData object
edata = amici.ExpData(amici_model)
##########################################################################
# enable initial parameters reinitialization
species_in_condition_table = [
col for col in petab_problem.condition_df
if not pd.isna(petab_problem.condition_df.loc[
condition[SIMULATION_CONDITION_ID], col])
and petab_problem.sbml_model.getSpecies(col) is not None
]
if condition.get(PREEQUILIBRATION_CONDITION_ID) \
and species_in_condition_table:
state_ids = amici_model.getStateIds()
state_idx_reinitalization = [state_ids.index(s)
for s in species_in_condition_table]
edata.reinitialization_state_idxs_sim = state_idx_reinitalization
logger.debug("Enabling state reinitialization for condition "
f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - "
f"{condition.get(SIMULATION_CONDITION_ID)} "
f"{species_in_condition_table}")
##########################################################################
# timepoints
# find replicate numbers of time points
timepoints_w_reps = _get_timepoints_with_replicates(
df_for_condition=measurement_df)
edata.setTimepoints(timepoints_w_reps)
##########################################################################
# measurements and sigmas
y, sigma_y = _get_measurements_and_sigmas(
df_for_condition=measurement_df, timepoints_w_reps=timepoints_w_reps,
observable_ids=observable_ids)
edata.setObservedData(y.flatten())
edata.setObservedDataStdDev(sigma_y.flatten())
return edata
[docs]def subset_dict(full: Dict[Any, Any],
*args: Collection[Any]) -> Iterator[Dict[Any, Any]]:
"""Get subset of dictionary based on provided keys
:param full:
Dictionary to subset
:param args:
Collections of keys to be contained in the different subsets
:return:
subsetted dictionary
"""
for keys in args:
yield {key: val for (key, val) in full.items() if key in keys}
def _get_timepoints_with_replicates(
df_for_condition: pd.DataFrame) -> List[numbers.Number]:
"""
Get list of timepoints including replicate measurements
:param df_for_condition:
PEtab measurement table subset for a single condition.
:return:
Sorted list of timepoints, including multiple timepoints accounting
for replicate measurements.
"""
# create sorted list of all timepoints for which measurements exist
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
# find replicate numbers of time points
timepoints_w_reps = []
for time in timepoints:
# subselect for time
df_for_time = df_for_condition[
df_for_condition.time.astype(float) == time
]
# rep number is maximum over rep numbers for observables
n_reps = max(df_for_time.groupby(
[OBSERVABLE_ID, TIME]).size())
# append time point n_rep times
timepoints_w_reps.extend([time] * n_reps)
return timepoints_w_reps
def _get_measurements_and_sigmas(
df_for_condition: pd.DataFrame,
timepoints_w_reps: Sequence[numbers.Number],
observable_ids: Sequence[str],
) -> Tuple[np.array, np.array]:
"""
Get measurements and sigmas
Generate arrays with measurements and sigmas in AMICI format from a
PEtab measurement table subset for a single condition.
:param df_for_condition:
Subset of PEtab measurement table for one condition
:param timepoints_w_reps:
Timepoints for which there exist measurements, including replicates
:param observable_ids:
List of observable IDs for mapping IDs to indices.
:return:
arrays for measurement and sigmas
"""
# prepare measurement matrix
y = np.full(shape=(len(timepoints_w_reps), len(observable_ids)),
fill_value=np.nan)
# prepare sigma matrix
sigma_y = y.copy()
timepoints = sorted(df_for_condition[TIME].unique().astype(float))
for time in timepoints:
# subselect for time
df_for_time = df_for_condition[df_for_condition[TIME] == time]
time_ix_0 = timepoints_w_reps.index(time)
# remember used time indices for each observable
time_ix_for_obs_ix = {}
# iterate over measurements
for _, measurement in df_for_time.iterrows():
# extract observable index
observable_ix = observable_ids.index(measurement[OBSERVABLE_ID])
# update time index for observable
if observable_ix in time_ix_for_obs_ix:
time_ix_for_obs_ix[observable_ix] += 1
else:
time_ix_for_obs_ix[observable_ix] = time_ix_0
# fill observable and possibly noise parameter
y[time_ix_for_obs_ix[observable_ix],
observable_ix] = measurement[MEASUREMENT]
if isinstance(measurement.get(NOISE_PARAMETERS, None),
numbers.Number):
sigma_y[time_ix_for_obs_ix[observable_ix],
observable_ix] = measurement[NOISE_PARAMETERS]
return y, sigma_y
[docs]def rdatas_to_measurement_df(
rdatas: Sequence[amici.ReturnData],
model: AmiciModel,
measurement_df: pd.DataFrame) -> pd.DataFrame:
"""
Create a measurement dataframe in the PEtab format from the passed
`rdatas` and own information.
:param rdatas:
A sequence of rdatas with the ordering of
`petab.get_simulation_conditions`.
:param model:
AMICI model used to generate `rdatas`.
:param measurement_df:
PEtab measurement table used to generate `rdatas`.
:return:
A dataframe built from the rdatas in the format of `measurement_df`.
"""
df = pd.DataFrame(columns=list(measurement_df.columns))
simulation_conditions = petab.get_simulation_conditions(
measurement_df)
observable_ids = model.getObservableIds()
# iterate over conditions
for (_, condition), rdata in zip(simulation_conditions.iterrows(), rdatas):
# current simulation matrix
y = rdata['y']
# time array used in rdata
t = list(rdata['t'])
# extract rows for condition
cur_measurement_df = petab.get_rows_for_condition(
measurement_df, condition)
# iterate over entries for the given condition
# note: this way we only generate a dataframe entry for every
# row that existed in the original dataframe. if we want to
# e.g. have also timepoints non-existent in the original file,
# we need to instead iterate over the rdata['y'] entries
for _, row in cur_measurement_df.iterrows():
# copy row
row_sim = copy.deepcopy(row)
# extract simulated measurement value
timepoint_idx = t.index(row[TIME])
observable_idx = observable_ids.index(row[OBSERVABLE_ID])
measurement_sim = y[timepoint_idx, observable_idx]
# change measurement entry
row_sim[MEASUREMENT] = measurement_sim
# append to dataframe
df = df.append(row_sim, ignore_index=True)
return df
[docs]def rdatas_to_simulation_df(
rdatas: Sequence[amici.ReturnData],
model: AmiciModel,
measurement_df: pd.DataFrame) -> pd.DataFrame:
"""Create a PEtab simulation dataframe from amici.ReturnDatas.
See ``rdatas_to_measurement_df`` for details, only that model outputs
will appear in column "simulation" instead of "measurement"."""
df = rdatas_to_measurement_df(rdatas=rdatas, model=model,
measurement_df=measurement_df)
return df.rename(columns={MEASUREMENT: SIMULATION})