Source code for amici.petab_objective

"""
PEtab Objective
---------------
Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem
"""

import copy
import logging
import numbers
from typing import (Any, Collection, Dict, Iterator, List, Optional, Sequence,
                    Tuple, Union)

import libsbml
import numpy as np
import pandas as pd
import petab
import sympy as sp
from petab.C import *  # noqa: F403
from sympy.abc import _clash

import amici
from amici.sbml_import import get_species_initial
from . import AmiciExpData, AmiciModel
from .logging import get_logger, log_execution_time
from .parameter_mapping import (ParameterMapping, ParameterMappingForCondition,
                                fill_in_parameters)
from .petab_import import PREEQ_INDICATOR_ID, element_is_state
from .parameter_mapping import (
    fill_in_parameters,
    ParameterMappingForCondition,
    ParameterMapping,
)

logger = get_logger(__name__)


# string constant definitions
LLH = 'llh'
SLLH = 'sllh'
FIM = 'fim'
S2LLH = 's2llh'
RES = 'res'
SRES = 'sres'
RDATAS = 'rdatas'
EDATAS = 'edatas'


[docs]@log_execution_time('Simulating PEtab model', logger) def simulate_petab( petab_problem: petab.Problem, amici_model: AmiciModel, solver: Optional[amici.Solver] = None, problem_parameters: Optional[Dict[str, float]] = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, edatas: List[AmiciExpData] = None, parameter_mapping: ParameterMapping = None, scaled_parameters: Optional[bool] = False, log_level: int = logging.WARNING, num_threads: int = 1, failfast: bool = True, scaled_gradients: bool = False, ) -> Dict[str, Any]: """Simulate PEtab model. .. note:: Regardless of `scaled_parameters`, unscaled sensitivities are returned, unless `scaled_gradients=True`. :param petab_problem: PEtab problem to work on. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param solver: An AMICI solver. Will use default options if None. :param problem_parameters: Run simulation with these parameters. If ``None``, PEtab ``nominalValues`` will be used. To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param simulation_conditions: Result of :py:func:`petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. Not required if ``edatas`` and ``parameter_mapping`` are provided. :param edatas: Experimental data. Parameters are inserted in-place for simulation. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by :py:func:`create_parameter_mapping`. :param scaled_parameters: If ``True``, ``problem_parameters`` are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If ``False``, they are assumed to be in linear scale. :param log_level: Log level, see :mod:`amici.logging` module. :param num_threads: Number of threads to use for simulating multiple conditions (only used if compiled with OpenMP). :param failfast: Returns as soon as an integration failure is encountered, skipping any remaining simulations. :param scaled_gradients: Whether to compute gradients on parameter scale (``True``) or not (``False``). :return: Dictionary of * cost function value (``LLH``), * list of :class:`amici.amici.ReturnData` (``RDATAS``), * list of :class:`amici.amici.ExpData` (``EDATAS``), corresponding to the different simulation conditions. For ordering of simulation conditions, see :meth:`petab.Problem.get_simulation_conditions_from_measurement_df`. """ logger.setLevel(log_level) if solver is None: solver = amici_model.getSolver() # Switch to scaled parameters. problem_parameters = _default_scaled_parameters( petab_problem=petab_problem, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, ) scaled_parameters = True # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None and parameter_mapping is None \ and edatas is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model) # Get edatas if edatas is None: # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model) # Simulate rdatas = amici.runAmiciSimulations( amici_model, solver, edata_list=edatas, num_threads=num_threads, failfast=failfast) # Compute total llh llh = sum(rdata['llh'] for rdata in rdatas) # Compute total sllh sllh = None if solver.getSensitivityOrder() != amici.SensitivityOrder.none: sllh = aggregate_sllh( amici_model=amici_model, rdatas=rdatas, parameter_mapping=parameter_mapping, petab_scale=scaled_parameters, petab_problem=petab_problem, edatas=edatas, ) if not scaled_gradients and sllh is not None: sllh = { parameter_id: rescale_sensitivity( sensitivity=sensitivity, parameter_value=problem_parameters[parameter_id], old_scale=petab_problem.parameter_df.loc[ parameter_id, PARAMETER_SCALE ], new_scale=LIN, ) for parameter_id, sensitivity in sllh.items() } # Log results sim_cond = petab_problem.get_simulation_conditions_from_measurement_df() for i, rdata in enumerate(rdatas): sim_cond_id = "N/A" if sim_cond.empty else sim_cond.iloc[i, :].values logger.debug( f"Condition: {sim_cond_id}, status: {rdata['status']}, " f"llh: {rdata['llh']}" ) return { LLH: llh, SLLH: sllh, RDATAS: rdatas, EDATAS: edatas, }
[docs]def aggregate_sllh( amici_model: AmiciModel, rdatas: Sequence[amici.ReturnDataView], parameter_mapping: Optional[ParameterMapping], edatas: List[AmiciExpData], petab_scale: bool = True, petab_problem: petab.Problem = None, ) -> Union[None, Dict[str, float]]: """ Aggregate likelihood gradient for all conditions, according to PEtab parameter mapping. :param amici_model: AMICI model from which ``rdatas`` were obtained. :param rdatas: Simulation results. :param parameter_mapping: PEtab parameter mapping to condition-specific simulation parameters. :param edatas: Experimental data used for simulation. :param petab_scale: Whether to check that sensitivities were computed with parameters on the scales provided in the PEtab parameters table. :param petab_problem: The PEtab problem that defines the parameter scales. :return: Aggregated likelihood sensitivities. """ accumulated_sllh = {} model_parameter_ids = amici_model.getParameterIds() if petab_scale and petab_problem is None: raise ValueError( 'Please provide the PEtab problem, when using ' '`petab_scale=True`.' ) # Check for issues in all condition simulation results. for rdata in rdatas: # Condition failed during simulation. if rdata.status != amici.AMICI_SUCCESS: return None # Condition simulation result does not provide SLLH. if rdata.sllh is None: raise ValueError( 'The sensitivities of the likelihood for a condition were ' 'not computed.' ) for condition_parameter_mapping, edata, rdata in \ zip(parameter_mapping, edatas, rdatas): for sllh_parameter_index, condition_parameter_sllh in \ enumerate(rdata.sllh): # Get PEtab parameter ID # Use ExpData if it provides a parameter list, else default to # Model. if edata.plist: model_parameter_index = edata.plist[sllh_parameter_index] else: model_parameter_index = amici_model.plist(sllh_parameter_index) model_parameter_id = model_parameter_ids[model_parameter_index] petab_parameter_id = ( condition_parameter_mapping .map_sim_var[model_parameter_id] ) # Initialize if petab_parameter_id not in accumulated_sllh: accumulated_sllh[petab_parameter_id] = 0 # Check that the scale is consistent if petab_scale: # `ParameterMappingForCondition` objects provide the scale in # terms of `petab.C` constants already, not AMICI equivalents. model_parameter_scale = ( condition_parameter_mapping .scale_map_sim_var[model_parameter_id] ) petab_parameter_scale = ( petab_problem .parameter_df .loc[petab_parameter_id, PARAMETER_SCALE] ) if model_parameter_scale != petab_parameter_scale: raise ValueError( f'The scale of the parameter `{petab_parameter_id}` ' 'differs between the AMICI model ' f'({model_parameter_scale}) and the PEtab problem ' f'({petab_parameter_scale}).' ) # Accumulate accumulated_sllh[petab_parameter_id] += condition_parameter_sllh return accumulated_sllh
[docs]def rescale_sensitivity( sensitivity: float, parameter_value: float, old_scale: str, new_scale: str, ) -> float: """Rescale a sensitivity between parameter scales. :param sensitivity: The sensitivity corresponding to the parameter value. :param parameter_value: The parameter vector element, on ``old_scale``. :param old_scale: The scale of the parameter value. :param new_scale: The parameter scale on which to rescale the sensitivity. :return: The rescaled sensitivity. """ LOG_E_10 = np.log(10) if old_scale == new_scale: return sensitivity unscaled_parameter_value = petab.parameters.unscale( parameter=parameter_value, scale_str=old_scale, ) scale = { (LIN, LOG): lambda s: s * unscaled_parameter_value, (LOG, LIN): lambda s: s / unscaled_parameter_value, (LIN, LOG10): lambda s: s * (unscaled_parameter_value * LOG_E_10), (LOG10, LIN): lambda s: s / (unscaled_parameter_value * LOG_E_10), } scale[(LOG, LOG10)] = lambda s: scale[(LIN, LOG10)](scale[(LOG, LIN)](s)) scale[(LOG10, LOG)] = lambda s: scale[(LIN, LOG)](scale[(LOG10, LIN)](s)) if (old_scale, new_scale) not in scale: raise NotImplementedError( f"Old scale: {old_scale}. New scale: {new_scale}." ) return scale[(old_scale, new_scale)](sensitivity)
[docs]def create_parameterized_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, problem_parameters: Dict[str, numbers.Number], scaled_parameters: bool = False, parameter_mapping: ParameterMapping = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:amici.ExpData objects with parameters filled in. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param petab_problem: PEtab problem to work on. :param problem_parameters: Run simulation with these parameters. If ``None``, PEtab ``nominalValues`` will be used. To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param scaled_parameters: If ``True``, ``problem_parameters`` are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If ``False``, they are assumed to be in linear scale. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by :func:`create_parameter_mapping`. :param simulation_conditions: Result of :func:`petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints, data and parameters. """ # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model) # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model) return edatas
[docs]def create_parameter_mapping( petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, List[Dict]], scaled_parameters: bool, amici_model: AmiciModel, **parameter_mapping_kwargs, ) -> ParameterMapping: """Generate AMICI specific parameter mapping. :param petab_problem: PEtab problem :param simulation_conditions: Result of :func:`petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :param scaled_parameters: If ``True``, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If ``False``, they are assumed to be in linear scale. :param amici_model: AMICI model. :param parameter_mapping_kwargs: Optional keyword arguments passed to :func:`petab.get_optimization_to_simulation_parameter_mapping`. To allow changing fixed PEtab problem parameters (``estimate=0``), use ``fill_fixed_parameters=False``. :return: List of the parameter mappings. """ if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() if isinstance(simulation_conditions, list): simulation_conditions = pd.DataFrame(data=simulation_conditions) # Because AMICI globalizes all local parameters during model import, # we need to do that here as well to prevent parameter mapping errors # (PEtab does currently not care about SBML LocalParameters) if petab_problem.sbml_document: converter_config = libsbml.SBMLLocalParameterConverter() \ .getDefaultProperties() petab_problem.sbml_document.convert(converter_config) else: logger.debug("No petab_problem.sbml_document is set. Cannot convert " "SBML LocalParameters. If the model contains " "LocalParameters, parameter mapping will fail.") default_parameter_mapping_kwargs = { "warn_unmapped": False, "scaled_parameters": scaled_parameters, "allow_timepoint_specific_numeric_noise_parameters": not petab.lint.observable_table_has_nontrivial_noise_formula( petab_problem.observable_df), } if parameter_mapping_kwargs is None: parameter_mapping_kwargs = {} prelim_parameter_mapping = \ petab.get_optimization_to_simulation_parameter_mapping( condition_df=petab_problem.condition_df, measurement_df=petab_problem.measurement_df, parameter_df=petab_problem.parameter_df, observable_df=petab_problem.observable_df, model=petab_problem.model, **dict(default_parameter_mapping_kwargs, **parameter_mapping_kwargs) ) parameter_mapping = ParameterMapping() for (_, condition), prelim_mapping_for_condition in \ zip(simulation_conditions.iterrows(), prelim_parameter_mapping): mapping_for_condition = create_parameter_mapping_for_condition( prelim_mapping_for_condition, condition, petab_problem, amici_model) parameter_mapping.append(mapping_for_condition) return parameter_mapping
[docs]def create_parameter_mapping_for_condition( parameter_mapping_for_condition: petab.ParMappingDictQuadruple, condition: Union[pd.Series, Dict], petab_problem: petab.Problem, amici_model: AmiciModel ) -> ParameterMappingForCondition: """Generate AMICI specific parameter mapping for condition. :param parameter_mapping_for_condition: Preliminary parameter mapping for condition. :param condition: :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and ``simulationConditionId``. :param petab_problem: Underlying PEtab problem. :param amici_model: AMICI model. :return: The parameter and parameter scale mappings, for fixed preequilibration, fixed simulation, and variable simulation parameters, and then the respective scalings. """ (condition_map_preeq, condition_map_sim, condition_scale_map_preeq, condition_scale_map_sim) = parameter_mapping_for_condition logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}") if len(condition_map_preeq) != len(condition_scale_map_preeq) \ or len(condition_map_sim) != len(condition_scale_map_sim): raise AssertionError("Number of parameters and number of parameter " "scales do not match.") if len(condition_map_preeq) \ and len(condition_map_preeq) != len(condition_map_sim): logger.debug(f"Preequilibration parameter map: {condition_map_preeq}") logger.debug(f"Simulation parameter map: {condition_map_sim}") raise AssertionError("Number of parameters for preequilbration " "and simulation do not match.") ########################################################################## # initial states # Initial states have been set during model import based on the SBML model. # If initial states were overwritten in the PEtab condition table, they are # applied here. # During model generation, parameters for initial concentrations and # respective initial assignments have been created for the # relevant species, here we add these parameters to the parameter mapping. # In absence of preequilibration this could also be handled via # ExpData.x0, but in the case of preequilibration this would not allow for # resetting initial states. states_in_condition_table = [ col for col in petab_problem.condition_df if element_is_state(petab_problem.sbml_model, col) ] if states_in_condition_table: # set indicator fixed parameter for preeq # (we expect here, that this parameter was added during import and # that it was not added by the user with a different meaning...) if condition_map_preeq: condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0 condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN condition_map_sim[PREEQ_INDICATOR_ID] = 0.0 condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN def _set_initial_state(condition_id, element_id, init_par_id, par_map, scale_map): value = petab.to_float_if_float( petab_problem.condition_df.loc[condition_id, element_id]) if pd.isna(value): element = petab_problem.sbml_model.getElementBySId(element_id) type_code = element.getTypeCode() initial_assignment = petab_problem.sbml_model\ .getInitialAssignmentBySymbol(element_id) if initial_assignment: initial_assignment = sp.sympify( libsbml.formulaToL3String(initial_assignment.getMath()), locals=_clash ) if type_code == libsbml.SBML_SPECIES: value = get_species_initial(element) \ if initial_assignment is None else initial_assignment elif type_code == libsbml.SBML_PARAMETER: value = element.getValue()\ if initial_assignment is None else initial_assignment elif type_code == libsbml.SBML_COMPARTMENT: value = element.getSize()\ if initial_assignment is None else initial_assignment else: raise NotImplementedError( f"Don't know what how to handle {element_id} in " "condition table.") try: value = float(value) except (ValueError, TypeError): if sp.nsimplify(value).is_Atom: # Get rid of multiplication with one value = sp.nsimplify(value) else: raise NotImplementedError( "Cannot handle non-trivial initial state " f"expression for {element_id}: {value}") # this should be a parameter ID value = str(value) logger.debug(f'The species {element_id} has no initial value ' f'defined for the condition {condition_id} in ' 'the PEtab conditions table. The initial value is ' f'now set to {value}, which is the initial value ' 'defined in the SBML model.') par_map[init_par_id] = value if isinstance(value, float): # numeric initial state scale_map[init_par_id] = petab.LIN else: # parametric initial state scale_map[init_par_id] = \ petab_problem.parameter_df[PARAMETER_SCALE]\ .get(value, petab.LIN) for element_id in states_in_condition_table: # for preequilibration init_par_id = f'initial_{element_id}_preeq' if condition.get(PREEQUILIBRATION_CONDITION_ID): condition_id = condition[PREEQUILIBRATION_CONDITION_ID] _set_initial_state( condition_id, element_id, init_par_id, condition_map_preeq, condition_scale_map_preeq) else: # need to set dummy value for preeq parameter anyways, as it # is expected below (set to 0, not nan, because will be # multiplied with indicator variable in initial assignment) condition_map_sim[init_par_id] = 0.0 condition_scale_map_sim[init_par_id] = LIN # for simulation condition_id = condition[SIMULATION_CONDITION_ID] init_par_id = f'initial_{element_id}_sim' _set_initial_state( condition_id, element_id, init_par_id, condition_map_sim, condition_scale_map_sim) ########################################################################## # separate fixed and variable AMICI parameters, because we may have # different fixed parameters for preeq and sim condition, but we cannot # have different variable parameters. without splitting, # merge_preeq_and_sim_pars_condition below may fail. # TODO: This can be done already in parameter mapping creation. variable_par_ids = amici_model.getParameterIds() fixed_par_ids = amici_model.getFixedParameterIds() condition_map_preeq_var, condition_map_preeq_fix = \ _subset_dict(condition_map_preeq, variable_par_ids, fixed_par_ids) condition_scale_map_preeq_var, condition_scale_map_preeq_fix = \ _subset_dict(condition_scale_map_preeq, variable_par_ids, fixed_par_ids) condition_map_sim_var, condition_map_sim_fix = \ _subset_dict(condition_map_sim, variable_par_ids, fixed_par_ids) condition_scale_map_sim_var, condition_scale_map_sim_fix = \ _subset_dict(condition_scale_map_sim, variable_par_ids, fixed_par_ids) logger.debug("Fixed parameters preequilibration: " f"{condition_map_preeq_fix}") logger.debug("Fixed parameters simulation: " f"{condition_map_sim_fix}") logger.debug("Variable parameters preequilibration: " f"{condition_map_preeq_var}") logger.debug("Variable parameters simulation: " f"{condition_map_sim_var}") petab.merge_preeq_and_sim_pars_condition( condition_map_preeq_var, condition_map_sim_var, condition_scale_map_preeq_var, condition_scale_map_sim_var, condition) logger.debug(f"Merged: {condition_map_sim_var}") parameter_mapping_for_condition = ParameterMappingForCondition( map_preeq_fix=condition_map_preeq_fix, map_sim_fix=condition_map_sim_fix, map_sim_var=condition_map_sim_var, scale_map_preeq_fix=condition_scale_map_preeq_fix, scale_map_sim_fix=condition_scale_map_sim_fix, scale_map_sim_var=condition_scale_map_sim_var ) return parameter_mapping_for_condition
[docs]def create_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:`amici.amici.ExpData` objects for PEtab problem. :param amici_model: AMICI model. :param petab_problem: Underlying PEtab problem. :param simulation_conditions: Result of :func:`petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints and data. """ if simulation_conditions is None: simulation_conditions = \ petab_problem.get_simulation_conditions_from_measurement_df() observable_ids = amici_model.getObservableIds() measurement_dfs = dict(list( petab_problem.measurement_df.groupby( [petab.SIMULATION_CONDITION_ID, petab.PREEQUILIBRATION_CONDITION_ID] if petab.PREEQUILIBRATION_CONDITION_ID in simulation_conditions else petab.SIMULATION_CONDITION_ID ) )) edatas = [] for _, condition in simulation_conditions.iterrows(): # Create amici.ExpData for each simulation if petab.PREEQUILIBRATION_CONDITION_ID in condition: measurement_index = ( condition.get(petab.SIMULATION_CONDITION_ID), condition.get(petab.PREEQUILIBRATION_CONDITION_ID) ) else: measurement_index = condition.get(petab.SIMULATION_CONDITION_ID) edata = create_edata_for_condition( condition=condition, amici_model=amici_model, measurement_df=measurement_dfs[measurement_index], petab_problem=petab_problem, observable_ids=observable_ids, ) edatas.append(edata) return edatas
[docs]def create_edata_for_condition( condition: Union[Dict, pd.Series], measurement_df: pd.DataFrame, amici_model: AmiciModel, petab_problem: petab.Problem, observable_ids: List[str], ) -> amici.ExpData: """Get :class:`amici.amici.ExpData` for the given PEtab condition. Sets timepoints, observed data and sigmas. :param condition: :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and ``simulationConditionId``. :param measurement_df: :class:`pandas.DataFrame` with measurements for the given condition. :param amici_model: AMICI model :param petab_problem: Underlying PEtab problem :param observable_ids: List of observable IDs :return: ExpData instance. """ if amici_model.nytrue != len(observable_ids): raise AssertionError("Number of AMICI model observables does not " "match number of PEtab observables.") # create an ExpData object edata = amici.ExpData(amici_model) edata.id = condition[SIMULATION_CONDITION_ID] if condition.get(PREEQUILIBRATION_CONDITION_ID): edata.id += "+" + condition.get(PREEQUILIBRATION_CONDITION_ID) ########################################################################## # enable initial parameters reinitialization states_in_condition_table = [ col for col in petab_problem.condition_df if not pd.isna(petab_problem.condition_df.loc[ condition[SIMULATION_CONDITION_ID], col]) and element_is_state(petab_problem.sbml_model, col) ] if condition.get(PREEQUILIBRATION_CONDITION_ID) \ and states_in_condition_table: state_ids = amici_model.getStateIds() state_idx_reinitalization = [state_ids.index(s) for s in states_in_condition_table] edata.reinitialization_state_idxs_sim = state_idx_reinitalization logger.debug("Enabling state reinitialization for condition " f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - " f"{condition.get(SIMULATION_CONDITION_ID)} " f"{states_in_condition_table}") ########################################################################## # timepoints # find replicate numbers of time points timepoints_w_reps = _get_timepoints_with_replicates( df_for_condition=measurement_df) edata.setTimepoints(timepoints_w_reps) ########################################################################## # measurements and sigmas y, sigma_y = _get_measurements_and_sigmas( df_for_condition=measurement_df, timepoints_w_reps=timepoints_w_reps, observable_ids=observable_ids) edata.setObservedData(y.flatten()) edata.setObservedDataStdDev(sigma_y.flatten()) return edata
def _subset_dict(full: Dict[Any, Any], *args: Collection[Any]) -> Iterator[Dict[Any, Any]]: """Get subset of dictionary based on provided keys :param full: Dictionary to subset :param args: Collections of keys to be contained in the different subsets :return: subsetted dictionary """ for keys in args: yield {key: val for (key, val) in full.items() if key in keys} def _get_timepoints_with_replicates( df_for_condition: pd.DataFrame) -> List[numbers.Number]: """ Get list of timepoints including replicate measurements :param df_for_condition: PEtab measurement table subset for a single condition. :return: Sorted list of timepoints, including multiple timepoints accounting for replicate measurements. """ # create sorted list of all timepoints for which measurements exist timepoints = sorted(df_for_condition[TIME].unique().astype(float)) # find replicate numbers of time points timepoints_w_reps = [] for time in timepoints: # subselect for time df_for_time = df_for_condition[ df_for_condition.time.astype(float) == time ] # rep number is maximum over rep numbers for observables n_reps = max(df_for_time.groupby( [OBSERVABLE_ID, TIME]).size()) # append time point n_rep times timepoints_w_reps.extend([time] * n_reps) return timepoints_w_reps def _get_measurements_and_sigmas( df_for_condition: pd.DataFrame, timepoints_w_reps: Sequence[numbers.Number], observable_ids: Sequence[str], ) -> Tuple[np.array, np.array]: """ Get measurements and sigmas Generate arrays with measurements and sigmas in AMICI format from a PEtab measurement table subset for a single condition. :param df_for_condition: Subset of PEtab measurement table for one condition :param timepoints_w_reps: Timepoints for which there exist measurements, including replicates :param observable_ids: List of observable IDs for mapping IDs to indices. :return: arrays for measurement and sigmas """ # prepare measurement matrix y = np.full(shape=(len(timepoints_w_reps), len(observable_ids)), fill_value=np.nan) # prepare sigma matrix sigma_y = y.copy() timepoints = sorted(df_for_condition[TIME].unique().astype(float)) for time in timepoints: # subselect for time df_for_time = df_for_condition[df_for_condition[TIME] == time] time_ix_0 = timepoints_w_reps.index(time) # remember used time indices for each observable time_ix_for_obs_ix = {} # iterate over measurements for _, measurement in df_for_time.iterrows(): # extract observable index observable_ix = observable_ids.index(measurement[OBSERVABLE_ID]) # update time index for observable if observable_ix in time_ix_for_obs_ix: time_ix_for_obs_ix[observable_ix] += 1 else: time_ix_for_obs_ix[observable_ix] = time_ix_0 # fill observable and possibly noise parameter y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[MEASUREMENT] if isinstance(measurement.get(NOISE_PARAMETERS, None), numbers.Number): sigma_y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[NOISE_PARAMETERS] return y, sigma_y
[docs]def rdatas_to_measurement_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame) -> pd.DataFrame: """ Create a measurement dataframe in the PEtab format from the passed ``rdatas`` and own information. :param rdatas: A sequence of rdatas with the ordering of :func:`petab.get_simulation_conditions`. :param model: AMICI model used to generate ``rdatas``. :param measurement_df: PEtab measurement table used to generate ``rdatas``. :return: A dataframe built from the rdatas in the format of ``measurement_df``. """ simulation_conditions = petab.get_simulation_conditions( measurement_df) observable_ids = model.getObservableIds() rows = [] # iterate over conditions for (_, condition), rdata in zip(simulation_conditions.iterrows(), rdatas): # current simulation matrix y = rdata.y # time array used in rdata t = list(rdata.ts) # extract rows for condition cur_measurement_df = petab.get_rows_for_condition( measurement_df, condition) # iterate over entries for the given condition # note: this way we only generate a dataframe entry for every # row that existed in the original dataframe. if we want to # e.g. have also timepoints non-existent in the original file, # we need to instead iterate over the rdata['y'] entries for _, row in cur_measurement_df.iterrows(): # copy row row_sim = copy.deepcopy(row) # extract simulated measurement value timepoint_idx = t.index(row[TIME]) observable_idx = observable_ids.index(row[OBSERVABLE_ID]) measurement_sim = y[timepoint_idx, observable_idx] # change measurement entry row_sim[MEASUREMENT] = measurement_sim rows.append(row_sim) return pd.DataFrame(rows)
[docs]def rdatas_to_simulation_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame) -> pd.DataFrame: """Create a PEtab simulation dataframe from :class:`amici.amici.ReturnData` s. See :func:`rdatas_to_measurement_df` for details, only that model outputs will appear in column ``simulation`` instead of ``measurement``.""" df = rdatas_to_measurement_df(rdatas=rdatas, model=model, measurement_df=measurement_df) return df.rename(columns={MEASUREMENT: SIMULATION})
def _default_scaled_parameters( petab_problem: petab.Problem, problem_parameters: Optional[Dict[str, float]] = None, scaled_parameters: bool = False, ) -> Optional[Dict[str, float]]: """ Helper method to handle an unscaled or unspecified parameter vector. The parameter vector defaults to the nominal values in the PEtab parameter table. Unscaled parameter values are scaled. :param petab_problem: The PEtab problem. :param problem_parameters: Keys are PEtab parameter IDs, values are parameter values on the scale defined in the PEtab parameter table. Defaults to the nominal values in the PEtab parameter table. :param scaled_parameters: Whether `problem_parameters` are on the scale defined in the PEtab parameter table. :return: The scaled parameter vector. """ if problem_parameters is None: problem_parameters = dict(zip( petab_problem.x_ids, petab_problem.x_nominal_scaled, )) elif not scaled_parameters: problem_parameters = petab_problem.scale_parameters(problem_parameters) return problem_parameters