"""Provide an interface to data, parameters and results

A :class:`DataHandle` is passed in to a :class:`Model` at runtime, to provide
transparent access to the relevant data and parameters for the current
:class:`ModelRun` and iteration. It gives read access to parameters and input
data (at any computed or pre-computed timestep) and write access to output data
(at the current timestep).
"""
from copy import copy
from logging import getLogger
from types import MappingProxyType
from typing import Dict, List, Optional, Union
import numpy as np # type: ignore
from smif.data_layer.data_array import DataArray
from smif.data_layer.store import Store
from smif.exception import SmifDataError
from smif.metadata import RelativeTimestep
class DataHandle(object):
    """Get/set model parameters and data

    A handle is bound to one model within one model run. It reads parameters
    and input data from the backing store and writes the model's outputs back
    to it. Item access is supported: ``handle[name]`` reads a parameter, input
    or output (checked in that order) and ``handle[name] = array`` writes an
    output.
    """

    def __init__(
        self,
        store: Store,
        modelrun_name,
        current_timestep,
        timesteps,
        model,
        decision_iteration=None,
    ):
        """Create a DataHandle for a Model to access data, parameters and state, and to
        communicate results.

        The model run and system-of-systems model configurations are read from
        the store on construction, and the model's dependencies and parameter
        values are loaded eagerly.

        Parameters
        ----------
        store : Store
            Backing store for inputs, parameters, results
        modelrun_name : str
            Name of the current modelrun
        current_timestep : int or None
            Timestep the handle is currently positioned at; may be None, in
            which case a timestep must be given explicitly when reading data
        timesteps : list
            All timesteps of the model run, base timestep first
        model : Model
            Model which will use this DataHandle
        decision_iteration : int, default=None
            ID of the current Decision iteration
        """
        self.logger = getLogger(__name__)
        self._store = store
        self._modelrun_name = modelrun_name
        self._current_timestep = current_timestep
        self._timesteps = timesteps
        self._decision_iteration = decision_iteration

        self._model_name = model.name
        self._inputs = model.inputs
        self._outputs = model.outputs
        self._model = model

        modelrun = self._store.read_model_run(self._modelrun_name)
        sos_model = self._store.read_sos_model(modelrun["sos_model"])

        self._scenario_dependencies = {}  # type: Dict[str, Dict]
        self._model_dependencies = {}  # type: Dict[str, Dict]
        scenario_variants = modelrun["scenarios"]
        self._load_dependencies(sos_model, scenario_variants)
        # Argument order must follow the message: model count, then scenario count
        self.logger.debug(
            "Create with %s model, %s scenario dependencies",
            len(self._model_dependencies),
            len(self._scenario_dependencies),
        )

        self._parameters = {}  # type: Dict[str, DataArray]
        self._load_parameters(sos_model, modelrun["narratives"])

    def _load_dependencies(self, sos_model, scenario_variants):
        """Load the dependencies which feed this model's inputs

        Populates ``self._model_dependencies`` and
        ``self._scenario_dependencies`` as ``{input_name: dependency_dict}``,
        keeping only dependencies whose sink is this model. If several
        dependencies of the same kind target one input, the last one wins.

        Parameters
        ----------
        sos_model : dict
            System-of-systems model configuration with 'model_dependencies'
            and 'scenario_dependencies' lists
        scenario_variants : dict
            Maps scenario name to the variant selected for the model run
        """
        for dep in sos_model["model_dependencies"]:
            if dep["sink"] == self._model_name:
                input_name = dep["sink_input"]
                self._model_dependencies[input_name] = {
                    "source_model_name": dep["source"],
                    "source_output_name": dep["source_output"],
                    "type": "model",
                }

        for dep in sos_model["scenario_dependencies"]:
            if dep["sink"] == self._model_name:
                input_name = dep["sink_input"]
                self._scenario_dependencies[input_name] = {
                    "source_model_name": dep["source"],
                    "source_output_name": dep["source_output"],
                    "type": "scenario",
                    "variant": scenario_variants[dep["source"]],
                }

    def _load_parameters(self, sos_model, concrete_narratives):
        """Load parameter values for model run

        Parameters for the model are loaded into memory as a data_handle is
        initialised.

        Firstly, default values for the parameters are read from the store for
        each parameter declared by the model.

        Then, the data from the narrative variants linked to the current model
        run are applied on top of the defaults, in order, so that later
        variants override earlier ones.

        Arguments
        ---------
        sos_model : dict
            A configuration dictionary of a system-of-systems model
        concrete_narratives: dict of list
            Links narrative names to a list of variants to furnish parameters
            with values {narrative_name: [variant_name, ...]}

        Raises
        ------
        IndexError
            If a narrative named in the model run is not defined by the
            system-of-systems model
        """
        # Populate the parameters with their default values
        for parameter in self._model.parameters.values():
            self._parameters[parameter.name] = self._store.read_model_parameter_default(
                self._model.name, parameter.name
            )

        # Load in the concrete narrative and selected variants from the model run
        for narrative_name, variant_names in concrete_narratives.items():
            matches = [
                x for x in sos_model["narratives"] if x["name"] == narrative_name
            ]
            if not matches:
                msg = "Couldn't find a match for {} in {}"
                raise IndexError(msg.format(narrative_name, sos_model["name"]))
            narrative = matches[0]
            self.logger.debug("Loaded narrative: %s", narrative)
            self.logger.debug("Considering variants: %s", variant_names)

            # The parameters a narrative provides to this model do not depend
            # on the variant, so look them up once per narrative
            try:
                parameter_list = narrative["provides"][self._model.name]
            except KeyError:
                parameter_list = []

            # Read parameter data from each variant, later variants overriding
            # previous parameter values
            for variant_name in variant_names:
                for parameter in parameter_list:
                    da = self._store.read_narrative_variant_data(
                        sos_model["name"], narrative_name, variant_name, parameter
                    )
                    self._parameters[parameter].update(da)

    def derive_for(self, model):
        """Derive a new DataHandle configured for the given Model

        The new handle shares this handle's store, model run, current timestep,
        timesteps and decision iteration.

        Parameters
        ----------
        model : Model
            Model which will use this DataHandle

        Returns
        -------
        DataHandle
        """
        return DataHandle(
            store=self._store,
            modelrun_name=self._modelrun_name,
            current_timestep=self._current_timestep,
            timesteps=list(self.timesteps),
            model=model,
            decision_iteration=self._decision_iteration,
        )

    def __getitem__(self, key):
        """Read a parameter, input or output by name (checked in that order)"""
        if key in self._parameters:
            return self.get_parameter(key)
        elif key in self._inputs:
            return self.get_data(key)
        elif key in self._outputs:
            return self.get_results(key)
        else:
            raise KeyError(
                "'%s' not recognised as input, output or parameter for '%s'"
                % (key, self._model_name)
            )

    def __setitem__(self, key, value):
        """Write results for the output named `key` at the current timestep"""
        if hasattr(value, "as_ndarray"):
            raise TypeError("Pass in a numpy array")
        self.set_results(key, value)

    @property
    def current_timestep(self):
        """Current timestep"""
        return self._current_timestep

    @property
    def previous_timestep(self):
        """Previous timestep"""
        return RelativeTimestep.PREVIOUS.resolve_relative_to(
            self._current_timestep, self._timesteps
        )

    @property
    def base_timestep(self):
        """Base timestep"""
        return RelativeTimestep.BASE.resolve_relative_to(
            self._current_timestep, self._timesteps
        )

    @property
    def timesteps(self):
        """All timesteps (as tuple)"""
        return tuple(self._timesteps)

    @property
    def decision_iteration(self):
        """ID of the current decision iteration (may be None)"""
        return self._decision_iteration

    def get_state(self):
        """The current state of the model

        If the DataHandle instance has a timestep, then state is read from the
        store for the current model run, timestep and decision iteration.

        Returns
        -------
        list
            The state as returned by the store; each entry is expected to
            provide 'name' and 'build_year' (see `get_current_interventions`)

        Raises
        ------
        ValueError
            If self._current_timestep is None an error is raised.
        """
        if self._current_timestep is None:
            raise ValueError("You must pass a timestep value to get state")
        return self._store.read_state(
            self._modelrun_name, self._current_timestep, self._decision_iteration
        )

    def get_current_interventions(self):
        """Get the interventions that exist in the current state

        Decisions in the state which do not name one of this model's
        interventions are ignored.

        Returns
        -------
        dict of dicts
            A dict of intervention dicts with build_year attribute keyed by name
        """
        state = self.get_state()
        current_interventions = {}
        all_interventions = self._store.read_interventions(self._model_name)

        for decision in state:
            name = decision["name"]
            build_year = decision["build_year"]
            try:
                serialised = all_interventions[name]
            except KeyError:
                # ignore if intervention is not in current set
                continue
            # Work on a shallow copy so that the build year is not written
            # back into the object handed out by the store
            intervention = copy(serialised)
            intervention["build_year"] = build_year
            current_interventions[name] = intervention

        msg = "State matched with %s interventions"
        self.logger.info(msg, len(current_interventions))

        return current_interventions

    def get_data(self, input_name: str, timestep=None) -> DataArray:
        """Get data required for model inputs

        Parameters
        ----------
        input_name : str
        timestep : RelativeTimestep or int, optional
            defaults to RelativeTimestep.CURRENT

        Returns
        -------
        smif.data_layer.data_array.DataArray
            Contains data annotated with the metadata and provides utility methods
            to access the data in different ways

        Raises
        ------
        KeyError
            If `input_name` is not an input of the model
        SmifDataError
            If any data reading error occurs below this method, the error is
            handled and reraised within the context of the current call
        """
        if input_name not in self._inputs:
            raise KeyError(
                "'{}' not recognised as input for '{}'".format(
                    input_name, self._model_name
                )
            )

        timestep = self._resolve_timestep(timestep)
        dep = self._resolve_source(input_name)

        self.logger.debug(
            "Read %s %s %s",
            dep["source_model_name"],
            dep["source_output_name"],
            timestep,
        )

        if dep["type"] == "scenario":
            data = self._get_scenario(dep, timestep, input_name)
        else:
            input_spec = self._inputs[input_name]
            data = self._get_result(dep, timestep, input_spec)

        return data

    def _resolve_timestep(self, timestep):
        """Resolves a relative timestep to an absolute timestep

        When the handle has no current timestep, relative timesteps are
        resolved against the first entry of the timesteps and integer timesteps
        must be one of the timesteps. Otherwise ``None`` means the current
        timestep, relative timesteps are resolved against the current timestep
        and integer timesteps must not lie after it.

        Arguments
        ---------
        timestep : RelativeTimestep or int

        Returns
        -------
        int

        Raises
        ------
        ValueError
            If neither the handle nor the caller provides a timestep
        """
        if self._current_timestep is None:
            if timestep is None:
                raise ValueError("You must provide a timestep to obtain data")
            elif hasattr(timestep, "resolve_relative_to"):
                timestep = timestep.resolve_relative_to(
                    self._timesteps[0], self._timesteps
                )
            else:
                assert (
                    isinstance(timestep, int) and timestep in self._timesteps
                ), "timestep must be one of the model run timesteps"
        else:
            if timestep is None:
                timestep = self._current_timestep
            elif hasattr(timestep, "resolve_relative_to"):
                timestep = timestep.resolve_relative_to(
                    self._current_timestep, self._timesteps
                )
            else:
                assert (
                    isinstance(timestep, int) and timestep <= self._current_timestep
                ), "timestep must be an int no later than the current timestep"
        return timestep

    def _get_result(self, dep, timestep, input_spec) -> DataArray:
        """Retrieves a model result for a dependency

        The source model's output is read using a copy of the input spec
        renamed to the source output; the returned data is then renamed to
        match the input.
        """
        output_spec = copy(input_spec)
        output_spec.name = dep["source_output_name"]
        self.logger.debug(
            "Getting model result for %s via %s from %s", input_spec, dep, output_spec
        )
        try:
            data = self._store.read_results(
                self._modelrun_name,
                dep["source_model_name"],  # read from source model
                output_spec,  # using source model output spec
                timestep,
                self._decision_iteration,
            )
            data.name = input_spec.name  # ensure name matches input (as caller expects)
        except SmifDataError as ex:
            msg = "Could not read data for output '{}' from '{}' in {}, iteration {}"
            raise SmifDataError(
                msg.format(
                    output_spec.name,
                    dep["source_model_name"],
                    timestep,
                    self._decision_iteration,
                )
            ) from ex
        return data

    def _get_scenario(self, dep, timestep, input_name) -> DataArray:
        """Retrieves data from a scenario

        Arguments
        ---------
        dep : dict
            A scenario dependency
        timestep : int
        input_name : str
            Name given to the returned data

        Returns
        -------
        DataArray
        """
        try:
            data = self._store.read_scenario_variant_data(
                dep["source_model_name"],  # read from a given scenario model
                dep["variant"],  # with given scenario variant
                dep["source_output_name"],  # using output (variable) name
                timestep,
            )
            data.name = input_name  # ensure name matches input (as caller expects)
        except SmifDataError as ex:
            msg = "Could not read data for output '{}' from '{}.{}' in {}"
            raise SmifDataError(
                msg.format(
                    dep["source_output_name"],
                    dep["source_model_name"],
                    dep["variant"],
                    timestep,
                )
            ) from ex
        return data

    def _resolve_source(self, input_name) -> Dict:
        """Find best dependency to provide input data

        Returns
        -------
        dep : dict
            A scenario or model dependency dictionary

        Raises
        ------
        SmifDataError
            If no dependency is defined for the input
        """
        scenario_dep = self._scenario_dependencies.get(input_name)
        model_dep = self._model_dependencies.get(input_name)

        if scenario_dep is not None and model_dep is not None:
            # if multiple dependencies, use scenario when the handle is at the
            # first timestep, model for subsequent timesteps
            if self._current_timestep == self._timesteps[0]:
                dep = scenario_dep
            else:
                dep = model_dep
        elif scenario_dep is not None:
            # else assume single dependency per input
            dep = scenario_dep
        elif model_dep is not None:
            dep = model_dep
        else:
            raise SmifDataError(
                "Dependency not defined for input '{}' in model '{}'".format(
                    input_name, self._model_name
                )
            )
        return dep

    def get_base_timestep_data(self, input_name):
        """Get data from the base timestep as required for model inputs

        Parameters
        ----------
        input_name : str

        Returns
        -------
        smif.data_layer.data_array.DataArray
        """
        return self.get_data(input_name, RelativeTimestep.BASE)

    def get_previous_timestep_data(self, input_name):
        """Get data from the previous timestep as required for model inputs

        Parameters
        ----------
        input_name : str

        Returns
        -------
        smif.data_layer.data_array.DataArray
        """
        return self.get_data(input_name, RelativeTimestep.PREVIOUS)

    def get_parameter(self, parameter_name):
        """Get the value for a parameter

        Parameters
        ----------
        parameter_name : str

        Returns
        -------
        smif.data_layer.data_array.DataArray
            Contains data annotated with the metadata and provides utility methods
            to access the data in different ways

        Raises
        ------
        KeyError
            If `parameter_name` is not a parameter of the model
        """
        if parameter_name not in self._parameters:
            raise KeyError(
                "'{}' not recognised as parameter for '{}'".format(
                    parameter_name, self._model_name
                )
            )
        return self._parameters[parameter_name]

    def get_parameters(self):
        """Get all parameter values

        Returns
        -------
        parameters : MappingProxyType
            Read-only view of parameters (like a read-only dict)
        """
        return MappingProxyType(self._parameters)

    def set_results(self, output_name, data):
        """Set results values for model outputs

        The data is wrapped in a DataArray using the output's spec and written
        to the store for the current timestep and decision iteration.

        Parameters
        ----------
        output_name : str
        data : numpy.ndarray

        Raises
        ------
        TypeError
            If `data` has an ``as_ndarray`` attribute (i.e. looks like a
            DataArray rather than a plain array)
        KeyError
            If `output_name` is not an output of the model
        """
        if hasattr(data, "as_ndarray"):
            raise TypeError("Pass in a numpy array")

        if output_name not in self._outputs:
            raise KeyError(
                "'{}' not recognised as output for '{}'".format(
                    output_name, self._model_name
                )
            )

        self.logger.debug(
            "Write %s %s %s", self._model_name, output_name, self._current_timestep
        )

        spec = self._outputs[output_name]
        da = DataArray(spec, data)

        self._store.write_results(
            da,
            self._modelrun_name,
            self._model_name,
            self._current_timestep,
            self._decision_iteration,
        )

    def get_results(self, output_name, decision_iteration=None, timestep=None):
        """Get results values for model outputs

        Parameters
        ----------
        output_name : str
            The name of an output of the model this handle was created for
        decision_iteration : int, default=None
            Defaults to the handle's decision iteration
        timestep : int or RelativeTimestep, default=None
            Defaults to the current timestep

        Returns
        -------
        smif.data_layer.data_array.DataArray
            Contains data annotated with the metadata and provides utility methods
            to access the data in different ways

        Raises
        ------
        KeyError
            If `output_name` is not an output of the model

        Notes
        -----
        Results are only read for the model this handle was created for.
        """
        model_name = self._model.name

        # resolve timestep
        if timestep is None:
            timestep = self._current_timestep
        elif isinstance(timestep, RelativeTimestep):
            timestep = timestep.resolve_relative_to(
                self._current_timestep, self._timesteps
            )
        else:
            assert (
                isinstance(timestep, int) and timestep <= self._current_timestep
            ), "timestep must be an int no later than the current timestep"

        # find output spec
        try:
            spec = self._model.outputs[output_name]
        except KeyError:
            msg = "'{}' not recognised as output for '{}'"
            raise KeyError(msg.format(output_name, model_name))

        if decision_iteration is None:
            decision_iteration = self._decision_iteration

        self.logger.debug("Read %s %s %s", model_name, output_name, timestep)

        return self._store.read_results(
            self._modelrun_name, model_name, spec, timestep, decision_iteration
        )

    def read_unit_definitions(self) -> List[str]:
        """Read unit definitions

        Returns
        -------
        list[str]
        """
        return self._store.read_unit_definitions()

    def read_coefficients(self, source_dim: str, destination_dim: str) -> np.ndarray:
        """Reads coefficients from the store

        Coefficients are uniquely identified by their source/destination dimensions.
        This method and `write_coefficients` implement caching of conversion
        coefficients between dimensions.

        Parameters
        ----------
        source_dim: str
            Dimension name
        destination_dim: str
            Dimension name

        Returns
        -------
        numpy.ndarray
        """
        data = self._store.read_coefficients(source_dim, destination_dim)
        return data

    def write_coefficients(
        self, source_dim: str, destination_dim: str, data: np.ndarray
    ):
        """Writes coefficients to the store

        Coefficients are uniquely identified by their source/destination dimensions.
        This method and `read_coefficients` implement caching of conversion
        coefficients between dimensions.

        Parameters
        ----------
        source_dim: str
            Dimension name
        destination_dim: str
            Dimension name
        data : numpy.ndarray

        Returns
        -------
        Whatever the store's ``write_coefficients`` returns
        """
        data = self._store.write_coefficients(source_dim, destination_dim, data)
        return data
class ResultsHandle(object):
    """Results access for decision modules

    Gives read access to the results of any model contained in a
    system-of-systems model, and to the decision state held in the store.
    """

    def __init__(
        self,
        store: Store,
        modelrun_name: str,
        sos_model,
        current_timestep: int,
        timesteps: Optional[List[int]] = None,
        decision_iteration: Optional[int] = None,
    ):
        """Create a ResultsHandle

        Parameters
        ----------
        store : Store
            Backing store for results and state
        modelrun_name : str
            Name of the current modelrun
        sos_model : SosModel
            Must provide `name`, `models` and `get_model(name)`
        current_timestep : int
        timesteps : list of int, optional
            All timesteps of the model run, base timestep first
        decision_iteration : int, optional
        """
        self._store = store
        self._modelrun_name = modelrun_name
        self._sos_model = sos_model
        self._current_timestep = current_timestep
        self._timesteps = timesteps
        self._decision_iteration = decision_iteration

    @property
    def base_timestep(self) -> int:
        """First of the timesteps passed at construction"""
        return self._timesteps[0]

    @property
    def current_timestep(self) -> int:
        """Current timestep"""
        return self._current_timestep

    @property
    def previous_timestep(self) -> Union[None, int]:
        """Previous timestep, resolved relative to the current timestep"""
        rel = RelativeTimestep.PREVIOUS
        return rel.resolve_relative_to(self._current_timestep, self._timesteps)

    @property
    def decision_iteration(self) -> Optional[int]:
        """ID of the current decision iteration (None if not set)"""
        return self._decision_iteration

    def get_results(
        self,
        model_name: str,
        output_name: str,
        timestep: Union[int, RelativeTimestep],
        decision_iteration: int,
    ) -> DataArray:
        """Access model results

        Parameters
        ----------
        model_name : str
        output_name : str
        timestep : [int, RelativeTimestep]
            An absolute timestep must not lie after the current timestep
        decision_iteration : int

        Returns
        -------
        smif.data_layer.data_array.DataArray
            Contains data annotated with the metadata and provides utility methods
            to access the data in different ways

        Raises
        ------
        KeyError
            If the model is not contained in the system-of-systems model, or
            the output is not an output of that model
        """
        # resolve timestep
        if hasattr(timestep, "resolve_relative_to"):
            timestep_value = timestep.resolve_relative_to(
                self._current_timestep, self._timesteps
            )  # type: Union[int, None]
        else:
            assert (
                isinstance(timestep, int) and timestep <= self._current_timestep
            ), "timestep must be an int no later than the current timestep"
            timestep_value = timestep

        model_names = [model.name for model in self._sos_model.models]
        if model_name not in model_names:
            # Report the available model names rather than the model objects
            msg = "Model '{}' is not contained in SosModel '{}'. Found {}."
            raise KeyError(
                msg.format(model_name, self._sos_model.name, model_names)
            )
        results_model = self._sos_model.get_model(model_name)

        try:
            spec = results_model.outputs[output_name]
        except KeyError:
            msg = "'{}' not recognised as output for '{}'"
            raise KeyError(msg.format(output_name, model_name))

        results = self._store.read_results(
            self._modelrun_name, model_name, spec, timestep_value, decision_iteration
        )
        return results

    def get_state(self, timestep: int, decision_iteration: int) -> List[Dict]:
        """Retrieve the pre-decision state of the model

        State is read from the store for the current model run and the given
        timestep and decision iteration.

        Parameters
        ----------
        timestep : int
        decision_iteration : int

        Returns
        -------
        List[Dict]
            A list of {'name', 'build_year'} dictionaries showing the history of
            decisions made up to this point
        """
        state = self._store.read_state(
            self._modelrun_name, timestep, decision_iteration
        )
        return state