"""The store provides a common data interface to smif configuration, data and metadata.
Raises
------
SmifDataNotFoundError
If data cannot be found in the store when try to read from the store
SmifDataExistsError
If data already exists in the store when trying to write to the store
(use an update method instead)
SmifDataMismatchError
Data presented to read, write and update methods is in the
incorrect format or of wrong dimensions to that expected
SmifDataReadError
When unable to read data e.g. unable to handle file type or connect
to database
"""
import itertools
import logging
import os
from collections import OrderedDict, defaultdict
from copy import deepcopy
from operator import itemgetter
from os.path import splitext
from typing import Dict, List, Optional, Union
import numpy as np # type: ignore
from smif.data_layer import DataArray
from smif.data_layer.abstract_data_store import DataStore
from smif.data_layer.abstract_metadata_store import MetadataStore
from smif.data_layer.file import (
CSVDataStore,
FileMetadataStore,
ParquetDataStore,
YamlConfigStore,
)
from smif.data_layer.validate import (
validate_sos_model_config,
validate_sos_model_format,
)
from smif.exception import SmifDataError, SmifDataNotFoundError
from smif.metadata.spec import Spec
[docs]class Store:
"""Common interface to data store, composed of config, metadata and data store
implementations.
Parameters
----------
config_store: ~smif.data_layer.abstract_config_store.ConfigStore
metadata_store: ~smif.data_layer.abstract_metadata_store.MetadataStore
data_store: ~smif.data_layer.abstract_data_store.DataStore
"""
def __init__(
    self,
    config_store,
    metadata_store: MetadataStore,
    data_store: DataStore,
    model_base_folder=".",
):
    """Compose a Store from config, metadata and data store backends.

    Parameters
    ----------
    config_store: ~smif.data_layer.abstract_config_store.ConfigStore
    metadata_store: ~smif.data_layer.abstract_metadata_store.MetadataStore
    data_store: ~smif.data_layer.abstract_data_store.DataStore
    model_base_folder: str, default="."
        Base folder against which relative model paths are resolved
    """
    self.logger = logging.getLogger(__name__)
    self.config_store = config_store
    self.metadata_store = metadata_store
    self.data_store = data_store
    # Coerce to str so PathLike arguments are accepted too
    self.model_base_folder = str(model_base_folder)
@classmethod
def from_dict(cls, config):
    """Create Store from configuration dict

    Recognised keys are ``interface`` ('local_csv' or 'local_parquet',
    default 'local_csv') and ``dir`` (an existing directory, default '.').

    Raises
    ------
    ValueError
        If the directory does not exist or the interface is unsupported
    """
    if "interface" in config:
        interface = config["interface"]
    else:
        logging.warning("No interface provided for Results(). Assuming local_csv")
        interface = "local_csv"

    if "dir" in config:
        directory = config["dir"]
    else:
        logging.warning("No directory provided for Results(). Assuming '.'")
        directory = "."

    # Fail early on a bad directory rather than on first read/write
    if not os.path.isdir(directory):
        raise ValueError("Expected {} to be a valid directory".format(directory))

    # Dispatch table maps interface name to data store implementation
    data_store_classes = {
        "local_csv": CSVDataStore,
        "local_parquet": ParquetDataStore,
    }
    if interface not in data_store_classes:
        raise ValueError(
            'Unsupported interface "{}". Supply local_csv or local_parquet'.format(
                interface
            )
        )
    data_store = data_store_classes[interface](directory)

    return cls(
        config_store=YamlConfigStore(directory),
        metadata_store=FileMetadataStore(directory),
        data_store=data_store,
        model_base_folder=directory,
    )
#
# CONFIG
#
# region Model runs
[docs] def read_model_runs(self):
"""Read all system-of-system model runs
Returns
-------
list[~smif.controller.modelrun.ModelRun]
"""
return sorted(self.config_store.read_model_runs(), key=itemgetter("name"))
[docs] def read_model_run(self, model_run_name):
"""Read a system-of-system model run
Parameters
----------
model_run_name : str
Returns
-------
~smif.controller.modelrun.ModelRun
"""
return self.config_store.read_model_run(model_run_name)
[docs] def write_model_run(self, model_run):
"""Write system-of-system model run
Parameters
----------
model_run : ~smif.controller.modelrun.ModelRun
"""
self.config_store.write_model_run(model_run)
[docs] def update_model_run(self, model_run_name, model_run):
"""Update system-of-system model run
Parameters
----------
model_run_name : str
model_run : ~smif.controller.modelrun.ModelRun
"""
self.config_store.update_model_run(model_run_name, model_run)
[docs] def delete_model_run(self, model_run_name):
"""Delete a system-of-system model run
Parameters
----------
model_run_name : str
"""
self.config_store.delete_model_run(model_run_name)
# endregion
# region System-of-systems models
[docs] def read_sos_models(self):
"""Read all system-of-system models
Returns
-------
list[~smif.model.sos_model.SosModel]
"""
return sorted(self.config_store.read_sos_models(), key=itemgetter("name"))
[docs] def read_sos_model(self, sos_model_name):
"""Read a specific system-of-system model
Parameters
----------
sos_model_name : str
Returns
-------
~smif.model.sos_model.SosModel
"""
return self.config_store.read_sos_model(sos_model_name)
def write_sos_model(self, sos_model):
    """Persist a system-of-system model after format validation.

    Parameters
    ----------
    sos_model : ~smif.model.sos_model.SosModel
    """
    # Validate structure before committing to the config store
    validate_sos_model_format(sos_model)
    self.config_store.write_sos_model(sos_model)
def update_sos_model(self, sos_model_name, sos_model):
    """Replace a system-of-system model after cross-config validation.

    Parameters
    ----------
    sos_model_name : str
    sos_model : ~smif.model.sos_model.SosModel
    """
    # Validation needs the full set of models and scenarios to check references
    models = self.config_store.read_models()
    scenarios = self.config_store.read_scenarios()
    validate_sos_model_config(sos_model, models, scenarios)
    self.config_store.update_sos_model(sos_model_name, sos_model)
[docs] def delete_sos_model(self, sos_model_name):
"""Delete a system-of-system model
Parameters
----------
sos_model_name : str
"""
self.config_store.delete_sos_model(sos_model_name)
# endregion
# region Models
[docs] def read_models(self, skip_coords=False):
"""Read all models
Returns
-------
list[~smif.model.model.Model]
"""
models = sorted(self.config_store.read_models(), key=itemgetter("name"))
if not skip_coords:
models = [
self._add_coords(model, ("inputs", "outputs", "parameters"))
for model in models
]
return models
[docs] def read_model(self, model_name, skip_coords=False):
"""Read a model
Parameters
----------
model_name : str
Returns
-------
~smif.model.model.Model
"""
model = self.config_store.read_model(model_name)
if not skip_coords:
model = self._add_coords(model, ("inputs", "outputs", "parameters"))
return model
[docs] def write_model(self, model):
"""Write a model
Parameters
----------
model : ~smif.model.model.Model
"""
self.config_store.write_model(model)
[docs] def update_model(self, model_name, model):
"""Update a model
Parameters
----------
model_name : str
model : ~smif.model.model.Model
"""
self.config_store.update_model(model_name, model)
[docs] def delete_model(self, model_name):
"""Delete a model
Parameters
----------
model_name : str
"""
self.config_store.delete_model(model_name)
# endregion
# region Scenarios
[docs] def read_scenarios(self, skip_coords=False):
"""Read scenarios
Returns
-------
list[~smif.model.ScenarioModel]
"""
scenarios = sorted(self.config_store.read_scenarios(), key=itemgetter("name"))
if not skip_coords:
scenarios = [
self._add_coords(scenario, ["provides"]) for scenario in scenarios
]
return scenarios
[docs] def read_scenario(self, scenario_name, skip_coords=False):
"""Read a scenario
Parameters
----------
scenario_name : str
Returns
-------
~smif.model.ScenarioModel
"""
scenario = self.config_store.read_scenario(scenario_name)
if not skip_coords:
scenario = self._add_coords(scenario, ["provides"])
return scenario
[docs] def write_scenario(self, scenario):
"""Write scenario
Parameters
----------
scenario : ~smif.model.ScenarioModel
"""
self.config_store.write_scenario(scenario)
[docs] def update_scenario(self, scenario_name, scenario):
"""Update scenario
Parameters
----------
scenario_name : str
scenario : ~smif.model.ScenarioModel
"""
self.config_store.update_scenario(scenario_name, scenario)
[docs] def delete_scenario(self, scenario_name):
"""Delete scenario from project configuration
Parameters
----------
scenario_name : str
"""
self.config_store.delete_scenario(scenario_name)
def prepare_scenario(self, scenario_name, list_of_variants):
    """Modify {scenario_name} config file to include multiple
    scenario variants.

    The scenario must contain exactly one "template" variant. That variant
    is deleted and replaced by one numbered variant per entry in
    ``list_of_variants``; each new variant's data filenames are derived
    from the template's filenames by inserting the variant number before
    the file extension.

    Parameters
    ----------
    scenario_name: str
    list_of_variants: list[int] - indices of scenario variants

    Raises
    ------
    SmifDataError
        If the scenario does not define exactly one template variant
    """
    scenario = self.read_scenario(scenario_name)
    # Check that template scenario file does not define more than one variant
    if not scenario["variants"] or len(scenario["variants"]) > 1:
        raise SmifDataError(
            "Template scenario file must define one" " unique template variant."
        )
    # Read variant defined in template scenario file
    variant_template_name = scenario["variants"][0]["name"]
    base_variant = self.read_scenario_variant(scenario_name, variant_template_name)
    # Remove the template variant: it is replaced by the numbered variants below
    self.delete_scenario_variant(scenario_name, variant_template_name)
    # Read template names of scenario variant data files
    output_filenames = {}  # output_name => (base, ext)
    # output_filenames is keyed on scenario outputs; entries hold the
    # (root, extension) split of the template variant's data filename
    for output in scenario["provides"]:
        output_name = output["name"]
        base, ext = splitext(base_variant["data"][output_name])
        output_filenames[output_name] = base, ext
    # Now modify scenario file: write one variant per requested number
    for variant_number in list_of_variants:
        # Copying the variant dict is required when underlying config_store
        # is an instance of MemoryConfigStore, which attribute _scenarios holds
        # a reference to the variant object passed to update or
        # write_scenario_variant
        variant = deepcopy(base_variant)
        variant["name"] = "{}_{:03d}".format(scenario_name, variant_number)
        variant["description"] = "{} variant number {:03d}".format(
            scenario_name, variant_number
        )
        # Point each output at its numbered data file, e.g. base003.csv
        for output_name, (base, ext) in output_filenames.items():
            variant["data"][output_name] = "{}{:03d}{}".format(
                base, variant_number, ext
            )
        self.write_scenario_variant(scenario_name, variant)
def prepare_model_runs(self, model_run_name, scenario_name, first_var, last_var):
    """Write multiple model run config files corresponding to multiple
    scenario variants of {scenario_name}, based on template {model_run_name}

    Write batchfile containing each of the generated model runs. The
    batchfile is named ``<model_run_name>.batch`` and written to the
    current working directory.

    Parameters
    ----------
    model_run_name: str
    scenario_name: str
    first_var: int - between 0 and number of variants-1
    last_var: int - between first_var and number of variants-1
    """
    model_run = self.read_model_run(model_run_name)
    scenario = self.read_scenario(scenario_name)
    # read strategies from config store (Store.read_strategies pulls together data on
    # interventions as well, which we don't need here)
    config_strategies = self.config_store.read_strategies(model_run_name)
    # Use a context manager so the batchfile is closed even if a write fails
    # (the original used a bare open()/close() pair, leaking the handle on error)
    with open(model_run_name + ".batch", "w") as f_handle:
        # For each variant model_run, write a new model run file with corresponding
        # scenario variant and update batchfile
        for variant in scenario["variants"][first_var : last_var + 1]:
            variant_model_run_name = model_run_name + "_" + variant["name"]
            model_run_copy = deepcopy(model_run)
            model_run_copy["name"] = variant_model_run_name
            model_run_copy["scenarios"][scenario_name] = variant["name"]
            self.write_model_run(model_run_copy)
            self.config_store.write_strategies(
                variant_model_run_name, config_strategies
            )
            f_handle.write(model_run_name + "_" + variant["name"] + "\n")
# endregion
# region Scenario Variants
[docs] def read_scenario_variants(self, scenario_name):
"""Read variants of a given scenario
Parameters
----------
scenario_name : str
Returns
-------
list[dict]
"""
scenario_variants = self.config_store.read_scenario_variants(scenario_name)
return sorted(scenario_variants, key=itemgetter("name"))
[docs] def read_scenario_variant(self, scenario_name, variant_name):
"""Read a scenario variant
Parameters
----------
scenario_name : str
variant_name : str
Returns
-------
dict
"""
return self.config_store.read_scenario_variant(scenario_name, variant_name)
[docs] def write_scenario_variant(self, scenario_name, variant):
"""Write scenario to project configuration
Parameters
----------
scenario_name : str
variant : dict
"""
self.config_store.write_scenario_variant(scenario_name, variant)
[docs] def update_scenario_variant(self, scenario_name, variant_name, variant):
"""Update scenario to project configuration
Parameters
----------
scenario_name : str
variant_name : str
variant : dict
"""
self.config_store.update_scenario_variant(scenario_name, variant_name, variant)
[docs] def delete_scenario_variant(self, scenario_name, variant_name):
"""Delete scenario from project configuration
Parameters
----------
scenario_name : str
variant_name : str
"""
self.config_store.delete_scenario_variant(scenario_name, variant_name)
# endregion
# region Narratives
[docs] def read_narrative(self, sos_model_name, narrative_name):
"""Read narrative from sos_model
Parameters
----------
sos_model_name : str
narrative_name : str
"""
return self.config_store.read_narrative(sos_model_name, narrative_name)
# endregion
# region Strategies
[docs] def read_strategies(self, model_run_name):
"""Read strategies for a given model run
Parameters
----------
model_run_name : str
Returns
-------
list[dict]
"""
strategies = deepcopy(self.config_store.read_strategies(model_run_name))
for strategy in strategies:
if strategy["type"] == "pre-specified-planning":
strategy["interventions"] = self.data_store.read_strategy_interventions(
strategy
)
return strategies
[docs] def write_strategies(self, model_run_name, strategies):
"""Write strategies for a given model_run
Parameters
----------
model_run_name : str
strategies : list[dict]
"""
self.config_store.write_strategies(model_run_name, strategies)
def convert_strategies_data(self, model_run_name, tgt_store, noclobber=False):
    """Copy strategy intervention data for a model run into another store.

    Parameters
    ----------
    model_run_name : str
    tgt_store : Store
        Destination store for the intervention data
    noclobber : bool, default=False
        If True, skip strategies whose data already exists in ``tgt_store``
    """
    for strategy in self.read_strategies(model_run_name):
        if strategy["type"] != "pre-specified-planning":
            continue
        data_exists = tgt_store.read_strategy_interventions(
            strategy, assert_exists=True
        )
        if noclobber and data_exists:
            continue
        data = self.read_strategy_interventions(strategy)
        tgt_store.write_strategy_interventions(strategy, data)
# endregion
#
# METADATA
#
# region Units
[docs] def read_unit_definitions(self) -> List[str]:
"""Reads custom unit definitions
Returns
-------
list[str]
Pint-compatible unit definitions
"""
return self.metadata_store.read_unit_definitions()
[docs] def write_unit_definitions(self, definitions):
"""Reads custom unit definitions
Parameters
----------
list[str]
Pint-compatible unit definitions
"""
self.metadata_store.write_unit_definitions(definitions)
# endregion
# region Dimensions
[docs] def read_dimensions(self, skip_coords=False):
"""Read dimensions
Returns
-------
list[~smif.metadata.coords.Coords]
"""
return self.metadata_store.read_dimensions(skip_coords)
[docs] def read_dimension(self, dimension_name, skip_coords=False):
"""Return dimension
Parameters
----------
dimension_name : str
Returns
-------
~smif.metadata.coords.Coords
A dimension definition (including elements)
"""
return self.metadata_store.read_dimension(dimension_name, skip_coords)
[docs] def write_dimension(self, dimension):
"""Write dimension to project configuration
Parameters
----------
dimension : ~smif.metadata.coords.Coords
"""
self.metadata_store.write_dimension(dimension)
[docs] def update_dimension(self, dimension_name, dimension):
"""Update dimension
Parameters
----------
dimension_name : str
dimension : ~smif.metadata.coords.Coords
"""
self.metadata_store.update_dimension(dimension_name, dimension)
[docs] def delete_dimension(self, dimension_name):
"""Delete dimension
Parameters
----------
dimension_name : str
"""
self.metadata_store.delete_dimension(dimension_name)
def _add_coords(self, item, keys):
"""Add coordinates to spec definitions on an object"""
item = deepcopy(item)
for key in keys:
spec_list = item[key]
for spec in spec_list:
if "dims" in spec and spec["dims"]:
spec["coords"] = {
dim: self.read_dimension(dim)["elements"]
for dim in spec["dims"]
}
return item
# endregion
#
# DATA
#
# region Scenario Variant Data
def read_scenario_variant_data(
    self,
    scenario_name: str,
    variant_name: str,
    variable: str,
    timestep: Optional[int] = None,
    timesteps: Optional[List[int]] = None,
    assert_exists: bool = False,
) -> Union[DataArray, bool]:
    """Read data for one variable of a scenario variant.

    Parameters
    ----------
    scenario_name : str
    variant_name : str
    variable : str
    timestep : int, optional
    timesteps : list[int], optional
    assert_exists : bool, default=False
        If True, return whether the data exists instead of reading it

    Returns
    -------
    data : ~smif.data_layer.data_array.DataArray
        (or bool when ``assert_exists`` is True)
    """
    # Resolve the data key from the variant's file/variable mapping
    variant = self.read_scenario_variant(scenario_name, variant_name)
    key = self._key_from_data(
        variant["data"][variable], scenario_name, variant_name, variable
    )
    if assert_exists:
        return self.data_store.scenario_variant_data_exists(key)
    # Build the spec for this variable from the scenario's provides list
    scenario = self.read_scenario(scenario_name)
    spec = Spec.from_dict(_pick_from_list(scenario["provides"], variable))
    return self.data_store.read_scenario_variant_data(key, spec, timestep, timesteps)
def write_scenario_variant_data(self, scenario_name, variant_name, data):
    """Write data for one variable of a scenario variant.

    Parameters
    ----------
    scenario_name : str
    variant_name : str
    data : ~smif.data_layer.data_array.DataArray
    """
    variant = self.read_scenario_variant(scenario_name, variant_name)
    # The variable name comes from the DataArray's own spec
    variable = data.spec.name
    key = self._key_from_data(
        variant["data"][variable], scenario_name, variant_name, variable
    )
    self.data_store.write_scenario_variant_data(key, data)
def convert_scenario_data(self, model_run_name, tgt_store, noclobber=False):
    """Copy all scenario variant data used by a model run into another store.

    Parameters
    ----------
    model_run_name : str
    tgt_store : Store
        Destination store for the scenario data
    noclobber : bool, default=False
        If True, skip variables whose data already exists in ``tgt_store``
    """
    model_run = self.read_model_run(model_run_name)
    for scenario_name in model_run["scenarios"]:
        for variant in self.read_scenario_variants(scenario_name):
            variant_name = variant["name"]
            for variable in variant["data"]:
                data_exists = tgt_store.read_scenario_variant_data(
                    scenario_name, variant_name, variable, assert_exists=True
                )
                if noclobber and data_exists:
                    continue
                data_array = self.read_scenario_variant_data(
                    scenario_name, variant_name, variable
                )
                tgt_store.write_scenario_variant_data(
                    scenario_name, variant_name, data_array
                )
# endregion
# region Narrative Data
def read_narrative_variant_data(
    self,
    sos_model_name,
    narrative_name,
    variant_name,
    parameter_name,
    timestep=None,
    assert_exists=False,
):
    """Read narrative data file

    Parameters
    ----------
    sos_model_name : str
    narrative_name : str
    variant_name : str
    parameter_name : str
    timestep : int (optional)
        If None, read data for all timesteps
    assert_exists : bool (optional)
        If True, return whether the data exists instead of reading it

    Returns
    -------
    ~smif.data_layer.data_array.DataArray
        (or bool when ``assert_exists`` is True)

    Raises
    ------
    SmifDataNotFoundError
        If the narrative, variant or parameter cannot be found
    """
    # Locate the narrative and variant within the sos model config,
    # raising a descriptive error at each step if missing
    sos_model = self.read_sos_model(sos_model_name)
    narrative = _pick_from_list(sos_model["narratives"], narrative_name)
    if narrative is None:
        msg = "Narrative name '{}' does not exist in sos_model '{}'"
        raise SmifDataNotFoundError(msg.format(narrative_name, sos_model_name))
    variant = _pick_from_list(narrative["variants"], variant_name)
    if variant is None:
        msg = "Variant name '{}' does not exist in narrative '{}'"
        raise SmifDataNotFoundError(msg.format(variant_name, narrative_name))
    key = self._key_from_data(
        variant["data"][parameter_name],
        narrative_name,
        variant_name,
        parameter_name,
    )
    if assert_exists:
        return self.data_store.narrative_variant_data_exists(key)
    else:
        spec_dict = None
        # find sector model which needs this parameter, to get spec definition
        for model_name, params in narrative["provides"].items():
            if parameter_name in params:
                sector_model = self.read_model(model_name)
                spec_dict = _pick_from_list(
                    sector_model["parameters"], parameter_name
                )
                break
        # find spec
        if spec_dict is None:
            raise SmifDataNotFoundError(
                "Parameter {} not found in any of {}".format(
                    parameter_name, sos_model["sector_models"]
                )
            )
        spec = Spec.from_dict(spec_dict)
        return self.data_store.read_narrative_variant_data(key, spec, timestep)
def write_narrative_variant_data(
    self, sos_model_name, narrative_name, variant_name, data
):
    """Write narrative data file.

    Parameters
    ----------
    sos_model_name : str
    narrative_name : str
    variant_name : str
    data : ~smif.data_layer.data_array.DataArray

    Raises
    ------
    SmifDataNotFoundError
        If the narrative or variant cannot be found
    """
    sos_model = self.read_sos_model(sos_model_name)
    narrative = _pick_from_list(sos_model["narratives"], narrative_name)
    # Guard against missing narrative/variant, mirroring
    # read_narrative_variant_data (previously an unhelpful TypeError was
    # raised from subscripting None)
    if narrative is None:
        msg = "Narrative name '{}' does not exist in sos_model '{}'"
        raise SmifDataNotFoundError(msg.format(narrative_name, sos_model_name))
    variant = _pick_from_list(narrative["variants"], variant_name)
    if variant is None:
        msg = "Variant name '{}' does not exist in narrative '{}'"
        raise SmifDataNotFoundError(msg.format(variant_name, narrative_name))
    key = self._key_from_data(
        variant["data"][data.spec.name],
        narrative_name,
        variant_name,
        data.spec.name,
    )
    self.data_store.write_narrative_variant_data(key, data)
def convert_narrative_data(self, sos_model_name, tgt_store, noclobber=False):
    """Copy all narrative variant data of a sos model into another store.

    Parameters
    ----------
    sos_model_name : str
    tgt_store : Store
        Destination store for the narrative data
    noclobber : bool, default=False
        If True, skip parameters whose data already exists in ``tgt_store``
    """
    sos_model = self.read_sos_model(sos_model_name)
    for narrative in sos_model["narratives"]:
        narrative_name = narrative["name"]
        for variant in narrative["variants"]:
            variant_name = variant["name"]
            for param in variant["data"]:
                data_exists = tgt_store.read_narrative_variant_data(
                    sos_model_name,
                    narrative_name,
                    variant_name,
                    param,
                    assert_exists=True,
                )
                if noclobber and data_exists:
                    continue
                data_array = self.read_narrative_variant_data(
                    sos_model_name, narrative_name, variant_name, param
                )
                tgt_store.write_narrative_variant_data(
                    sos_model_name, narrative_name, variant_name, data_array
                )
def read_model_parameter_default(
    self, model_name, parameter_name, assert_exists=False
):
    """Read default data for a sector model parameter.

    Parameters
    ----------
    model_name : str
    parameter_name : str
    assert_exists : bool, default=False
        If True, return whether the data exists instead of reading it

    Returns
    -------
    ~smif.data_layer.data_array.DataArray
        (or bool when ``assert_exists`` is True)

    Raises
    ------
    SmifDataNotFoundError
        If the parameter is not defined on the model
    """
    model = self.read_model(model_name)
    param = _pick_from_list(model["parameters"], parameter_name)
    # Check the parameter exists BEFORE building a Spec from it; previously
    # Spec.from_dict(None) raised an unhelpful error and the intended
    # SmifDataNotFoundError below was unreachable
    if param is None:
        raise SmifDataNotFoundError(
            "Parameter {} not found in model {}".format(parameter_name, model_name)
        )
    try:
        path = param["default"]
    except KeyError:
        # Fall back to the conventional default-data filename
        path = "default__{}__{}.csv".format(model_name, parameter_name)
    key = self._key_from_data(path, model_name, parameter_name)
    if assert_exists:
        return self.data_store.model_parameter_default_data_exists(key)
    spec = Spec.from_dict(param)
    return self.data_store.read_model_parameter_default(key, spec)
def write_model_parameter_default(self, model_name, parameter_name, data):
    """Write default data for a sector model parameter.

    Parameters
    ----------
    model_name : str
    parameter_name : str
    data : ~smif.data_layer.data_array.DataArray

    Raises
    ------
    SmifDataNotFoundError
        If the parameter is not defined on the model
    """
    model = self.read_model(model_name, skip_coords=True)
    param = _pick_from_list(model["parameters"], parameter_name)
    try:
        path = param["default"]
    except TypeError:
        # param is None when the parameter is missing from the model
        raise SmifDataNotFoundError(
            "Parameter {} not found in model {}".format(parameter_name, model_name)
        )
    except KeyError:
        # No explicit path configured: use the conventional filename
        path = "default__{}__{}.csv".format(model_name, parameter_name)
    key = self._key_from_data(path, model_name, parameter_name)
    self.data_store.write_model_parameter_default(key, data)
def convert_model_parameter_default_data(
    self, sector_model_name, tgt_store, noclobber=False
):
    """Copy parameter default data for a sector model into another store.

    Parameters
    ----------
    sector_model_name : str
    tgt_store : Store
        Destination store for the default data
    noclobber : bool, default=False
        If True, skip parameters whose data already exists in ``tgt_store``
    """
    sector_model = self.read_model(sector_model_name)
    for parameter in sector_model["parameters"]:
        parameter_name = parameter["name"]
        data_exists = tgt_store.read_model_parameter_default(
            sector_model_name, parameter_name, assert_exists=True
        )
        if noclobber and data_exists:
            continue
        data_array = self.read_model_parameter_default(
            sector_model_name, parameter_name
        )
        tgt_store.write_model_parameter_default(
            sector_model_name, parameter_name, data_array
        )
# endregion
# region Interventions
def read_interventions(self, model_name):
    """Read interventions data for `model_name`.

    Returns
    -------
    dict[str, dict]
        A dict of intervention dictionaries containing intervention
        attributes keyed by intervention name
    """
    model = self.read_model(model_name, skip_coords=True)
    intervention_files = model["interventions"]
    # No intervention files configured: nothing to read
    if not intervention_files:
        return {}
    return self.data_store.read_interventions(intervention_files)
def write_interventions(self, model_name, interventions):
    """Write interventions data for a model.

    Replaces the model's configured intervention files with a single
    file named after the model, then writes the data to it.

    Parameters
    ----------
    model_name : str
    interventions : dict[str, dict]
        A dict of intervention dictionaries containing intervention
        attributes keyed by intervention name
    """
    model = self.read_model(model_name)
    # Point the model config at a single canonical interventions file
    filename = model_name + ".csv"
    model["interventions"] = [filename]
    self.update_model(model_name, model)
    self.data_store.write_interventions(filename, interventions)
def write_interventions_file(self, model_name, string_id, interventions):
    """Write one named interventions file, if the model references it.

    Raises
    ------
    SmifDataNotFoundError
        If ``string_id`` is not among the model's intervention files
    """
    model = self.read_model(model_name)
    if string_id not in model["interventions"]:
        raise SmifDataNotFoundError(
            "Intervention {} not found for"
            " sector model {}.".format(string_id, model_name)
        )
    self.data_store.write_interventions(string_id, interventions)
def read_interventions_file(self, model_name, string_id, assert_exists=False):
    """Read one named interventions file, if the model references it.

    Parameters
    ----------
    model_name : str
    string_id : str
    assert_exists : bool, default=False
        If True, return whether the data exists instead of reading it

    Raises
    ------
    SmifDataNotFoundError
        If ``string_id`` is not among the model's intervention files
    """
    model = self.read_model(model_name)
    if string_id not in model["interventions"]:
        raise SmifDataNotFoundError(
            "Intervention {} not found for"
            " sector model {}.".format(string_id, model_name)
        )
    if assert_exists:
        return self.data_store.interventions_data_exists(string_id)
    return self.data_store.read_interventions([string_id])
def convert_interventions_data(self, sector_model_name, tgt_store, noclobber=False):
    """Copy intervention data files for a sector model into another store.

    Parameters
    ----------
    sector_model_name : str
    tgt_store : Store
        Destination store for the intervention data
    noclobber : bool, default=False
        If True, skip files whose data already exists in ``tgt_store``
    """
    sector_model = self.read_model(sector_model_name)
    for intervention in sector_model["interventions"]:
        data_exists = tgt_store.read_interventions_file(
            sector_model_name, intervention, assert_exists=True
        )
        if noclobber and data_exists:
            continue
        interventions = self.read_interventions_file(
            sector_model_name, intervention
        )
        tgt_store.write_interventions_file(
            sector_model_name, intervention, interventions
        )
[docs] def read_strategy_interventions(self, strategy, assert_exists=False):
"""Read interventions as defined in a model run strategy"""
if assert_exists:
return self.data_store.strategy_data_exists(strategy)
else:
return self.data_store.read_strategy_interventions(strategy)
[docs] def write_strategy_interventions(self, strategy, data):
"""
Parameters
----------
list[dicts]
"""
self.data_store.write_strategy_interventions(strategy, data)
def read_initial_conditions(self, model_name) -> List[Dict]:
    """Read historical interventions for `model_name`.

    Returns
    -------
    list[dict]
        A list of historical interventions, with keys 'name' and 'build_year'
    """
    model = self.read_model(model_name)
    condition_files = model["initial_conditions"]
    # No initial-conditions files configured: nothing to read
    if not condition_files:
        return []
    return self.data_store.read_initial_conditions(condition_files)
def write_initial_conditions(self, model_name, initial_conditions):
    """Write historical interventions for a model.

    Replaces the model's configured initial-conditions files with a
    single file named after the model, then writes the data to it.

    Parameters
    ----------
    model_name : str
    initial_conditions : list[dict]
        A list of historical interventions, with keys 'name' and 'build_year'
    """
    model = self.read_model(model_name)
    # Point the model config at a single canonical initial-conditions file
    filename = model_name + ".csv"
    model["initial_conditions"] = [filename]
    self.update_model(model_name, model)
    self.data_store.write_initial_conditions(filename, initial_conditions)
def write_initial_conditions_file(self, model_name, string_id, initial_conditions):
    """Write one named initial-conditions file, if the model references it.

    Raises
    ------
    SmifDataNotFoundError
        If ``string_id`` is not among the model's initial-conditions files
    """
    model = self.read_model(model_name)
    if string_id not in model["initial_conditions"]:
        raise SmifDataNotFoundError(
            "Initial condition {} not found for"
            " sector model {}.".format(string_id, model_name)
        )
    self.data_store.write_initial_conditions(string_id, initial_conditions)
def read_initial_conditions_file(self, model_name, string_id, assert_exists=False):
    """Read one named initial-conditions file, if the model references it.

    Parameters
    ----------
    model_name : str
    string_id : str
    assert_exists : bool, default=False
        If True, return whether the data exists instead of reading it

    Raises
    ------
    SmifDataNotFoundError
        If ``string_id`` is not among the model's initial-conditions files
    """
    model = self.read_model(model_name)
    if string_id not in model["initial_conditions"]:
        raise SmifDataNotFoundError(
            "Initial conditions {} not found for"
            " sector model {}.".format(string_id, model_name)
        )
    if assert_exists:
        return self.data_store.initial_conditions_data_exists(string_id)
    return self.data_store.read_initial_conditions([string_id])
def convert_initial_conditions_data(
    self, sector_model_name, tgt_store, noclobber=False
):
    """Copy initial-conditions files for a sector model into another store.

    Parameters
    ----------
    sector_model_name : str
    tgt_store : Store
        Destination store for the initial-conditions data
    noclobber : bool, default=False
        If True, skip files whose data already exists in ``tgt_store``
    """
    sector_model = self.read_model(sector_model_name)
    for initial_condition in sector_model["initial_conditions"]:
        data_exists = tgt_store.read_initial_conditions_file(
            sector_model_name, initial_condition, assert_exists=True
        )
        if noclobber and data_exists:
            continue
        initial_conditions = self.read_initial_conditions_file(
            sector_model_name, initial_condition
        )
        tgt_store.write_initial_conditions_file(
            sector_model_name, initial_condition, initial_conditions
        )
def read_all_initial_conditions(self, model_run_name) -> List[Dict]:
    """Collect historical interventions across all sector models of a run.

    Returns
    -------
    list[dict]
    """
    model_run = self.read_model_run(model_run_name)
    sos_model = self.read_sos_model(model_run["sos_model"])
    historical_interventions = []  # type: List
    # Concatenate each sector model's initial conditions in order
    for sector_model_name in sos_model["sector_models"]:
        historical_interventions += self.read_initial_conditions(sector_model_name)
    return historical_interventions
# endregion
# region State
[docs] def read_state(
self, model_run_name, timestep, decision_iteration=None
) -> List[Dict]:
"""Read list of (name, build_year) for a given model_run, timestep,
decision
Parameters
----------
model_run_name : str
timestep : int
decision_iteration : int, optional
Returns
-------
list[dict]
"""
return self.data_store.read_state(model_run_name, timestep, decision_iteration)
[docs] def write_state(self, state, model_run_name, timestep, decision_iteration=None):
"""State is a list of decisions with name and build_year.
State is output from the DecisionManager
Parameters
----------
state : list[dict]
model_run_name : str
timestep : int
decision_iteration : int, optional
"""
self.data_store.write_state(state, model_run_name, timestep, decision_iteration)
# endregion
# region Conversion coefficients
[docs] def read_coefficients(self, source_dim: str, destination_dim: str) -> np.ndarray:
"""Reads coefficients from the store
Coefficients are uniquely identified by their source/destination dimensions.
This method and `write_coefficients` implement caching of conversion
coefficients between dimensions.
Parameters
----------
source_dim : str
Dimension name
destination_dim : str
Dimension name
Returns
-------
numpy.ndarray
Notes
-----
To be called from :class:`~smif.convert.adaptor.Adaptor` implementations.
"""
return self.data_store.read_coefficients(source_dim, destination_dim)
[docs] def write_coefficients(
self, source_dim: str, destination_dim: str, data: np.ndarray
):
"""Writes coefficients to the store
Coefficients are uniquely identified by their source/destination dimensions.
This method and `read_coefficients` implement caching of conversion
coefficients between dimensions.
Parameters
----------
source_dim : str
Dimension name
destination_dim : str
Dimension name
data : numpy.ndarray
Notes
-----
To be called from :class:`~smif.convert.adaptor.Adaptor` implementations.
"""
self.data_store.write_coefficients(source_dim, destination_dim, data)
# endregion
# region Results
def read_results(
    self,
    model_run_name: str,
    model_name: str,
    output_spec: Spec,
    timestep: Optional[int] = None,
    decision_iteration: Optional[int] = None,
) -> DataArray:
    """Read results of a `model_name` in `model_run_name` for a given output.

    Parameters
    ----------
    model_run_name : str
    model_name : str
    output_spec : smif.metadata.Spec
    timestep : int, default=None
    decision_iteration : int, default=None

    Returns
    -------
    ~smif.data_layer.data_array.DataArray
    """
    return self.data_store.read_results(
        model_run_name, model_name, output_spec, timestep, decision_iteration
    )
[docs] def write_results(
self,
data_array,
model_run_name,
model_name,
timestep=None,
decision_iteration=None,
):
"""Write results of a `model_name` in `model_run_name` for a given `output_name`
Parameters
----------
data_array : ~smif.data_layer.data_array.DataArray
model_run_id : str
model_name : str
timestep : int, optional
decision_iteration : int, optional
"""
self.data_store.write_results(
data_array, model_run_name, model_name, timestep, decision_iteration
)
[docs] def delete_results(
self,
model_run_name,
model_name,
output_name,
timestep=None,
decision_iteration=None,
):
"""Delete results for a single timestep/iteration of a model output in a model run
Parameters
----------
model_run_name : str
model_name : str
output_name : str
timestep : int, default=None
decision_iteration : int, default=None
"""
self.data_store.delete_results(
model_run_name, model_name, output_name, timestep, decision_iteration
)
def clear_results(self, model_run_name):
    """Delete every stored result of a single model run.

    Parameters
    ----------
    model_run_name : str
    """
    # Enumerate then delete each (timestep, iteration, model, output) result
    for timestep, decision_iteration, model_name, output_name in self.available_results(
        model_run_name
    ):
        self.data_store.delete_results(
            model_run_name, model_name, output_name, timestep, decision_iteration
        )
[docs] def available_results(self, model_run_name):
"""List available results from a model run
Parameters
----------
model_run_name : str
Returns
-------
list[tuple]
Each tuple is (timestep, decision_iteration, model_name, output_name)
"""
return self.data_store.available_results(model_run_name)
def completed_jobs(self, model_run_name):
    """List completed jobs from a model run.

    A job is complete for a (timestep, decision_iteration) pair when every
    expected model output has a stored result.

    Parameters
    ----------
    model_run_name : str

    Returns
    -------
    list[tuple]
        Each tuple is (timestep, decision_iteration, model_name)
    """
    # {(timestep, decision, model, output)} actually present in the store
    available = self.available_results(model_run_name)
    # [(model, output)] that a complete run would produce
    expected = self.expected_model_outputs(model_run_name)
    return self.filter_complete_available_results(available, expected)
[docs] @staticmethod
def filter_complete_available_results(available_results, expected_model_outputs):
"""Filter available results from a model run to include only complete timestep/decision
iteration combinations
Parameters
----------
available_results: list[tuple]
List of (timestep, decision_iteration, model_name, output_name)
expected_model_outputs: list[tuple]
List or set of (model_name, output_name)
Returns
-------
list[tuple]
Each tuple is (timestep, decision_iteration, model_name)
"""
expected_model_outputs = set(expected_model_outputs)
model_names = {model_name for model_name, _ in expected_model_outputs}
model_outputs_by_td = defaultdict(set)
for timestep, decision, model_name, output_name in available_results:
model_outputs_by_td[(timestep, decision)].add((model_name, output_name))
completed_jobs = []
for (timestep, decision), td_model_outputs in model_outputs_by_td.items():
if td_model_outputs == expected_model_outputs:
for model_name in model_names:
completed_jobs.append((timestep, decision, model_name))
return completed_jobs
def expected_model_outputs(self, model_run_name):
    """List every (model_name, output_name) pair a model run should produce.

    Looks up the system-of-systems model behind the model run, then reads
    each sector model's configured outputs.

    Parameters
    ----------
    model_run_name : str

    Returns
    -------
    list[tuple]
        Each tuple is (model_name, output_name)
    """
    model_run = self.read_model_run(model_run_name)
    sos_config = self.read_sos_model(model_run["sos_model"])
    pairs = []
    for model_name in sos_config["sector_models"]:
        outputs = self.read_model(model_name)["outputs"]
        pairs.extend((model_name, output["name"]) for output in outputs)
    return pairs
def prepare_warm_start(self, model_run_name):
    """Find the latest timestep with stored results, to warm-start a model run.

    Allows a previous unsuccessful model_run to 'warm start' a new model
    run from a later timestep: results are recovered up to the timestep
    the previous run reached, and the new run resumes from there.

    Parameters
    ----------
    model_run_name : str

    Returns
    -------
    int
        The timestep to which the data store was recovered, or None if
        there are no results at all

    Notes
    -----
    Called from smif.controller.execute
    """
    results = self.available_results(model_run_name)
    if not results:
        return None
    # NOTE(review): results for the max timestep could be cleared here
    return max(timestep for timestep, _, _, _ in results)
def canonical_available_results(self, model_run_name):
    """Set of available results with every decision iteration collapsed to 0.

    This is the unique items from calling `available_results`, with all
    decision iterations set to 0. Used to decide whether a model run is
    complete, since the number of decision iterations cannot be known in
    advance: we only check that each expected timestep was completed.

    Parameters
    ----------
    model_run_name : str

    Returns
    -------
    set
        Set of (timestep, 0, model_name, output_name) tuples
    """
    # Collapsing decision to 0 inside a set comprehension also removes
    # duplicates across decision iterations
    return {
        (timestep, 0, model_name, output_name)
        for timestep, _, model_name, output_name in self.available_results(model_run_name)
    }
def canonical_expected_results(self, model_run_name):
    """Set of expected results with every decision iteration collapsed to 0.

    For a complete model run, this coincides with the unique list from
    `available_results` with all decision iterations set to 0. Used to
    decide whether a model run is complete, since the number of decision
    iterations cannot be known in advance: we only check that each
    expected timestep was completed.

    Parameters
    ----------
    model_run_name : str

    Returns
    -------
    set
        Set of (timestep, 0, model_name, output_name) tuples
    """
    model_run = self.read_model_run(model_run_name)
    timesteps = sorted(model_run["timesteps"])
    sos_config = self.read_sos_model(model_run["sos_model"])
    # Expected results are (timestep, decision, model, output) tuples
    # with decision fixed to 0; a set removes any duplicates
    expected = set()
    for model_name in sos_config["sector_models"]:
        outputs = self.read_model(model_name)["outputs"]
        for timestep in timesteps:
            for output in outputs:
                expected.add((timestep, 0, model_name, output["name"]))
    return expected
def canonical_missing_results(self, model_run_name):
    """Set of results still missing from a model run, collapsing all
    decision iterations.

    Computed as the set difference of canonical expected results and
    canonical available results; empty for a complete model run.

    Parameters
    ----------
    model_run_name : str

    Returns
    -------
    set
        Set of (timestep, 0, model_name, output_name) tuples
    """
    expected = self.canonical_expected_results(model_run_name)
    available = self.canonical_available_results(model_run_name)
    return expected - available
def _get_result_darray_internal(
    self, model_run_name, model_name, output_name, time_decision_tuples
):
    """Internal implementation for `get_result_darray`, after the unique list of
    (timestep, decision) tuples has been generated and validated.

    Gets the spec for the output defined by the model_run_name, model_name
    and output_name and expands the spec with an additional leading
    dimension for the list of tuples. Then, for each tuple, the data from
    the corresponding read_results call is stacked, and together with the
    new spec returned as a new DataArray.

    Parameters
    ----------
    model_run_name : str
    model_name : str
    output_name : str
    time_decision_tuples : list of unique (timestep, decision) tuples

    Returns
    -------
    DataArray with expanded spec and data for each (timestep, decision) tuple

    Raises
    ------
    SmifDataNotFoundError
        If `output_name` is not an output of `model_name`
    """
    # Locate the spec of the requested output in the model config
    output_spec = None
    model = self.read_model(model_name)
    for output in model["outputs"]:
        if output["name"] == output_name:
            output_spec = Spec.from_dict(output)
            break
    if output_spec is None:
        # Raise rather than assert: asserts are stripped under `python -O`,
        # and SmifDataNotFoundError is the store's documented not-found error
        raise SmifDataNotFoundError(
            "Output '{}' was not found in outputs of model '{}'".format(
                output_name, model_name
            )
        )
    # Read the results for each (timestep, decision) tuple and stack them
    list_of_numpy_arrays = []
    for t, d in time_decision_tuples:
        d_array = self.read_results(model_run_name, model_name, output_spec, t, d)
        list_of_numpy_arrays.append(d_array.data)
    stacked_data = np.vstack(list_of_numpy_arrays)
    # Prepend a timestep_decision dimension to the output spec, with one
    # coordinate per requested (timestep, decision) tuple
    output_dict = output_spec.as_dict()
    output_dict["dims"] = ["timestep_decision"] + output_dict["dims"]
    output_dict["coords"]["timestep_decision"] = time_decision_tuples
    output_spec = Spec.from_dict(output_dict)
    # Create a new DataArray from the modified spec and stacked data
    return DataArray(output_spec, np.reshape(stacked_data, output_spec.shape))
def get_result_darray(
    self,
    model_run_name,
    model_name,
    output_name,
    timesteps=None,
    decision_iterations=None,
    time_decision_tuples=None,
):
    """Return data for multiple timesteps and decision iterations for a given output from
    a given sector model in a specific model run.

    You can specify either:
        a list of (timestep, decision) tuples
            in which case data for all of those tuples matching the available results will
            be returned
    or:
        a list of timesteps
            in which case data for all of those timesteps (and any decision iterations)
            matching the available results will be returned
    or:
        a list of decision iterations
            in which case data for all of those decision iterations (and any timesteps)
            matching the available results will be returned
    or:
        a list of timesteps and a list of decision iterations
            in which case data for the Cartesian product of those timesteps and those
            decision iterations matching the available results will be returned
    or:
        nothing
            in which case all available results will be returned

    Parameters
    ----------
    model_run_name : str
    model_name : str
    output_name : str
    timesteps : optional list of timesteps
    decision_iterations : optional list of decision iterations
    time_decision_tuples : optional list of unique (timestep, decision) tuples

    Returns
    -------
    DataArray with expanded spec and the data requested

    Raises
    ------
    ValueError
        If (timestep, decision) tuples are combined with either other filter
    SmifDataNotFoundError
        If no available result matches the request
    """
    # Tuples cannot be combined with the other two filters
    if time_decision_tuples and (timesteps or decision_iterations):
        msg = (
            "Expected either timesteps, or decisions, or (timestep, decision) "
            + "tuples, or timesteps and decisions, or none of the above."
        )
        raise ValueError(msg)
    # Base filter: all (timestep, decision) pairs for this model/output
    matching = [
        (t, d)
        for t, d, m, out in self.available_results(model_run_name)
        if m == model_name and out == output_name
    ]
    # Refine by whichever filter combination was supplied
    if time_decision_tuples:
        wanted = set(time_decision_tuples)
        list_of_tuples = [pair for pair in matching if pair in wanted]
    elif timesteps and decision_iterations:
        wanted = set(itertools.product(timesteps, decision_iterations))
        list_of_tuples = [pair for pair in matching if pair in wanted]
    elif timesteps:
        list_of_tuples = [(t, d) for t, d in matching if t in timesteps]
    elif decision_iterations:
        list_of_tuples = [(t, d) for t, d in matching if d in decision_iterations]
    else:
        list_of_tuples = matching
    if not list_of_tuples:
        raise SmifDataNotFoundError("None of the requested data is available.")
    return self._get_result_darray_internal(
        model_run_name, model_name, output_name, sorted(list_of_tuples)
    )
def get_results(
    self,
    model_run_names: list,
    model_name: str,
    output_names: list,
    timesteps: list = None,
    decisions: list = None,
    time_decision_tuples: list = None,
):
    """Return data for multiple timesteps and decision iterations for a given output from
    a given sector model for multiple model runs.

    Parameters
    ----------
    model_run_names: list[str]
        the requested model run names
    model_name: str
        the requested sector model name
    output_names: list[str]
        the requested output names (output specs must all match)
    timesteps: list[int]
        the requested timesteps
    decisions: list[int]
        the requested decision iterations
    time_decision_tuples: list[tuple]
        a list of requested (timestep, decision) tuples

    Returns
    -------
    dict
        Nested dictionary of DataArray objects, keyed on model run name and output name.
        Returned DataArrays include one extra (timestep, decision_iteration) dimension.

    Raises
    ------
    ValueError
        If a requested output is not an output of the sector model, or if
        the requested outputs do not share the same coordinates
    """
    # List the available output names and verify requested outputs match.
    # Raise rather than assert: asserts are stripped under `python -O`
    outputs = self.read_model(model_name)["outputs"]
    available_outputs = [output["name"] for output in outputs]
    for output_name in output_names:
        if output_name not in available_outputs:
            raise ValueError(
                "{} is not an output of sector model {}.".format(
                    output_name, model_name
                )
            )
    # The spec for each requested output must be the same. We check they have the same
    # coordinates
    coords = [
        Spec.from_dict(output).coords
        for output in outputs
        if output["name"] in output_names
    ]
    for coord in coords:
        if coord != coords[0]:
            raise ValueError("Different outputs must have the same coordinates")
    # Now actually obtain the requested results
    results_dict = OrderedDict()  # type: OrderedDict
    for model_run_name in model_run_names:
        results_dict[model_run_name] = OrderedDict()
        for output_name in output_names:
            results_dict[model_run_name][output_name] = self.get_result_darray(
                model_run_name,
                model_name,
                output_name,
                timesteps,
                decisions,
                time_decision_tuples,
            )
    return results_dict
# endregion
# region data store utilities
def _key_from_data(self, path, *args):
    """Return path or generate a unique key for a given set of args.

    File-based stores (CSV, Parquet) address data by path; any other
    store is keyed on the tuple of remaining arguments.
    """
    is_file_store = isinstance(self.data_store, (CSVDataStore, ParquetDataStore))
    return path if is_file_store else tuple(args)
# endregion
def _pick_from_list(list_of_dicts, name):
for item in list_of_dicts:
if "name" in item and item["name"] == name:
return item
return None