Source code for amici.petab_objective

"""
PEtab Objective
---------------
Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem
"""

import copy
import logging
import numbers
from typing import (List, Sequence, Optional, Dict, Tuple, Union, Any,
                    Collection, Iterator)

import amici
from amici.sbml_import import get_species_initial
import libsbml
import numpy as np
import pandas as pd
import petab
import sympy as sp
from petab.C import *  # noqa: F403

from . import AmiciModel, AmiciExpData
from .logging import get_logger, log_execution_time
from .petab_import import PREEQ_INDICATOR_ID
from .parameter_mapping import (
    fill_in_parameters, ParameterMappingForCondition, ParameterMapping)

logger = get_logger(__name__)


# string constant definitions
LLH = 'llh'
SLLH = 'sllh'
FIM = 'fim'
S2LLH = 's2llh'
RES = 'res'
SRES = 'sres'
RDATAS = 'rdatas'


[docs]@log_execution_time('Simulating PEtab model', logger) def simulate_petab( petab_problem: petab.Problem, amici_model: AmiciModel, solver: Optional[amici.Solver] = None, problem_parameters: Optional[Dict[str, float]] = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, edatas: List[AmiciExpData] = None, parameter_mapping: ParameterMapping = None, scaled_parameters: Optional[bool] = False, log_level: int = logging.WARNING, num_threads: int = 1, failfast: bool = True, ) -> Dict[str, Any]: """Simulate PEtab model. :param petab_problem: PEtab problem to work on. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param solver: An AMICI solver. Will use default options if None. :param problem_parameters: Run simulation with these parameters. If None, PEtab `nominalValues` will be used). To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. Not required if `edatas` and `parameter_mapping` are provided. :param edatas: Experimental data. Parameters are inserted in-place for simulation. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by `create_parameter_mapping`. :param scaled_parameters: If True, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If False, they are assumed to be in linear scale. :param log_level: Log level, see :mod:`amici.logging` module. :param num_threads: Number of threads to use for simulating multiple conditions (only used if compiled with OpenMP). :param failfast: Returns as soon as an integration failure is encountered, skipping any remaining simulations. :return: Dictionary of * cost function value (LLH), * const function sensitivity w.r.t. parameters (SLLH), (**NOTE**: Sensitivities are computed for the scaled parameters) * list of `ReturnData` (RDATAS), corresponding to the different simulation conditions. For ordering of simulation conditions, see :meth:`petab.Problem.get_simulation_conditions_from_measurement_df`. """ logger.setLevel(log_level) if solver is None: solver = amici_model.getSolver() # Get parameters if problem_parameters is None: # Use PEtab nominal values as default problem_parameters = {t.Index: getattr(t, NOMINAL_VALUE) for t in petab_problem.parameter_df.itertuples()} scaled_parameters = False # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None and parameter_mapping is None \ and edatas is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model) # Get edatas if edatas is None: # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model) # Simulate rdatas = amici.runAmiciSimulations( amici_model, solver, edata_list=edatas, num_threads=num_threads, failfast=failfast) # Compute total llh llh = sum(rdata['llh'] for rdata in rdatas) # Log results sim_cond = petab_problem.get_simulation_conditions_from_measurement_df() for i, rdata in enumerate(rdatas): logger.debug(f"Condition: {sim_cond.iloc[i, :].values}, status: " f"{rdata['status']}, llh: {rdata['llh']}") return { LLH: llh, RDATAS: rdatas }
[docs]def create_parameterized_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, problem_parameters: Dict[str, numbers.Number], scaled_parameters: bool = False, parameter_mapping: ParameterMapping = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:amici.ExpData objects with parameters filled in. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param petab_problem: PEtab problem to work on. :param problem_parameters: Run simulation with these parameters. If None, PEtab `nominalValues` will be used). To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param scaled_parameters: If True, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If False, they are assumed to be in linear scale. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by `create_parameter_mapping`. :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints, data and parameters. """ # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model) # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model) return edatas
[docs]def create_parameter_mapping( petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, Dict], scaled_parameters: bool, amici_model: AmiciModel, ) -> ParameterMapping: """Generate AMICI specific parameter mapping. :param petab_problem: PEtab problem :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :param scaled_parameters: If True, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If False, they are assumed to be in linear scale. :param amici_model: AMICI model. :return: List of the parameter mappings. """ if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Because AMICI globalizes all local parameters during model import, # we need to do that here as well to prevent parameter mapping errors # (PEtab does currently not care about SBML LocalParameters) if petab_problem.sbml_document: converter_config = libsbml.SBMLLocalParameterConverter() \ .getDefaultProperties() petab_problem.sbml_document.convert(converter_config) else: logger.debug("No petab_problem.sbml_document is set. Cannot convert " "SBML LocalParameters. If the model contains " "LocalParameters, parameter mapping will fail.") prelim_parameter_mapping = \ petab_problem.get_optimization_to_simulation_parameter_mapping( warn_unmapped=False, scaled_parameters=scaled_parameters, allow_timepoint_specific_numeric_noise_parameters= not petab.lint.observable_table_has_nontrivial_noise_formula( petab_problem.observable_df ) ) parameter_mapping = ParameterMapping() for (_, condition), prelim_mapping_for_condition in \ zip(simulation_conditions.iterrows(), prelim_parameter_mapping): mapping_for_condition = create_parameter_mapping_for_condition( prelim_mapping_for_condition, condition, petab_problem, amici_model) parameter_mapping.append(mapping_for_condition) return parameter_mapping
[docs]def create_parameter_mapping_for_condition( parameter_mapping_for_condition: petab.ParMappingDictQuadruple, condition: Union[pd.Series, Dict], petab_problem: petab.Problem, amici_model: AmiciModel ) -> ParameterMappingForCondition: """Generate AMICI specific parameter mapping for condition. :param parameter_mapping_for_condition: Preliminary parameter mapping for condition. :param condition: pandas.DataFrame row with preequilibrationConditionId and simulationConditionId. :param petab_problem: Underlying PEtab problem. :param amici_model: AMICI model. :return: The parameter and parameter scale mappings, for fixed preequilibration, fixed simulation, and variable simulation parameters, and then the respective scalings. """ (condition_map_preeq, condition_map_sim, condition_scale_map_preeq, condition_scale_map_sim) = parameter_mapping_for_condition logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}") if len(condition_map_preeq) != len(condition_scale_map_preeq) \ or len(condition_map_sim) != len(condition_scale_map_sim): raise AssertionError("Number of parameters and number of parameter " "scales do not match.") if len(condition_map_preeq) \ and len(condition_map_preeq) != len(condition_map_sim): logger.debug(f"Preequilibration parameter map: {condition_map_preeq}") logger.debug(f"Simulation parameter map: {condition_map_sim}") raise AssertionError("Number of parameters for preequilbration " "and simulation do not match.") ########################################################################## # initial states # Initial states have been set during model import based on the SBML model. # If initial states were overwritten in the PEtab condition table, they are # applied here. # During model generation, parameters for initial concentrations and # respective initial assignments have been created for the # relevant species, here we add these parameters to the parameter mapping. # In absence of preequilibration this could also be handled via # ExpData.x0, but in the case of preequilibration this would not allow for # resetting initial states. species_in_condition_table = [ col for col in petab_problem.condition_df if petab_problem.sbml_model.getSpecies(col) is not None] if species_in_condition_table: # set indicator fixed parameter for preeq # (we expect here, that this parameter was added during import and # that it was not added by the user with a different meaning...) if condition_map_preeq: condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0 condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN condition_map_sim[PREEQ_INDICATOR_ID] = 0.0 condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN def _set_initial_concentration(condition_id, species_id, init_par_id, par_map, scale_map): value = petab.to_float_if_float( petab_problem.condition_df.loc[condition_id, species_id]) if pd.isna(value): value = get_species_initial( petab_problem.sbml_model.getSpecies(species_id) ) try: value = float(value) except (ValueError, TypeError): if sp.nsimplify(value).is_Atom: # Get rid of multiplication with one value = sp.nsimplify(value) else: raise NotImplementedError( "Cannot handle non-trivial expressions for " f"species initial for {species_id}: {value}") # this should be a parameter ID value = str(value) logger.debug(f'The species {species_id} has no initial value ' f'defined for the condition {condition_id} in ' 'the PEtab conditions table. The initial value is ' f'now set to {value}, which is the initial value ' 'defined in the SBML model.') par_map[init_par_id] = value if isinstance(value, float): # numeric initial state scale_map[init_par_id] = petab.LIN else: # parametric initial state scale_map[init_par_id] = petab_problem.parameter_df.loc[ value, PARAMETER_SCALE] for species_id in species_in_condition_table: # for preequilibration init_par_id = f'initial_{species_id}_preeq' if condition.get(PREEQUILIBRATION_CONDITION_ID): condition_id = condition[PREEQUILIBRATION_CONDITION_ID] _set_initial_concentration( condition_id, species_id, init_par_id, condition_map_preeq, condition_scale_map_preeq) else: # need to set dummy value for preeq parameter anyways, as it # is expected below (set to 0, not nan, because will be # multiplied with indicator variable in initial assignment) condition_map_sim[init_par_id] = 0.0 condition_scale_map_sim[init_par_id] = LIN # for simulation condition_id = condition[SIMULATION_CONDITION_ID] init_par_id = f'initial_{species_id}_sim' _set_initial_concentration( condition_id, species_id, init_par_id, condition_map_sim, condition_scale_map_sim) ########################################################################## # separate fixed and variable AMICI parameters, because we may have # different fixed parameters for preeq and sim condition, but we cannot # have different variable parameters. without splitting, # merge_preeq_and_sim_pars_condition below may fail. # TODO: This can be done already in parameter mapping creation. variable_par_ids = amici_model.getParameterIds() fixed_par_ids = amici_model.getFixedParameterIds() condition_map_preeq_var, condition_map_preeq_fix = \ subset_dict(condition_map_preeq, variable_par_ids, fixed_par_ids) condition_scale_map_preeq_var, condition_scale_map_preeq_fix = \ subset_dict(condition_scale_map_preeq, variable_par_ids, fixed_par_ids) condition_map_sim_var, condition_map_sim_fix = \ subset_dict(condition_map_sim, variable_par_ids, fixed_par_ids) condition_scale_map_sim_var, condition_scale_map_sim_fix = \ subset_dict(condition_scale_map_sim, variable_par_ids, fixed_par_ids) logger.debug("Fixed parameters preequilibration: " f"{condition_map_preeq_fix}") logger.debug("Fixed parameters simulation: " f"{condition_map_sim_fix}") logger.debug("Variable parameters preequilibration: " f"{condition_map_preeq_var}") logger.debug("Variable parameters simulation: " f"{condition_map_sim_var}") petab.merge_preeq_and_sim_pars_condition( condition_map_preeq_var, condition_map_sim_var, condition_scale_map_preeq_var, condition_scale_map_sim_var, condition) logger.debug(f"Merged: {condition_map_sim_var}") parameter_mapping_for_condition = ParameterMappingForCondition( map_preeq_fix=condition_map_preeq_fix, map_sim_fix=condition_map_sim_fix, map_sim_var=condition_map_sim_var, scale_map_preeq_fix=condition_scale_map_preeq_fix, scale_map_sim_fix=condition_scale_map_sim_fix, scale_map_sim_var=condition_scale_map_sim_var ) return parameter_mapping_for_condition
[docs]def create_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:`amici.amici.ExpData` objects for PEtab problem. :param amici_model: AMICI model. :param petab_problem: Underlying PEtab problem. :param simulation_conditions: Result of `petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints and data. """ if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() observable_ids = amici_model.getObservableIds() edatas = [] for _, condition in simulation_conditions.iterrows(): # Create amici.ExpData for each simulation edata = create_edata_for_condition( condition=condition, amici_model=amici_model, petab_problem=petab_problem, observable_ids=observable_ids, ) edatas.append(edata) return edatas
[docs]def create_edata_for_condition( condition: Union[Dict, pd.Series], amici_model: AmiciModel, petab_problem: petab.Problem, observable_ids: List[str], ) -> amici.ExpData: """Get :class:`amici.amici.ExpData` for the given PEtab condition. Sets timepoints, observed data and sigmas. :param condition: pandas.DataFrame row with preequilibrationConditionId and simulationConditionId. :param amici_model: AMICI model :param petab_problem: Underlying PEtab problem :param observable_ids: List of observable IDs :return: ExpData instance. """ # extract measurement table rows for condition measurement_df = petab.get_rows_for_condition( measurement_df=petab_problem.measurement_df, condition=condition) if amici_model.nytrue != len(observable_ids): raise AssertionError("Number of AMICI model observables does not " "match number of PEtab observables.") # create an ExpData object edata = amici.ExpData(amici_model) ########################################################################## # enable initial parameters reinitialization species_in_condition_table = [ col for col in petab_problem.condition_df if not pd.isna(petab_problem.condition_df.loc[ condition[SIMULATION_CONDITION_ID], col]) and petab_problem.sbml_model.getSpecies(col) is not None ] if condition.get(PREEQUILIBRATION_CONDITION_ID) \ and species_in_condition_table: state_ids = amici_model.getStateIds() state_idx_reinitalization = [state_ids.index(s) for s in species_in_condition_table] edata.reinitialization_state_idxs_sim = state_idx_reinitalization logger.debug("Enabling state reinitialization for condition " f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - " f"{condition.get(SIMULATION_CONDITION_ID)} " f"{species_in_condition_table}") ########################################################################## # timepoints # find replicate numbers of time points timepoints_w_reps = _get_timepoints_with_replicates( df_for_condition=measurement_df) edata.setTimepoints(timepoints_w_reps) ########################################################################## # measurements and sigmas y, sigma_y = _get_measurements_and_sigmas( df_for_condition=measurement_df, timepoints_w_reps=timepoints_w_reps, observable_ids=observable_ids) edata.setObservedData(y.flatten()) edata.setObservedDataStdDev(sigma_y.flatten()) return edata
[docs]def subset_dict(full: Dict[Any, Any], *args: Collection[Any]) -> Iterator[Dict[Any, Any]]: """Get subset of dictionary based on provided keys :param full: Dictionary to subset :param args: Collections of keys to be contained in the different subsets :return: subsetted dictionary """ for keys in args: yield {key: val for (key, val) in full.items() if key in keys}
def _get_timepoints_with_replicates( df_for_condition: pd.DataFrame) -> List[numbers.Number]: """ Get list of timepoints including replicate measurements :param df_for_condition: PEtab measurement table subset for a single condition. :return: Sorted list of timepoints, including multiple timepoints accounting for replicate measurements. """ # create sorted list of all timepoints for which measurements exist timepoints = sorted(df_for_condition[TIME].unique().astype(float)) # find replicate numbers of time points timepoints_w_reps = [] for time in timepoints: # subselect for time df_for_time = df_for_condition[ df_for_condition.time.astype(float) == time ] # rep number is maximum over rep numbers for observables n_reps = max(df_for_time.groupby( [OBSERVABLE_ID, TIME]).size()) # append time point n_rep times timepoints_w_reps.extend([time] * n_reps) return timepoints_w_reps def _get_measurements_and_sigmas( df_for_condition: pd.DataFrame, timepoints_w_reps: Sequence[numbers.Number], observable_ids: Sequence[str], ) -> Tuple[np.array, np.array]: """ Get measurements and sigmas Generate arrays with measurements and sigmas in AMICI format from a PEtab measurement table subset for a single condition. :param df_for_condition: Subset of PEtab measurement table for one condition :param timepoints_w_reps: Timepoints for which there exist measurements, including replicates :param observable_ids: List of observable IDs for mapping IDs to indices. :return: arrays for measurement and sigmas """ # prepare measurement matrix y = np.full(shape=(len(timepoints_w_reps), len(observable_ids)), fill_value=np.nan) # prepare sigma matrix sigma_y = y.copy() timepoints = sorted(df_for_condition[TIME].unique().astype(float)) for time in timepoints: # subselect for time df_for_time = df_for_condition[df_for_condition[TIME] == time] time_ix_0 = timepoints_w_reps.index(time) # remember used time indices for each observable time_ix_for_obs_ix = {} # iterate over measurements for _, measurement in df_for_time.iterrows(): # extract observable index observable_ix = observable_ids.index(measurement[OBSERVABLE_ID]) # update time index for observable if observable_ix in time_ix_for_obs_ix: time_ix_for_obs_ix[observable_ix] += 1 else: time_ix_for_obs_ix[observable_ix] = time_ix_0 # fill observable and possibly noise parameter y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[MEASUREMENT] if isinstance(measurement.get(NOISE_PARAMETERS, None), numbers.Number): sigma_y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[NOISE_PARAMETERS] return y, sigma_y
[docs]def rdatas_to_measurement_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame) -> pd.DataFrame: """ Create a measurement dataframe in the PEtab format from the passed `rdatas` and own information. :param rdatas: A sequence of rdatas with the ordering of `petab.get_simulation_conditions`. :param model: AMICI model used to generate `rdatas`. :param measurement_df: PEtab measurement table used to generate `rdatas`. :return: A dataframe built from the rdatas in the format of `measurement_df`. """ df = pd.DataFrame(columns=list(measurement_df.columns)) simulation_conditions = petab.get_simulation_conditions( measurement_df) observable_ids = model.getObservableIds() # iterate over conditions for (_, condition), rdata in zip(simulation_conditions.iterrows(), rdatas): # current simulation matrix y = rdata['y'] # time array used in rdata t = list(rdata['t']) # extract rows for condition cur_measurement_df = petab.get_rows_for_condition( measurement_df, condition) # iterate over entries for the given condition # note: this way we only generate a dataframe entry for every # row that existed in the original dataframe. if we want to # e.g. have also timepoints non-existent in the original file, # we need to instead iterate over the rdata['y'] entries for _, row in cur_measurement_df.iterrows(): # copy row row_sim = copy.deepcopy(row) # extract simulated measurement value timepoint_idx = t.index(row[TIME]) observable_idx = observable_ids.index(row[OBSERVABLE_ID]) measurement_sim = y[timepoint_idx, observable_idx] # change measurement entry row_sim[MEASUREMENT] = measurement_sim # append to dataframe df = df.append(row_sim, ignore_index=True) return df
[docs]def rdatas_to_simulation_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame) -> pd.DataFrame: """Create a PEtab simulation dataframe from amici.ReturnDatas. See ``rdatas_to_measurement_df`` for details, only that model outputs will appear in column "simulation" instead of "measurement".""" df = rdatas_to_measurement_df(rdatas=rdatas, model=model, measurement_df=measurement_df) return df.rename(columns={MEASUREMENT: SIMULATION})