Source code for amici.petab_objective

"""
PEtab Objective
---------------
Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem
"""

import copy
import logging
import numbers
from typing import (List, Sequence, Optional, Dict, Tuple, Union, Any,
                    Collection, Iterator)

import amici
import libsbml
import numpy as np
import pandas as pd
import petab
from petab.C import *  # noqa: F403

from . import AmiciModel, AmiciExpData
from .logging import get_logger, log_execution_time
from .petab_import import PREEQ_INDICATOR_ID
from .parameter_mapping import (
    fill_in_parameters, ParameterMappingForCondition, ParameterMapping)

logger = get_logger(__name__)


# string constant definitions
LLH = 'llh'
SLLH = 'sllh'
FIM = 'fim'
S2LLH = 's2llh'
RES = 'res'
SRES = 'sres'
RDATAS = 'rdatas'


[docs]@log_execution_time('Simulating PEtab model', logger) def simulate_petab( petab_problem: petab.Problem, amici_model: AmiciModel, solver: Optional[amici.Solver] = None, problem_parameters: Optional[Dict[str, float]] = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, edatas: List[AmiciExpData] = None, parameter_mapping: ParameterMapping = None, scaled_parameters: Optional[bool] = False, log_level: int = logging.WARNING ) -> Dict[str, Any]: """Simulate PEtab model. :param petab_problem: PEtab problem to work on. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param solver: An AMICI solver. Will use default options if None. :param problem_parameters: Run simulation with these parameters. If None, PEtab `nominalValues` will be used). To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. Not required if `edatas` and `parameter_mapping` are provided. :param edatas: Experimental data. Parameters are inserted in-place for simulation. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by `create_parameter_mapping`. :param scaled_parameters: If True, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If False, they are assumed to be in linear scale. :param log_level: Log level, see :mod:`amici.logging` module. :return: Dictionary of * cost function value (LLH), * const function sensitivity w.r.t. parameters (SLLH), (**NOTE**: Sensitivities are computed for the scaled parameters) * list of `ReturnData` (RDATAS), corresponding to the different simulation conditions. For ordering of simulation conditions, see :meth:`petab.Problem.get_simulation_conditions_from_measurement_df`. """ logger.setLevel(log_level) if solver is None: solver = amici_model.getSolver() # Get parameters if problem_parameters is None: # Use PEtab nominal values as default problem_parameters = {t.Index: getattr(t, NOMINAL_VALUE) for t in petab_problem.parameter_df.itertuples()} scaled_parameters = False # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None and parameter_mapping is None \ and edatas is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model) # Get edatas if edatas is None: # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model) # Simulate rdatas = amici.runAmiciSimulations(amici_model, solver, edata_list=edatas) # Compute total llh llh = sum(rdata['llh'] for rdata in rdatas) # Compute total sllh sllh = aggregate_sllh(amici_model=amici_model, rdatas=rdatas, parameter_mapping=parameter_mapping) # Log results sim_cond = petab_problem.get_simulation_conditions_from_measurement_df() for i, rdata in enumerate(rdatas): logger.debug(f"Condition: {sim_cond.iloc[i, :].values}, status: " f"{rdata['status']}, llh: {rdata['llh']}") return { LLH: llh, SLLH: sllh, RDATAS: rdatas }
[docs]def create_parameterized_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, problem_parameters: Dict[str, numbers.Number], scaled_parameters: bool = False, parameter_mapping: ParameterMapping = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:amici.ExpData objects with parameters filled in. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param petab_problem: PEtab problem to work on. :param problem_parameters: Run simulation with these parameters. If None, PEtab `nominalValues` will be used). To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param scaled_parameters: If True, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If False, they are assumed to be in linear scale. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by `create_parameter_mapping`. :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints, data and parameters. """ # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model) # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model) return edatas
[docs]def create_parameter_mapping( petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, Dict], scaled_parameters: bool, amici_model: AmiciModel, ) -> ParameterMapping: """Generate AMICI specific parameter mapping. :param petab_problem: PEtab problem :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :param scaled_parameters: If True, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If False, they are assumed to be in linear scale. :param amici_model: AMICI model. :return: List of the parameter mappings. """ if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Because AMICI globalizes all local parameters during model import, # we need to do that here as well to prevent parameter mapping errors # (PEtab does currently not care about SBML LocalParameters) if petab_problem.sbml_document: converter_config = libsbml.SBMLLocalParameterConverter() \ .getDefaultProperties() petab_problem.sbml_document.convert(converter_config) else: logger.debug("No petab_problem.sbml_document is set. Cannot convert " "SBML LocalParameters. If the model contains " "LocalParameters, parameter mapping will fail.") prelim_parameter_mapping = \ petab_problem.get_optimization_to_simulation_parameter_mapping( warn_unmapped=False, scaled_parameters=scaled_parameters) parameter_mapping = ParameterMapping() for (_, condition), prelim_mapping_for_condition in \ zip(simulation_conditions.iterrows(), prelim_parameter_mapping): mapping_for_condition = create_parameter_mapping_for_condition( prelim_mapping_for_condition, condition, petab_problem, amici_model) parameter_mapping.append(mapping_for_condition) return parameter_mapping
[docs]def create_parameter_mapping_for_condition( parameter_mapping_for_condition: petab.ParMappingDictQuadruple, condition: Union[pd.Series, Dict], petab_problem: petab.Problem, amici_model: AmiciModel ) -> ParameterMappingForCondition: """Generate AMICI specific parameter mapping for condition. :param parameter_mapping_for_condition: Preliminary parameter mapping for condition. :param condition: pandas.DataFrame row with preequilibrationConditionId and simulationConditionId. :param petab_problem: Underlying PEtab problem. :param amici_model: AMICI model. :return: The parameter and parameter scale mappings, for fixed preequilibration, fixed simulation, and variable simulation parameters, and then the respective scalings. """ (condition_map_preeq, condition_map_sim, condition_scale_map_preeq, condition_scale_map_sim) = parameter_mapping_for_condition logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}") if len(condition_map_preeq) != len(condition_scale_map_preeq) \ or len(condition_map_sim) != len(condition_scale_map_sim): raise AssertionError("Number of parameters and number of parameter " "scales do not match.") if len(condition_map_preeq) \ and len(condition_map_preeq) != len(condition_map_sim): logger.debug(f"Preequilibration parameter map: {condition_map_preeq}") logger.debug(f"Simulation parameter map: {condition_map_sim}") raise AssertionError("Number of parameters for preequilbration " "and simulation do not match.") ########################################################################## # initial states # Initial states have been set during model import based on the SBML model. # If initial states were overwritten in the PEtab condition table, they are # applied here. # During model generation, parameters for initial concentrations and # respective initial assignments have been created for the # relevant species, here we add these parameters to the parameter mapping. # In absence of preequilibration this could also be handled via # ExpData.x0, but in the case of preequilibration this would not allow for # resetting initial states. species_in_condition_table = [ col for col in petab_problem.condition_df if petab_problem.sbml_model.getSpecies(col) is not None] if species_in_condition_table: # set indicator fixed parameter for preeq # (we expect here, that this parameter was added during import and # that it was not added by the user with a different meaning...) if condition_map_preeq: condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0 condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN condition_map_sim[PREEQ_INDICATOR_ID] = 0.0 condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN def _set_initial_concentration(condition_id, species_id, init_par_id, par_map, scale_map): value = petab.to_float_if_float( petab_problem.condition_df.loc[condition_id, species_id]) par_map[init_par_id] = value if isinstance(value, float): # numeric initial state scale_map[init_par_id] = petab.LIN else: # parametric initial state scale_map[init_par_id] = petab_problem.parameter_df.loc[ value, PARAMETER_SCALE] for species_id in species_in_condition_table: # for preequilibration init_par_id = f'initial_{species_id}_preeq' if condition.get(PREEQUILIBRATION_CONDITION_ID): condition_id = condition[PREEQUILIBRATION_CONDITION_ID] _set_initial_concentration( condition_id, species_id, init_par_id, condition_map_preeq, condition_scale_map_preeq) else: # need to set dummy value for preeq parameter anyways, as it # is expected below (set to 0, not nan, because will be # multiplied with indicator variable in initial assignment) condition_map_sim[init_par_id] = 0.0 condition_scale_map_sim[init_par_id] = LIN # for simulation condition_id = condition[SIMULATION_CONDITION_ID] init_par_id = f'initial_{species_id}_sim' _set_initial_concentration( condition_id, species_id, init_par_id, condition_map_sim, condition_scale_map_sim) ########################################################################## # separate fixed and variable AMICI parameters, because we may have # different fixed parameters for preeq and sim condition, but we cannot # have different variable parameters. without splitting, # merge_preeq_and_sim_pars_condition below may fail. # TODO: This can be done already in parameter mapping creation. variable_par_ids = amici_model.getParameterIds() fixed_par_ids = amici_model.getFixedParameterIds() condition_map_preeq_var, condition_map_preeq_fix = \ subset_dict(condition_map_preeq, variable_par_ids, fixed_par_ids) condition_scale_map_preeq_var, condition_scale_map_preeq_fix = \ subset_dict(condition_scale_map_preeq, variable_par_ids, fixed_par_ids) condition_map_sim_var, condition_map_sim_fix = \ subset_dict(condition_map_sim, variable_par_ids, fixed_par_ids) condition_scale_map_sim_var, condition_scale_map_sim_fix = \ subset_dict(condition_scale_map_sim, variable_par_ids, fixed_par_ids) logger.debug("Fixed parameters preequilibration: " f"{condition_map_preeq_fix}") logger.debug("Fixed parameters simulation: " f"{condition_map_sim_fix}") logger.debug("Variable parameters preequilibration: " f"{condition_map_preeq_var}") logger.debug("Variable parameters simulation: " f"{condition_map_sim_var}") petab.merge_preeq_and_sim_pars_condition( condition_map_preeq_var, condition_map_sim_var, condition_scale_map_preeq_var, condition_scale_map_sim_var, condition) logger.debug(f"Merged: {condition_map_sim_var}") parameter_mapping_for_condition = ParameterMappingForCondition( map_preeq_fix=condition_map_preeq_fix, map_sim_fix=condition_map_sim_fix, map_sim_var=condition_map_sim_var, scale_map_preeq_fix=condition_scale_map_preeq_fix, scale_map_sim_fix=condition_scale_map_sim_fix, scale_map_sim_var=condition_scale_map_sim_var ) return parameter_mapping_for_condition
[docs]def create_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:`amici.amici.ExpData` objects for PEtab problem. :param amici_model: AMICI model. :param petab_problem: Underlying PEtab problem. :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints and data. """ if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() observable_ids = amici_model.getObservableIds() edatas = [] for _, condition in simulation_conditions.iterrows(): # Create amici.ExpData for each simulation edata = create_edata_for_condition( condition=condition, amici_model=amici_model, petab_problem=petab_problem, observable_ids=observable_ids, ) edatas.append(edata) return edatas
[docs]def create_edata_for_condition( condition: Union[Dict, pd.Series], amici_model: AmiciModel, petab_problem: petab.Problem, observable_ids: List[str], ) -> amici.ExpData: """Get :class:`amici.amici.ExpData` for the given PEtab condition. Sets timepoints, observed data and sigmas. :param condition: pandas.DataFrame row with preequilibrationConditionId and simulationConditionId. :param amici_model: AMICI model :param petab_problem: Underlying PEtab problem :param observable_ids: List of observable IDs :return: ExpData instance. """ # extract measurement table rows for condition measurement_df = petab.get_rows_for_condition( measurement_df=petab_problem.measurement_df, condition=condition) if amici_model.nytrue != len(observable_ids): raise AssertionError("Number of AMICI model observables does not " "match number of PEtab observables.") # create an ExpData object edata = amici.ExpData(amici_model) ########################################################################## # enable initial parameters reinitialization species_in_condition_table = any( col for col in petab_problem.condition_df if petab_problem.sbml_model.getSpecies(col) is not None) if condition.get(PREEQUILIBRATION_CONDITION_ID) \ and species_in_condition_table: edata.reinitializeFixedParameterInitialStates = True ########################################################################## # timepoints # find replicate numbers of time points timepoints_w_reps = _get_timepoints_with_replicates( df_for_condition=measurement_df) edata.setTimepoints(timepoints_w_reps) ########################################################################## # measurements and sigmas y, sigma_y = _get_measurements_and_sigmas( df_for_condition=measurement_df, timepoints_w_reps=timepoints_w_reps, observable_ids=observable_ids) edata.setObservedData(y.flatten()) edata.setObservedDataStdDev(sigma_y.flatten()) return edata
[docs]def subset_dict(full: Dict[Any, Any], *args: Collection[Any]) -> Iterator[Dict[Any, Any]]: """Get subset of dictionary based on provided keys :param full: Dictionary to subset :param args: Collections of keys to be contained in the different subsets :return: subsetted dictionary """ for keys in args: yield {key: val for (key, val) in full.items() if key in keys}
def _get_timepoints_with_replicates( df_for_condition: pd.DataFrame) -> List[numbers.Number]: """ Get list of timepoints including replicate measurements :param df_for_condition: PEtab measurement table subset for a single condition. :return: Sorted list of timepoints, including multiple timepoints accounting for replicate measurements. """ # create sorted list of all timepoints for which measurements exist timepoints = sorted(df_for_condition[TIME].unique().astype(float)) # find replicate numbers of time points timepoints_w_reps = [] for time in timepoints: # subselect for time df_for_time = df_for_condition[ df_for_condition.time.astype(float) == time ] # rep number is maximum over rep numbers for observables n_reps = max(df_for_time.groupby( [OBSERVABLE_ID, TIME]).size()) # append time point n_rep times timepoints_w_reps.extend([time] * n_reps) return timepoints_w_reps def _get_measurements_and_sigmas( df_for_condition: pd.DataFrame, timepoints_w_reps: Sequence[numbers.Number], observable_ids: Sequence[str], ) -> Tuple[np.array, np.array]: """ Get measurements and sigmas Generate arrays with measurements and sigmas in AMICI format from a PEtab measurement table subset for a single condition. :param df_for_condition: Subset of PEtab measurement table for one condition :param timepoints_w_reps: Timepoints for which there exist measurements, including replicates :param observable_ids: List of observable IDs for mapping IDs to indices. :return: arrays for measurement and sigmas """ # prepare measurement matrix y = np.full(shape=(len(timepoints_w_reps), len(observable_ids)), fill_value=np.nan) # prepare sigma matrix sigma_y = y.copy() timepoints = sorted(df_for_condition[TIME].unique().astype(float)) for time in timepoints: # subselect for time df_for_time = df_for_condition[df_for_condition[TIME] == time] time_ix_0 = timepoints_w_reps.index(time) # remember used time indices for each observable time_ix_for_obs_ix = {} # iterate over measurements for _, measurement in df_for_time.iterrows(): # extract observable index observable_ix = observable_ids.index(measurement[OBSERVABLE_ID]) # update time index for observable if observable_ix in time_ix_for_obs_ix: time_ix_for_obs_ix[observable_ix] += 1 else: time_ix_for_obs_ix[observable_ix] = time_ix_0 # fill observable and possibly noise parameter y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[MEASUREMENT] if isinstance(measurement.get(NOISE_PARAMETERS, None), numbers.Number): sigma_y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[NOISE_PARAMETERS] return y, sigma_y
[docs]def rdatas_to_measurement_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame) -> pd.DataFrame: """ Create a measurement dataframe in the PEtab format from the passed `rdatas` and own information. :param rdatas: A sequence of rdatas with the ordering of `petab.get_simulation_conditions`. :param model: AMICI model used to generate `rdatas`. :param measurement_df: PEtab measurement table used to generate `rdatas`. :return: A dataframe built from the rdatas in the format of `measurement_df`. """ df = pd.DataFrame(columns=list(measurement_df.columns)) simulation_conditions = petab.get_simulation_conditions( measurement_df) observable_ids = model.getObservableIds() # iterate over conditions for (_, condition), rdata in zip(simulation_conditions.iterrows(), rdatas): # current simulation matrix y = rdata['y'] # time array used in rdata t = list(rdata['t']) # extract rows for condition cur_measurement_df = petab.get_rows_for_condition( measurement_df, condition) # iterate over entries for the given condition # note: this way we only generate a dataframe entry for every # row that existed in the original dataframe. if we want to # e.g. have also timepoints non-existent in the original file, # we need to instead iterate over the rdata['y'] entries for _, row in cur_measurement_df.iterrows(): # copy row row_sim = copy.deepcopy(row) # extract simulated measurement value timepoint_idx = t.index(row[TIME]) observable_idx = observable_ids.index(row[OBSERVABLE_ID]) measurement_sim = y[timepoint_idx, observable_idx] # change measurement entry row_sim[MEASUREMENT] = measurement_sim # append to dataframe df = df.append(row_sim, ignore_index=True) return df
[docs]def rdatas_to_simulation_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame) -> pd.DataFrame: """Create a PEtab simulation dataframe from amici.ReturnDatas. See ``rdatas_to_measurement_df`` for details, only that model outputs will appear in column "simulation" instead of "measurement".""" df = rdatas_to_measurement_df(rdatas=rdatas, model=model, measurement_df=measurement_df) return df.rename(columns={MEASUREMENT: SIMULATION})
[docs]def aggregate_sllh( amici_model: AmiciModel, rdatas: Sequence[amici.ReturnDataView], parameter_mapping: Optional[ParameterMapping], ) -> Union[None, Dict[str, float]]: """ Aggregate likelihood gradient for all conditions, according to PEtab parameter mapping. :param amici_model: AMICI model from which ``rdatas`` were obtained. :param rdatas: Simulation results. :param parameter_mapping: PEtab parameter mapping to condition-specific simulation parameters :return: aggregated sllh """ sllh = {} model_par_ids = amici_model.getParameterIds() for condition_par_map, rdata in \ zip(parameter_mapping, rdatas): par_map_sim_var = condition_par_map.map_sim_var if rdata['status'] != amici.AMICI_SUCCESS \ or 'sllh' not in rdata \ or rdata['sllh'] is None: return None for model_par_id, problem_par_id in par_map_sim_var.items(): if isinstance(problem_par_id, str): model_par_idx = model_par_ids.index(model_par_id) cur_par_sllh = rdata['sllh'][model_par_idx] try: sllh[problem_par_id] += cur_par_sllh except KeyError: sllh[problem_par_id] = cur_par_sllh return sllh