Source code for amici.petab_objective

"""
PEtab Objective
---------------
Functionality related to running simulations or evaluating the objective
function as defined by a PEtab problem
"""

import copy
import logging
import numbers
import re
from typing import (
    Any,
    Collection,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import amici
import libsbml
import numpy as np
import pandas as pd
import petab
import sympy as sp
from amici.sbml_import import get_species_initial
from petab.C import *  # noqa: F403
from petab.models import MODEL_TYPE_PYSB, MODEL_TYPE_SBML
from sympy.abc import _clash

from . import AmiciExpData, AmiciModel
from .logging import get_logger, log_execution_time
from .parameter_mapping import (
    ParameterMapping,
    ParameterMappingForCondition,
    fill_in_parameters,
)
from .petab_import import PREEQ_INDICATOR_ID
from .petab_util import get_states_in_condition_table

try:
    import pysb
except ImportError:
    pysb = None

logger = get_logger(__name__)


# string constant definitions
LLH = "llh"
SLLH = "sllh"
FIM = "fim"
S2LLH = "s2llh"
RES = "res"
SRES = "sres"
RDATAS = "rdatas"
EDATAS = "edatas"


[docs]@log_execution_time("Simulating PEtab model", logger) def simulate_petab( petab_problem: petab.Problem, amici_model: AmiciModel, solver: Optional[amici.Solver] = None, problem_parameters: Optional[Dict[str, float]] = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, edatas: List[AmiciExpData] = None, parameter_mapping: ParameterMapping = None, scaled_parameters: Optional[bool] = False, log_level: int = logging.WARNING, num_threads: int = 1, failfast: bool = True, scaled_gradients: bool = False, ) -> Dict[str, Any]: """Simulate PEtab model. .. note:: Regardless of `scaled_parameters`, unscaled sensitivities are returned, unless `scaled_gradients=True`. :param petab_problem: PEtab problem to work on. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param solver: An AMICI solver. Will use default options if None. :param problem_parameters: Run simulation with these parameters. If ``None``, PEtab ``nominalValues`` will be used. To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param simulation_conditions: Result of :py:func:`petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. Not required if ``edatas`` and ``parameter_mapping`` are provided. :param edatas: Experimental data. Parameters are inserted in-place for simulation. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by :py:func:`create_parameter_mapping`. :param scaled_parameters: If ``True``, ``problem_parameters`` are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If ``False``, they are assumed to be in linear scale. :param log_level: Log level, see :mod:`amici.logging` module. :param num_threads: Number of threads to use for simulating multiple conditions (only used if compiled with OpenMP). :param failfast: Returns as soon as an integration failure is encountered, skipping any remaining simulations. :param scaled_gradients: Whether to compute gradients on parameter scale (``True``) or not (``False``). :return: Dictionary of * cost function value (``LLH``), * list of :class:`amici.amici.ReturnData` (``RDATAS``), * list of :class:`amici.amici.ExpData` (``EDATAS``), corresponding to the different simulation conditions. For ordering of simulation conditions, see :meth:`petab.Problem.get_simulation_conditions_from_measurement_df`. """ logger.setLevel(log_level) if solver is None: solver = amici_model.getSolver() # Switch to scaled parameters. problem_parameters = _default_scaled_parameters( petab_problem=petab_problem, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, ) scaled_parameters = True # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None and parameter_mapping is None and edatas is None: simulation_conditions = ( petab_problem.get_simulation_conditions_from_measurement_df() ) # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model, ) # Get edatas if edatas is None: # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions, ) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model, ) # Simulate rdatas = amici.runAmiciSimulations( amici_model, solver, edata_list=edatas, num_threads=num_threads, failfast=failfast, ) # Compute total llh llh = sum(rdata["llh"] for rdata in rdatas) # Compute total sllh sllh = None if solver.getSensitivityOrder() != amici.SensitivityOrder.none: sllh = aggregate_sllh( amici_model=amici_model, rdatas=rdatas, parameter_mapping=parameter_mapping, petab_scale=scaled_parameters, petab_problem=petab_problem, edatas=edatas, ) if not scaled_gradients and sllh is not None: sllh = { parameter_id: rescale_sensitivity( sensitivity=sensitivity, parameter_value=problem_parameters[parameter_id], old_scale=petab_problem.parameter_df.loc[ parameter_id, PARAMETER_SCALE ], new_scale=LIN, ) for parameter_id, sensitivity in sllh.items() } # Log results sim_cond = petab_problem.get_simulation_conditions_from_measurement_df() for i, rdata in enumerate(rdatas): sim_cond_id = "N/A" if sim_cond.empty else sim_cond.iloc[i, :].values logger.debug( f"Condition: {sim_cond_id}, status: {rdata['status']}, " f"llh: {rdata['llh']}" ) return { LLH: llh, SLLH: sllh, RDATAS: rdatas, EDATAS: edatas, }
[docs]def aggregate_sllh( amici_model: AmiciModel, rdatas: Sequence[amici.ReturnDataView], parameter_mapping: Optional[ParameterMapping], edatas: List[AmiciExpData], petab_scale: bool = True, petab_problem: petab.Problem = None, ) -> Union[None, Dict[str, float]]: """ Aggregate likelihood gradient for all conditions, according to PEtab parameter mapping. :param amici_model: AMICI model from which ``rdatas`` were obtained. :param rdatas: Simulation results. :param parameter_mapping: PEtab parameter mapping to condition-specific simulation parameters. :param edatas: Experimental data used for simulation. :param petab_scale: Whether to check that sensitivities were computed with parameters on the scales provided in the PEtab parameters table. :param petab_problem: The PEtab problem that defines the parameter scales. :return: Aggregated likelihood sensitivities. """ accumulated_sllh = {} model_parameter_ids = amici_model.getParameterIds() if petab_scale and petab_problem is None: raise ValueError( "Please provide the PEtab problem, when using " "`petab_scale=True`." ) # Check for issues in all condition simulation results. for rdata in rdatas: # Condition failed during simulation. if rdata.status != amici.AMICI_SUCCESS: return None # Condition simulation result does not provide SLLH. if rdata.sllh is None: raise ValueError( "The sensitivities of the likelihood for a condition were " "not computed." ) for condition_parameter_mapping, edata, rdata in zip( parameter_mapping, edatas, rdatas ): for sllh_parameter_index, condition_parameter_sllh in enumerate(rdata.sllh): # Get PEtab parameter ID # Use ExpData if it provides a parameter list, else default to # Model. if edata.plist: model_parameter_index = edata.plist[sllh_parameter_index] else: model_parameter_index = amici_model.plist(sllh_parameter_index) model_parameter_id = model_parameter_ids[model_parameter_index] petab_parameter_id = condition_parameter_mapping.map_sim_var[ model_parameter_id ] # Initialize if petab_parameter_id not in accumulated_sllh: accumulated_sllh[petab_parameter_id] = 0 # Check that the scale is consistent if petab_scale: # `ParameterMappingForCondition` objects provide the scale in # terms of `petab.C` constants already, not AMICI equivalents. model_parameter_scale = condition_parameter_mapping.scale_map_sim_var[ model_parameter_id ] petab_parameter_scale = petab_problem.parameter_df.loc[ petab_parameter_id, PARAMETER_SCALE ] if model_parameter_scale != petab_parameter_scale: raise ValueError( f"The scale of the parameter `{petab_parameter_id}` " "differs between the AMICI model " f"({model_parameter_scale}) and the PEtab problem " f"({petab_parameter_scale})." ) # Accumulate accumulated_sllh[petab_parameter_id] += condition_parameter_sllh return accumulated_sllh
[docs]def rescale_sensitivity( sensitivity: float, parameter_value: float, old_scale: str, new_scale: str, ) -> float: """Rescale a sensitivity between parameter scales. :param sensitivity: The sensitivity corresponding to the parameter value. :param parameter_value: The parameter vector element, on ``old_scale``. :param old_scale: The scale of the parameter value. :param new_scale: The parameter scale on which to rescale the sensitivity. :return: The rescaled sensitivity. """ LOG_E_10 = np.log(10) if old_scale == new_scale: return sensitivity unscaled_parameter_value = petab.parameters.unscale( parameter=parameter_value, scale_str=old_scale, ) scale = { (LIN, LOG): lambda s: s * unscaled_parameter_value, (LOG, LIN): lambda s: s / unscaled_parameter_value, (LIN, LOG10): lambda s: s * (unscaled_parameter_value * LOG_E_10), (LOG10, LIN): lambda s: s / (unscaled_parameter_value * LOG_E_10), } scale[(LOG, LOG10)] = lambda s: scale[(LIN, LOG10)](scale[(LOG, LIN)](s)) scale[(LOG10, LOG)] = lambda s: scale[(LIN, LOG)](scale[(LOG10, LIN)](s)) if (old_scale, new_scale) not in scale: raise NotImplementedError(f"Old scale: {old_scale}. New scale: {new_scale}.") return scale[(old_scale, new_scale)](sensitivity)
[docs]def create_parameterized_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, problem_parameters: Dict[str, numbers.Number], scaled_parameters: bool = False, parameter_mapping: ParameterMapping = None, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:amici.ExpData objects with parameters filled in. :param amici_model: AMICI Model assumed to be compatible with ``petab_problem``. :param petab_problem: PEtab problem to work on. :param problem_parameters: Run simulation with these parameters. If ``None``, PEtab ``nominalValues`` will be used. To be provided as dict, mapping PEtab problem parameters to SBML IDs. :param scaled_parameters: If ``True``, ``problem_parameters`` are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If ``False``, they are assumed to be in linear scale. :param parameter_mapping: Optional precomputed PEtab parameter mapping for efficiency, as generated by :func:`create_parameter_mapping`. :param simulation_conditions: Result of :func:`petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints, data and parameters. """ # number of amici simulations will be number of unique # (preequilibrationConditionId, simulationConditionId) pairs. # Can be optimized by checking for identical condition vectors. if simulation_conditions is None: simulation_conditions = ( petab_problem.get_simulation_conditions_from_measurement_df() ) # Get parameter mapping if parameter_mapping is None: parameter_mapping = create_parameter_mapping( petab_problem=petab_problem, simulation_conditions=simulation_conditions, scaled_parameters=scaled_parameters, amici_model=amici_model, ) # Generate ExpData with all condition-specific information edatas = create_edatas( amici_model=amici_model, petab_problem=petab_problem, simulation_conditions=simulation_conditions, ) # Fill parameters in ExpDatas (in-place) fill_in_parameters( edatas=edatas, problem_parameters=problem_parameters, scaled_parameters=scaled_parameters, parameter_mapping=parameter_mapping, amici_model=amici_model, ) return edatas
[docs]def create_parameter_mapping( petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, List[Dict]], scaled_parameters: bool, amici_model: AmiciModel, **parameter_mapping_kwargs, ) -> ParameterMapping: """Generate AMICI specific parameter mapping. :param petab_problem: PEtab problem :param simulation_conditions: Result of :func:`petab.get_simulation_conditions`. Can be provided to save time if this has been obtained before. :param scaled_parameters: If ``True``, problem_parameters are assumed to be on the scale provided in the PEtab parameter table and will be unscaled. If ``False``, they are assumed to be in linear scale. :param amici_model: AMICI model. :param parameter_mapping_kwargs: Optional keyword arguments passed to :func:`petab.get_optimization_to_simulation_parameter_mapping`. To allow changing fixed PEtab problem parameters (``estimate=0``), use ``fill_fixed_parameters=False``. :return: List of the parameter mappings. """ if simulation_conditions is None: simulation_conditions = ( petab_problem.get_simulation_conditions_from_measurement_df() ) if isinstance(simulation_conditions, list): simulation_conditions = pd.DataFrame(data=simulation_conditions) # Because AMICI globalizes all local parameters during model import, # we need to do that here as well to prevent parameter mapping errors # (PEtab does currently not care about SBML LocalParameters) if petab_problem.model.type_id == MODEL_TYPE_SBML: if petab_problem.sbml_document: converter_config = ( libsbml.SBMLLocalParameterConverter().getDefaultProperties() ) petab_problem.sbml_document.convert(converter_config) else: logger.debug( "No petab_problem.sbml_document is set. Cannot " "convert SBML LocalParameters. If the model contains " "LocalParameters, parameter mapping will fail." ) default_parameter_mapping_kwargs = { "warn_unmapped": False, "scaled_parameters": scaled_parameters, "allow_timepoint_specific_numeric_noise_parameters": not petab.lint.observable_table_has_nontrivial_noise_formula( petab_problem.observable_df ), } if parameter_mapping_kwargs is None: parameter_mapping_kwargs = {} prelim_parameter_mapping = petab.get_optimization_to_simulation_parameter_mapping( condition_df=petab_problem.condition_df, measurement_df=petab_problem.measurement_df, parameter_df=petab_problem.parameter_df, observable_df=petab_problem.observable_df, mapping_df=petab_problem.mapping_df, model=petab_problem.model, **dict(default_parameter_mapping_kwargs, **parameter_mapping_kwargs), ) parameter_mapping = ParameterMapping() for (_, condition), prelim_mapping_for_condition in zip( simulation_conditions.iterrows(), prelim_parameter_mapping ): mapping_for_condition = create_parameter_mapping_for_condition( prelim_mapping_for_condition, condition, petab_problem, amici_model ) parameter_mapping.append(mapping_for_condition) return parameter_mapping
def _get_initial_state_sbml( petab_problem: petab.Problem, element_id: str ) -> Union[float, sp.Basic]: element = petab_problem.sbml_model.getElementBySId(element_id) type_code = element.getTypeCode() initial_assignment = petab_problem.sbml_model.getInitialAssignmentBySymbol( element_id ) if initial_assignment: initial_assignment = sp.sympify( libsbml.formulaToL3String(initial_assignment.getMath()), locals=_clash ) if type_code == libsbml.SBML_SPECIES: value = ( get_species_initial(element) if initial_assignment is None else initial_assignment ) elif type_code == libsbml.SBML_PARAMETER: value = element.getValue() if initial_assignment is None else initial_assignment elif type_code == libsbml.SBML_COMPARTMENT: value = element.getSize() if initial_assignment is None else initial_assignment else: raise NotImplementedError( f"Don't know what how to handle {element_id} in " "condition table." ) return value def _get_initial_state_pysb( petab_problem: petab.Problem, element_id: str ) -> Union[float, sp.Symbol]: species_idx = int(re.match(r"__s(\d+)$", element_id)[1]) species_pattern = petab_problem.model.model.species[species_idx] from pysb.pattern import match_complex_pattern value = next( ( initial.value for initial in petab_problem.model.model.initials if match_complex_pattern(initial.pattern, species_pattern, exact=True) ), 0.0, ) if isinstance(value, pysb.Parameter): if value.name in petab_problem.parameter_df.index: value = value.name else: value = value.value return value def _set_initial_state( petab_problem, condition_id, element_id, init_par_id, par_map, scale_map, value, ): value = petab.to_float_if_float(value) if pd.isna(value): if petab_problem.model.type_id == MODEL_TYPE_SBML: value = _get_initial_state_sbml(petab_problem, element_id) elif petab_problem.model.type_id == MODEL_TYPE_PYSB: value = _get_initial_state_pysb(petab_problem, element_id) try: value = float(value) except (ValueError, TypeError): if sp.nsimplify(value).is_Atom and ( pysb is None or not isinstance(value, pysb.Component) ): # Get rid of multiplication with one value = sp.nsimplify(value) else: raise NotImplementedError( "Cannot handle non-trivial initial state " f"expression for {element_id}: {value}" ) # this should be a parameter ID value = str(value) logger.debug( f"The species {element_id} has no initial value " f"defined for the condition {condition_id} in " "the PEtab conditions table. The initial value is " f"now set to {value}, which is the initial value " "defined in the SBML model." ) par_map[init_par_id] = value if isinstance(value, float): # numeric initial state scale_map[init_par_id] = petab.LIN else: # parametric initial state scale_map[init_par_id] = petab_problem.parameter_df[PARAMETER_SCALE].get( value, petab.LIN )
[docs]def create_parameter_mapping_for_condition( parameter_mapping_for_condition: petab.ParMappingDictQuadruple, condition: Union[pd.Series, Dict], petab_problem: petab.Problem, amici_model: AmiciModel, ) -> ParameterMappingForCondition: """Generate AMICI specific parameter mapping for condition. :param parameter_mapping_for_condition: Preliminary parameter mapping for condition. :param condition: :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and ``simulationConditionId``. :param petab_problem: Underlying PEtab problem. :param amici_model: AMICI model. :return: The parameter and parameter scale mappings, for fixed preequilibration, fixed simulation, and variable simulation parameters, and then the respective scalings. """ ( condition_map_preeq, condition_map_sim, condition_scale_map_preeq, condition_scale_map_sim, ) = parameter_mapping_for_condition logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}") if len(condition_map_preeq) != len(condition_scale_map_preeq) or len( condition_map_sim ) != len(condition_scale_map_sim): raise AssertionError( "Number of parameters and number of parameter " "scales do not match." ) if len(condition_map_preeq) and len(condition_map_preeq) != len(condition_map_sim): logger.debug(f"Preequilibration parameter map: {condition_map_preeq}") logger.debug(f"Simulation parameter map: {condition_map_sim}") raise AssertionError( "Number of parameters for preequilbration " "and simulation do not match." ) ########################################################################## # initial states # Initial states have been set during model import based on the SBML model. # If initial states were overwritten in the PEtab condition table, they are # applied here. # During model generation, parameters for initial concentrations and # respective initial assignments have been created for the # relevant species, here we add these parameters to the parameter mapping. # In absence of preequilibration this could also be handled via # ExpData.x0, but in the case of preequilibration this would not allow for # resetting initial states. if states_in_condition_table := get_states_in_condition_table( petab_problem, condition ): # set indicator fixed parameter for preeq # (we expect here, that this parameter was added during import and # that it was not added by the user with a different meaning...) if condition_map_preeq: condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0 condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN condition_map_sim[PREEQ_INDICATOR_ID] = 0.0 condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN for element_id, (value, preeq_value) in states_in_condition_table.items(): # for preequilibration init_par_id = f"initial_{element_id}_preeq" if ( condition_id := condition.get(PREEQUILIBRATION_CONDITION_ID) ) is not None: _set_initial_state( petab_problem, condition_id, element_id, init_par_id, condition_map_preeq, condition_scale_map_preeq, preeq_value, ) else: # need to set dummy value for preeq parameter anyways, as it # is expected below (set to 0, not nan, because will be # multiplied with indicator variable in initial assignment) condition_map_sim[init_par_id] = 0.0 condition_scale_map_sim[init_par_id] = LIN # for simulation condition_id = condition[SIMULATION_CONDITION_ID] init_par_id = f"initial_{element_id}_sim" _set_initial_state( petab_problem, condition_id, element_id, init_par_id, condition_map_sim, condition_scale_map_sim, value, ) ########################################################################## # separate fixed and variable AMICI parameters, because we may have # different fixed parameters for preeq and sim condition, but we cannot # have different variable parameters. without splitting, # merge_preeq_and_sim_pars_condition below may fail. # TODO: This can be done already in parameter mapping creation. variable_par_ids = amici_model.getParameterIds() fixed_par_ids = amici_model.getFixedParameterIds() condition_map_preeq_var, condition_map_preeq_fix = _subset_dict( condition_map_preeq, variable_par_ids, fixed_par_ids ) condition_scale_map_preeq_var, condition_scale_map_preeq_fix = _subset_dict( condition_scale_map_preeq, variable_par_ids, fixed_par_ids ) condition_map_sim_var, condition_map_sim_fix = _subset_dict( condition_map_sim, variable_par_ids, fixed_par_ids ) condition_scale_map_sim_var, condition_scale_map_sim_fix = _subset_dict( condition_scale_map_sim, variable_par_ids, fixed_par_ids ) logger.debug("Fixed parameters preequilibration: " f"{condition_map_preeq_fix}") logger.debug("Fixed parameters simulation: " f"{condition_map_sim_fix}") logger.debug("Variable parameters preequilibration: " f"{condition_map_preeq_var}") logger.debug("Variable parameters simulation: " f"{condition_map_sim_var}") petab.merge_preeq_and_sim_pars_condition( condition_map_preeq_var, condition_map_sim_var, condition_scale_map_preeq_var, condition_scale_map_sim_var, condition, ) logger.debug(f"Merged: {condition_map_sim_var}") parameter_mapping_for_condition = ParameterMappingForCondition( map_preeq_fix=condition_map_preeq_fix, map_sim_fix=condition_map_sim_fix, map_sim_var=condition_map_sim_var, scale_map_preeq_fix=condition_scale_map_preeq_fix, scale_map_sim_fix=condition_scale_map_sim_fix, scale_map_sim_var=condition_scale_map_sim_var, ) return parameter_mapping_for_condition
[docs]def create_edatas( amici_model: AmiciModel, petab_problem: petab.Problem, simulation_conditions: Union[pd.DataFrame, Dict] = None, ) -> List[amici.ExpData]: """Create list of :class:`amici.amici.ExpData` objects for PEtab problem. :param amici_model: AMICI model. :param petab_problem: Underlying PEtab problem. :param simulation_conditions: Result of :func:`petab.get_simulation_conditions`. Can be provided to save time if this has be obtained before. :return: List with one :class:`amici.amici.ExpData` per simulation condition, with filled in timepoints and data. """ if simulation_conditions is None: simulation_conditions = ( petab_problem.get_simulation_conditions_from_measurement_df() ) observable_ids = amici_model.getObservableIds() measurement_groupvar = [SIMULATION_CONDITION_ID] if PREEQUILIBRATION_CONDITION_ID in simulation_conditions: measurement_groupvar.append(petab.PREEQUILIBRATION_CONDITION_ID) measurement_dfs = dict( list(petab_problem.measurement_df.groupby(measurement_groupvar)) ) edatas = [] for _, condition in simulation_conditions.iterrows(): # Create amici.ExpData for each simulation if PREEQUILIBRATION_CONDITION_ID in condition: measurement_index = ( condition.get(SIMULATION_CONDITION_ID), condition.get(PREEQUILIBRATION_CONDITION_ID), ) else: measurement_index = (condition.get(SIMULATION_CONDITION_ID),) edata = create_edata_for_condition( condition=condition, amici_model=amici_model, measurement_df=measurement_dfs[measurement_index], petab_problem=petab_problem, observable_ids=observable_ids, ) edatas.append(edata) return edatas
[docs]def create_edata_for_condition( condition: Union[Dict, pd.Series], measurement_df: pd.DataFrame, amici_model: AmiciModel, petab_problem: petab.Problem, observable_ids: List[str], ) -> amici.ExpData: """Get :class:`amici.amici.ExpData` for the given PEtab condition. Sets timepoints, observed data and sigmas. :param condition: :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and ``simulationConditionId``. :param measurement_df: :class:`pandas.DataFrame` with measurements for the given condition. :param amici_model: AMICI model :param petab_problem: Underlying PEtab problem :param observable_ids: List of observable IDs :return: ExpData instance. """ if amici_model.nytrue != len(observable_ids): raise AssertionError( "Number of AMICI model observables does not " "match number of PEtab observables." ) # create an ExpData object edata = amici.ExpData(amici_model) edata.id = condition[SIMULATION_CONDITION_ID] if condition.get(PREEQUILIBRATION_CONDITION_ID): edata.id += "+" + condition.get(PREEQUILIBRATION_CONDITION_ID) ########################################################################## # enable initial parameters reinitialization states_in_condition_table = get_states_in_condition_table( petab_problem, condition=condition ) if condition.get(PREEQUILIBRATION_CONDITION_ID) and states_in_condition_table: state_ids = amici_model.getStateIds() state_idx_reinitalization = [ state_ids.index(s) for s, (v, v_preeq) in states_in_condition_table.items() if not np.isnan(v) ] edata.reinitialization_state_idxs_sim = state_idx_reinitalization logger.debug( "Enabling state reinitialization for condition " f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - " f"{condition.get(SIMULATION_CONDITION_ID)} " f"{states_in_condition_table}" ) ########################################################################## # timepoints # find replicate numbers of time points timepoints_w_reps = _get_timepoints_with_replicates(df_for_condition=measurement_df) edata.setTimepoints(timepoints_w_reps) ########################################################################## # measurements and sigmas y, sigma_y = _get_measurements_and_sigmas( df_for_condition=measurement_df, timepoints_w_reps=timepoints_w_reps, observable_ids=observable_ids, ) edata.setObservedData(y.flatten()) edata.setObservedDataStdDev(sigma_y.flatten()) return edata
def _subset_dict( full: Dict[Any, Any], *args: Collection[Any] ) -> Iterator[Dict[Any, Any]]: """Get subset of dictionary based on provided keys :param full: Dictionary to subset :param args: Collections of keys to be contained in the different subsets :return: subsetted dictionary """ for keys in args: yield {key: val for (key, val) in full.items() if key in keys} def _get_timepoints_with_replicates( df_for_condition: pd.DataFrame, ) -> List[numbers.Number]: """ Get list of timepoints including replicate measurements :param df_for_condition: PEtab measurement table subset for a single condition. :return: Sorted list of timepoints, including multiple timepoints accounting for replicate measurements. """ # create sorted list of all timepoints for which measurements exist timepoints = sorted(df_for_condition[TIME].unique().astype(float)) # find replicate numbers of time points timepoints_w_reps = [] for time in timepoints: # subselect for time df_for_time = df_for_condition[df_for_condition.time.astype(float) == time] # rep number is maximum over rep numbers for observables n_reps = max(df_for_time.groupby([OBSERVABLE_ID, TIME]).size()) # append time point n_rep times timepoints_w_reps.extend([time] * n_reps) return timepoints_w_reps def _get_measurements_and_sigmas( df_for_condition: pd.DataFrame, timepoints_w_reps: Sequence[numbers.Number], observable_ids: Sequence[str], ) -> Tuple[np.array, np.array]: """ Get measurements and sigmas Generate arrays with measurements and sigmas in AMICI format from a PEtab measurement table subset for a single condition. :param df_for_condition: Subset of PEtab measurement table for one condition :param timepoints_w_reps: Timepoints for which there exist measurements, including replicates :param observable_ids: List of observable IDs for mapping IDs to indices. :return: arrays for measurement and sigmas """ # prepare measurement matrix y = np.full(shape=(len(timepoints_w_reps), len(observable_ids)), fill_value=np.nan) # prepare sigma matrix sigma_y = y.copy() timepoints = sorted(df_for_condition[TIME].unique().astype(float)) for time in timepoints: # subselect for time df_for_time = df_for_condition[df_for_condition[TIME] == time] time_ix_0 = timepoints_w_reps.index(time) # remember used time indices for each observable time_ix_for_obs_ix = {} # iterate over measurements for _, measurement in df_for_time.iterrows(): # extract observable index observable_ix = observable_ids.index(measurement[OBSERVABLE_ID]) # update time index for observable if observable_ix in time_ix_for_obs_ix: time_ix_for_obs_ix[observable_ix] += 1 else: time_ix_for_obs_ix[observable_ix] = time_ix_0 # fill observable and possibly noise parameter y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[ MEASUREMENT ] if isinstance(measurement.get(NOISE_PARAMETERS, None), numbers.Number): sigma_y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[ NOISE_PARAMETERS ] return y, sigma_y
[docs]def rdatas_to_measurement_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame ) -> pd.DataFrame: """ Create a measurement dataframe in the PEtab format from the passed ``rdatas`` and own information. :param rdatas: A sequence of rdatas with the ordering of :func:`petab.get_simulation_conditions`. :param model: AMICI model used to generate ``rdatas``. :param measurement_df: PEtab measurement table used to generate ``rdatas``. :return: A dataframe built from the rdatas in the format of ``measurement_df``. """ simulation_conditions = petab.get_simulation_conditions(measurement_df) observable_ids = model.getObservableIds() rows = [] # iterate over conditions for (_, condition), rdata in zip(simulation_conditions.iterrows(), rdatas): # current simulation matrix y = rdata.y # time array used in rdata t = list(rdata.ts) # extract rows for condition cur_measurement_df = petab.get_rows_for_condition(measurement_df, condition) # iterate over entries for the given condition # note: this way we only generate a dataframe entry for every # row that existed in the original dataframe. if we want to # e.g. have also timepoints non-existent in the original file, # we need to instead iterate over the rdata['y'] entries for _, row in cur_measurement_df.iterrows(): # copy row row_sim = copy.deepcopy(row) # extract simulated measurement value timepoint_idx = t.index(row[TIME]) observable_idx = observable_ids.index(row[OBSERVABLE_ID]) measurement_sim = y[timepoint_idx, observable_idx] # change measurement entry row_sim[MEASUREMENT] = measurement_sim rows.append(row_sim) return pd.DataFrame(rows)
[docs]def rdatas_to_simulation_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, measurement_df: pd.DataFrame ) -> pd.DataFrame: """Create a PEtab simulation dataframe from :class:`amici.amici.ReturnData` s. See :func:`rdatas_to_measurement_df` for details, only that model outputs will appear in column ``simulation`` instead of ``measurement``.""" df = rdatas_to_measurement_df( rdatas=rdatas, model=model, measurement_df=measurement_df ) return df.rename(columns={MEASUREMENT: SIMULATION})
def _default_scaled_parameters( petab_problem: petab.Problem, problem_parameters: Optional[Dict[str, float]] = None, scaled_parameters: bool = False, ) -> Optional[Dict[str, float]]: """ Helper method to handle an unscaled or unspecified parameter vector. The parameter vector defaults to the nominal values in the PEtab parameter table. Unscaled parameter values are scaled. :param petab_problem: The PEtab problem. :param problem_parameters: Keys are PEtab parameter IDs, values are parameter values on the scale defined in the PEtab parameter table. Defaults to the nominal values in the PEtab parameter table. :param scaled_parameters: Whether `problem_parameters` are on the scale defined in the PEtab parameter table. :return: The scaled parameter vector. """ if problem_parameters is None: problem_parameters = dict( zip( petab_problem.x_ids, petab_problem.x_nominal_scaled, ) ) elif not scaled_parameters: problem_parameters = petab_problem.scale_parameters(problem_parameters) return problem_parameters