Source code for amici.petab.simulations

"""Functionality related to simulation of PEtab problems.

Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem.
"""

import copy
import logging
from typing import Any
from collections.abc import Sequence

import amici
import numpy as np
import pandas as pd
import petab
from petab.C import *  # noqa: F403

from .. import AmiciExpData, AmiciModel
from ..logging import get_logger, log_execution_time

# some extra imports for backward-compatibility
# DEPRECATED: remove in 1.0
from .conditions import (  # noqa # pylint: disable=unused-import
    create_edata_for_condition,
    create_edatas,
    create_parameterized_edatas,
    fill_in_parameters,
)
from .parameter_mapping import (  # noqa # pylint: disable=unused-import
    ParameterMapping,
    create_parameter_mapping,
    create_parameter_mapping_for_condition,
)
from .util import (  # noqa # pylint: disable=unused-import
    get_states_in_condition_table,
)

# END DEPRECATED

try:
    import pysb
except ImportError:
    pysb = None

logger = get_logger(__name__)


# string constant definitions
LLH = "llh"
SLLH = "sllh"
FIM = "fim"
S2LLH = "s2llh"
RES = "res"
SRES = "sres"
RDATAS = "rdatas"
EDATAS = "edatas"


__all__ = [
    "simulate_petab",
    "LLH",
    "SLLH",
    "FIM",
    "S2LLH",
    "RES",
    "SRES",
    "RDATAS",
    "EDATAS",
]


[docs] @log_execution_time("Simulating PEtab model", logger) def simulate_petab( petab_problem: petab.Problem, amici_model: AmiciModel, solver: amici.Solver | None = None, problem_parameters: dict[str, float] | None = None, simulation_conditions: pd.DataFrame | dict = None, edatas: list[AmiciExpData] = None, parameter_mapping: ParameterMapping = None, scaled_parameters: bool | None = False, log_level: int = logging.WARNING, num_threads: int = 1, failfast: bool = True, scaled_gradients: bool = False, ) -> dict[str, Any]: """Simulate PEtab model. .. note:: Regardless of `scaled_parameters`, unscaled sensitivities are returned, unless `scaled_gradients=True`. :param petab_problem: PEtab problem to work on. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param solver: An AMICI solver. Will use default options if None. :param problem_parameters: Run simulation with these parameters. If ``None``, PEtab ``nominalValues`` will be used. To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param simulation_conditions: Result of :py:func:`petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. Not required if ``edatas`` and ``parameter_mapping`` are provided. :param edatas: Experimental data. Parameters are inserted in-place for simulation. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by :py:func:`create_parameter_mapping` with ``scaled_parameters=True``. :param scaled_parameters: If ``True``, ``problem_parameters`` are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If ``False``, they are assumed to be in linear scale. If `parameter_mapping` is provided, this must match the value of `scaled_parameters` used to generate the mapping. :param log_level: Log level, see :mod:`amici.logging` module. :param num_threads: Number of threads to use for simulating multiple conditions (only used if compiled with OpenMP). :param failfast: Returns as soon as an integration failure is encountered, skipping any remaining simulations. :param scaled_gradients: Whether to compute gradients on parameter scale (``True``) or not (``False``). :return: Dictionary of * cost function value (``LLH``), * list of :class:`amici.amici.ReturnData` (``RDATAS``), * list of :class:`amici.amici.ExpData` (``EDATAS``), corresponding to the different simulation conditions. For ordering of simulation conditions, see :meth:`petab.Problem.get_simulation_conditions_from_measurement_df`. """ logger.setLevel(log_level) if solver is None: solver = amici_model.getSolver() # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if ( simulation_conditions is None and parameter_mapping is None and edatas is None ): simulation_conditions = ( petab_problem.get_simulation_conditions_from_measurement_df() ) # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, # we will always use scaled parameters internally scaled_parameters=True, amici_model=amici_model, ) if problem_parameters is None: # scaled PEtab nominal values problem_parameters = dict( zip( petab_problem.x_ids, petab_problem.x_nominal_scaled, strict=True, ) ) # depending on `fill_fixed_parameters` for parameter mapping, the # parameter mapping may contain values instead of symbols for fixed # parameters. In this case, we need to filter them here to avoid # warnings in `fill_in_parameters`. free_parameters = parameter_mapping.free_symbols problem_parameters = { par_id: par_value for par_id, par_value in problem_parameters.items() if par_id in free_parameters } elif not scaled_parameters: problem_parameters = petab_problem.scale_parameters(problem_parameters) scaled_parameters = True # Get edatas if edatas is None: # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions, ) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model, ) # Simulate rdatas = amici.runAmiciSimulations( amici_model, solver, edata_list=edatas, num_threads=num_threads, failfast=failfast, ) # Compute total llh llh = sum(rdata["llh"] for rdata in rdatas) # Compute total sllh sllh = None if solver.getSensitivityOrder() != amici.SensitivityOrder.none: sllh = aggregate_sllh( amici_model=amici_model, rdatas=rdatas, parameter_mapping=parameter_mapping, petab_scale=scaled_parameters, petab_problem=petab_problem, edatas=edatas, ) if not scaled_gradients and sllh is not None: sllh = { parameter_id: rescale_sensitivity( sensitivity=sensitivity, parameter_value=problem_parameters[parameter_id], old_scale=petab_problem.parameter_df.loc[ parameter_id, PARAMETER_SCALE ], new_scale=LIN, ) for parameter_id, sensitivity in sllh.items() } # Log results sim_cond = petab_problem.get_simulation_conditions_from_measurement_df() for i, rdata in enumerate(rdatas): sim_cond_id = "N/A" if sim_cond.empty else sim_cond.iloc[i, :].values logger.debug( f"Condition: {sim_cond_id}, status: {rdata['status']}, " f"llh: {rdata['llh']}" ) return { LLH: llh, SLLH: sllh, RDATAS: rdatas, EDATAS: edatas, }
[docs] def aggregate_sllh( amici_model: AmiciModel, rdatas: Sequence[amici.ReturnDataView], parameter_mapping: ParameterMapping | None, edatas: list[AmiciExpData], petab_scale: bool = True, petab_problem: petab.Problem = None, ) -> None | dict[str, float]: """ Aggregate likelihood gradient for all conditions, according to PEtab parameter mapping. :param amici_model: AMICI model from which ``rdatas`` were obtained. :param rdatas: Simulation results. :param parameter_mapping: PEtab parameter mapping to condition-specific simulation parameters. :param edatas: Experimental data used for simulation. :param petab_scale: Whether to check that sensitivities were computed with parameters on the scales provided in the PEtab parameters table. :param petab_problem: The PEtab problem that defines the parameter scales. :return: Aggregated likelihood sensitivities. """ accumulated_sllh = {} model_parameter_ids = amici_model.getParameterIds() if petab_scale and petab_problem is None: raise ValueError( "Please provide the PEtab problem, when using " "`petab_scale=True`." ) # Check for issues in all condition simulation results. for rdata in rdatas: # Condition failed during simulation. if rdata.status != amici.AMICI_SUCCESS: return None # Condition simulation result does not provide SLLH. if rdata.sllh is None: raise ValueError( "The sensitivities of the likelihood for a condition were " "not computed." ) for condition_parameter_mapping, edata, rdata in zip( parameter_mapping, edatas, rdatas, strict=True ): for sllh_parameter_index, condition_parameter_sllh in enumerate( rdata.sllh ): # Get PEtab parameter ID # Use ExpData if it provides a parameter list, else default to # Model. if edata.plist: model_parameter_index = edata.plist[sllh_parameter_index] else: model_parameter_index = amici_model.plist(sllh_parameter_index) model_parameter_id = model_parameter_ids[model_parameter_index] petab_parameter_id = condition_parameter_mapping.map_sim_var[ model_parameter_id ] # Initialize if petab_parameter_id not in accumulated_sllh: accumulated_sllh[petab_parameter_id] = 0 # Check that the scale is consistent if petab_scale: # `ParameterMappingForCondition` objects provide the scale in # terms of `petab.C` constants already, not AMICI equivalents. model_parameter_scale = ( condition_parameter_mapping.scale_map_sim_var[ model_parameter_id ] ) petab_parameter_scale = petab_problem.parameter_df.loc[ petab_parameter_id, PARAMETER_SCALE ] if model_parameter_scale != petab_parameter_scale: raise ValueError( f"The scale of the parameter `{petab_parameter_id}` " "differs between the AMICI model " f"({model_parameter_scale}) and the PEtab problem " f"({petab_parameter_scale})." ) # Accumulate accumulated_sllh[petab_parameter_id] += condition_parameter_sllh return accumulated_sllh
[docs] def rescale_sensitivity( sensitivity: float, parameter_value: float, old_scale: str, new_scale: str, ) -> float: """Rescale a sensitivity between parameter scales. :param sensitivity: The sensitivity corresponding to the parameter value. :param parameter_value: The parameter vector element, on ``old_scale``. :param old_scale: The scale of the parameter value. :param new_scale: The parameter scale on which to rescale the sensitivity. :return: The rescaled sensitivity. """ LOG_E_10 = np.log(10) if old_scale == new_scale: return sensitivity unscaled_parameter_value = petab.parameters.unscale( parameter=parameter_value, scale_str=old_scale, ) scale = { (LIN, LOG): lambda s: s * unscaled_parameter_value, (LOG, LIN): lambda s: s / unscaled_parameter_value, (LIN, LOG10): lambda s: s * (unscaled_parameter_value * LOG_E_10), (LOG10, LIN): lambda s: s / (unscaled_parameter_value * LOG_E_10), } scale[(LOG, LOG10)] = lambda s: scale[(LIN, LOG10)](scale[(LOG, LIN)](s)) scale[(LOG10, LOG)] = lambda s: scale[(LIN, LOG)](scale[(LOG10, LIN)](s)) if (old_scale, new_scale) not in scale: raise NotImplementedError( f"Old scale: {old_scale}. New scale: {new_scale}." ) return scale[(old_scale, new_scale)](sensitivity)
[docs] def rdatas_to_measurement_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame, ) -> pd.DataFrame: """ Create a measurement dataframe in the PEtab format from the passed ``rdatas`` and own information. :param rdatas: A sequence of rdatas with the ordering of :func:`petab.get_simulation_conditions`. :param model: AMICI model used to generate ``rdatas``. :param measurement_df: PEtab measurement table used to generate ``rdatas``. :return: A dataframe built from the rdatas in the format of ``measurement_df``. """ simulation_conditions = petab.get_simulation_conditions(measurement_df) observable_ids = model.getObservableIds() rows = [] # iterate over conditions for (_, condition), rdata in zip( simulation_conditions.iterrows(), rdatas, strict=True ): # current simulation matrix y = rdata.y # time array used in rdata t = list(rdata.ts) # extract rows for condition cur_measurement_df = petab.get_rows_for_condition( measurement_df, condition ) # iterate over entries for the given condition # note: this way we only generate a dataframe entry for every # row that existed in the original dataframe. if we want to # e.g. have also timepoints non-existent in the original file, # we need to instead iterate over the rdata['y'] entries for _, row in cur_measurement_df.iterrows(): # copy row row_sim = copy.deepcopy(row) # extract simulated measurement value timepoint_idx = t.index(row[TIME]) observable_idx = observable_ids.index(row[OBSERVABLE_ID]) measurement_sim = y[timepoint_idx, observable_idx] # change measurement entry row_sim[MEASUREMENT] = measurement_sim rows.append(row_sim) return pd.DataFrame(rows)
[docs] def rdatas_to_simulation_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame, ) -> pd.DataFrame: """Create a PEtab simulation dataframe from :class:`amici.amici.ReturnData` s. See :func:`rdatas_to_measurement_df` for details, only that model outputs will appear in column ``simulation`` instead of ``measurement``.""" df = rdatas_to_measurement_df( rdatas=rdatas, model=model, measurement_df=measurement_df ) return df.rename(columns={MEASUREMENT: SIMULATION})