"""File-backed data store
"""
import glob
import os
from abc import abstractmethod
from logging import getLogger
import numpy as np # type: ignore
import pandas # type: ignore
import pyarrow as pa # type: ignore
from smif.data_layer.abstract_data_store import DataStore
from smif.exception import SmifDataMismatchError, SmifDataNotFoundError
class FileDataStore(DataStore):
    """Abstract file data store

    Data files are kept in per-kind folders below ``<base_folder>/data`` and
    results below ``<base_folder>/results``. Reading and writing the actual file
    formats is delegated to the abstract ``_read_*``/``_write_*`` methods; the
    ``ext`` and ``coef_ext`` attributes hold the file extensions and are meant to
    be overridden by implementations.

    Parameters
    ----------
    base_folder : str or path-like
        Folder which contains the ``data`` folder

    Raises
    ------
    SmifDataNotFoundError
        If any of the expected folders below ``<base_folder>/data`` is missing
    """

    def __init__(self, base_folder):
        super().__init__()
        self.logger = getLogger(__name__)

        # extension for DataArray/list-of-dict data - override in implementations
        self.ext = ""
        # extension for bare numpy.ndarray data - override in implementations
        self.coef_ext = ""

        self.base_folder = str(base_folder)
        self.data_folder = str(os.path.join(self.base_folder, "data"))
        self.data_folders = {}
        self.results_folder = str(os.path.join(self.base_folder, "results"))

        data_folders = [
            "coefficients",
            "strategies",
            "initial_conditions",
            "interventions",
            "narratives",
            "scenarios",
            "parameters",
        ]
        for folder in data_folders:
            dirname = os.path.join(self.data_folder, folder)
            # ensure each directory exists
            if not os.path.exists(dirname):
                msg = "Expected data folder at '{}' but it does not exist"
                abs_path = os.path.abspath(dirname)
                raise SmifDataNotFoundError(msg.format(abs_path))
            self.data_folders[folder] = dirname

    def _get_data_path(self, folder, key):
        """Return the path ``<data folder>/<key>.<ext>`` for a kind of data

        Parameters
        ----------
        folder : str
            Key into ``self.data_folders``, e.g. "scenarios"
        key : str
            File name without extension

        Returns
        -------
        path : str
        """
        return os.path.join(self.data_folders[folder], "{}.{}".format(key, self.ext))

    # region Abstract methods
    @abstractmethod
    def _read_data_array(self, path, spec, timestep=None, timesteps=None):
        """Read DataArray from file"""

    @abstractmethod
    def _write_data_array(self, path, data_array, timestep=None):
        """Write DataArray to file"""

    @abstractmethod
    def _read_list_of_dicts(self, path):
        """Read file to list[dict]"""

    @abstractmethod
    def _write_list_of_dicts(self, path, data):
        """Write list[dict] to file"""

    @abstractmethod
    def _read_ndarray(self, path):
        """Read numpy.ndarray"""

    @abstractmethod
    def _write_ndarray(self, path, data, header=None):
        """Write numpy.ndarray"""

    # endregion

    # region Data Array
    def read_scenario_variant_data(self, key, spec, timestep=None, timesteps=None):
        """Read scenario variant data from the scenarios folder

        A SmifDataMismatchError raised by ``validate_as_full`` is logged as a
        warning rather than propagated, and the data is returned regardless.
        """
        path = self._get_data_path("scenarios", key)
        data = self._read_data_array(path, spec, timestep, timesteps)
        try:
            data.validate_as_full()
        except SmifDataMismatchError as ex:
            self.logger.warning(str(ex))
        return data

    def write_scenario_variant_data(self, key, data):
        """Write scenario variant data to the scenarios folder"""
        path = self._get_data_path("scenarios", key)
        self._write_data_array(path, data)

    def scenario_variant_data_exists(self, key):
        """Return True if a scenario variant data file exists for `key`"""
        return os.path.isfile(self._get_data_path("scenarios", key))

    def read_narrative_variant_data(self, key, spec, timestep=None):
        """Read narrative variant data from the narratives folder"""
        path = self._get_data_path("narratives", key)
        return self._read_data_array(path, spec, timestep)

    def write_narrative_variant_data(self, key, data):
        """Write narrative variant data to the narratives folder"""
        path = self._get_data_path("narratives", key)
        self._write_data_array(path, data)

    def narrative_variant_data_exists(self, key):
        """Return True if a narrative variant data file exists for `key`"""
        return os.path.isfile(self._get_data_path("narratives", key))

    def read_model_parameter_default(self, key, spec):
        """Read parameter defaults from the parameters folder

        Unlike scenario data, a failure of ``validate_as_full`` is not caught
        here and propagates to the caller.
        """
        path = self._get_data_path("parameters", key)
        data = self._read_data_array(path, spec)
        data.validate_as_full()
        return data

    def write_model_parameter_default(self, key, data):
        """Write parameter defaults to the parameters folder"""
        path = self._get_data_path("parameters", key)
        self._write_data_array(path, data)

    def model_parameter_default_data_exists(self, key):
        """Return True if a parameter default data file exists for `key`"""
        return os.path.isfile(self._get_data_path("parameters", key))

    def get_timesteps_from_data(self, key, spec_dict):
        """Return the timesteps found in a scenario data file as a list"""
        path = self._get_data_path("scenarios", key)
        # NOTE(review): _get_timesteps_from_data is not defined in this class -
        # presumably provided by DataStore or the implementations; confirm
        return self._get_timesteps_from_data(path, spec_dict).tolist()

    # endregion

    # region Interventions
    def read_interventions(self, keys):
        """Read interventions from one or more files in the interventions folder

        Parameters
        ----------
        keys : iterable of str
            File names without extension

        Returns
        -------
        dict
            Nested intervention data (see `_nest_keys`), keyed by intervention name

        Raises
        ------
        KeyError
            If any intervention has no `name`
        ValueError
            If an intervention name occurs more than once across all files
        """
        all_interventions = []
        for key in keys:
            path = self._get_data_path("interventions", key)
            interventions = self._read_list_of_dicts(path)
            all_interventions.extend(interventions)

        seen = set()
        dups = set()
        for intervention in all_interventions:
            try:
                name = intervention["name"]
            except KeyError as ex:
                msg = "Could not find `name` key in {} for {}"
                raise KeyError(msg.format(intervention, keys)) from ex
            if name in seen:
                dups.add(name)
            else:
                seen.add(name)

        if dups:
            # report one duplicate by name, and list any others alongside
            name = dups.pop()
            msg = "An entry for intervention {} already exists. Also found duplicates for {}"
            raise ValueError(msg.format(name, dups))

        return {
            intervention["name"]: _nest_keys(intervention)
            for intervention in all_interventions
        }

    def write_interventions(self, key, interventions):
        """Write interventions (dict of name => dict) to the interventions folder"""
        # convert dict[str, dict] to list[dict]
        data = [_unnest_keys(intervention) for intervention in interventions.values()]
        path = self._get_data_path("interventions", key)
        self._write_list_of_dicts(path, data)

    def interventions_data_exists(self, key):
        """Return True if an interventions file exists for `key`"""
        return os.path.isfile(self._get_data_path("interventions", key))

    def read_strategy_interventions(self, strategy):
        """Read the list of dicts stored at the strategy's `filename`"""
        path = self._get_data_path("strategies", strategy["filename"])
        return self._read_list_of_dicts(path)

    def write_strategy_interventions(self, strategy, data):
        """Write a list of dicts to the strategy's `filename`"""
        path = self._get_data_path("strategies", strategy["filename"])
        return self._write_list_of_dicts(path, data)

    def strategy_data_exists(self, strategy):
        """Return True if a file exists for the strategy's `filename`"""
        return os.path.isfile(self._get_data_path("strategies", strategy["filename"]))

    def read_initial_conditions(self, keys):
        """Read and concatenate initial conditions from one or more files"""
        conditions = []
        for key in keys:
            path = self._get_data_path("initial_conditions", key)
            data = self._read_list_of_dicts(path)
            conditions.extend(data)
        return conditions

    def write_initial_conditions(self, key, initial_conditions):
        """Write initial conditions to the initial_conditions folder"""
        path = self._get_data_path("initial_conditions", key)
        self._write_list_of_dicts(path, initial_conditions)

    def initial_conditions_data_exists(self, key):
        """Return True if an initial conditions file exists for `key`"""
        return os.path.isfile(self._get_data_path("initial_conditions", key))

    # endregion

    # region State
    def read_state(self, modelrun_name, timestep, decision_iteration=None):
        """Read state for a model run, timestep and decision iteration

        Raises
        ------
        SmifDataNotFoundError
            If the state file does not exist
        """
        path = self._get_state_path(modelrun_name, timestep, decision_iteration)
        try:
            state = self._read_list_of_dicts(path)
        except FileNotFoundError as ex:
            msg = "Decision state file not found for timestep {}, decision {}"
            raise SmifDataNotFoundError(
                msg.format(timestep, decision_iteration)
            ) from ex
        return state

    def write_state(self, state, modelrun_name, timestep=None, decision_iteration=None):
        """Write state, creating the model run results folder if necessary"""
        path = self._get_state_path(modelrun_name, timestep, decision_iteration)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self._write_list_of_dicts(path, state)

    def _get_state_path(self, modelrun_name, timestep=None, decision_iteration=None):
        """Compose a unique filename for state file:
        state_{timestep|0000}[_decision_{iteration}].{ext}
        """
        if timestep is None:
            timestep = "0000"

        if decision_iteration is None:
            separator = ""
            decision_iteration = ""
        else:
            separator = "_decision_"

        filename = "state_{}{}{}.{}".format(
            timestep, separator, decision_iteration, self.ext
        )
        path = os.path.join(self.results_folder, modelrun_name, filename)
        return path

    # endregion

    # region Conversion coefficients
    def read_coefficients(self, source_dim, destination_dim):
        """Read conversion coefficients between two dimensions

        Raises
        ------
        SmifDataNotFoundError
            If the coefficients file does not exist (a warning is also logged)
        """
        results_path = self._get_coefficients_path(source_dim, destination_dim)
        try:
            return self._read_ndarray(results_path)
        except FileNotFoundError as ex:
            msg = "Could not find the coefficients file for %s to %s"
            self.logger.warning(msg, source_dim, destination_dim)
            # msg uses %-placeholders, so it must be interpolated with %, not
            # str.format (which would leave the placeholders unfilled)
            raise SmifDataNotFoundError(msg % (source_dim, destination_dim)) from ex

    def write_coefficients(self, source_dim, destination_dim, data):
        """Write conversion coefficients between two dimensions"""
        results_path = self._get_coefficients_path(source_dim, destination_dim)
        header = "Conversion coefficients {}:{}".format(source_dim, destination_dim)
        self._write_ndarray(results_path, data, header)

    def _get_coefficients_path(self, source_dim, destination_dim):
        """Return path: <coefficients>/<source_dim>.<destination_dim>.<coef_ext>"""
        path = os.path.join(
            self.data_folders["coefficients"],
            "{}.{}.{}".format(source_dim, destination_dim, self.coef_ext),
        )
        return path

    # endregion

    # region Results
    def read_results(
        self, modelrun_id, model_name, output_spec, timestep, decision_iteration=None
    ):
        """Read results for a single output at a single timestep

        Raises
        ------
        ValueError
            If `timestep` is None
        SmifDataNotFoundError
            If the results file does not exist
        """
        if timestep is None:
            raise ValueError("You must pass a timestep argument")

        results_path = self._get_results_path(
            modelrun_id, model_name, output_spec.name, timestep, decision_iteration
        )

        try:
            return self._read_data_array(results_path, output_spec)
        except FileNotFoundError as ex:
            key = str(
                [
                    modelrun_id,
                    model_name,
                    output_spec.name,
                    timestep,
                    decision_iteration,
                ]
            )
            raise SmifDataNotFoundError(
                "Could not find results for {}".format(key)
            ) from ex

    def write_results(
        self,
        data_array,
        modelrun_id,
        model_name,
        timestep=None,
        decision_iteration=None,
    ):
        """Write results, creating the containing folders if necessary

        Raises
        ------
        NotImplementedError
            If `timestep` is None
        """
        if timestep is None:
            raise NotImplementedError()

        # type checks are skipped for falsy values (e.g. 0 or None)
        if timestep:
            assert isinstance(timestep, int), "Timestep must be an integer"
        if decision_iteration:
            assert isinstance(
                decision_iteration, int
            ), "Decision iteration must be an integer"

        results_path = self._get_results_path(
            modelrun_id, model_name, data_array.name, timestep, decision_iteration
        )
        os.makedirs(os.path.dirname(results_path), exist_ok=True)
        self._write_data_array(results_path, data_array)

    def delete_results(
        self,
        model_run_name,
        model_name,
        output_name,
        timestep=None,
        decision_iteration=None,
    ):
        """Delete a results file, ignoring (but logging) any OSError on removal

        Raises
        ------
        NotImplementedError
            If `timestep` is None
        """
        if timestep is None:
            raise NotImplementedError()

        # type checks are skipped for falsy values (e.g. 0 or None)
        if timestep:
            assert isinstance(timestep, int), "Timestep must be an integer"
        if decision_iteration:
            assert isinstance(
                decision_iteration, int
            ), "Decision iteration must be an integer"

        results_path = self._get_results_path(
            model_run_name, model_name, output_name, timestep, decision_iteration
        )
        try:
            os.remove(results_path)
        except OSError as ex:
            self.logger.info(
                "Ignored error deleting results {} - {}".format(
                    ex.filename, ex.strerror
                )
            )

    def available_results(self, modelrun_name):
        """List available results for a given model run

        See _get_results_path for path construction.

        On the pattern of:
            results/<modelrun_name>/<model_name>/
                decision_<id>/
                    output_<output_name>_timestep_<timestep>.<ext>

        Returns
        -------
        list of tuple
            (timestep, decision_iteration, model_name, output_name) for each
            results file found
        """
        paths = glob.glob(
            os.path.join(
                self.results_folder, modelrun_name, "*", "*", "*.{}".format(self.ext)
            )
        )
        return [self._parse_results_path(path) for path in paths]

    def _get_results_path(
        self, modelrun_id, model_name, output_name, timestep, decision_iteration=None
    ):
        """Return path to filename for a given output, including file extension

        On the pattern of:
            results/<modelrun_name>/<model_name>/
                decision_<id>/
                    output_<output_name>_timestep_<timestep>.<ext>

        Parameters
        ----------
        modelrun_id : str
        model_name : str
        output_name : str
        timestep : str or int
        decision_iteration : int, optional
            If None, the folder is named ``decision_none``

        Returns
        -------
        path : str
        """
        if decision_iteration is None:
            decision_iteration = "none"

        path = os.path.join(
            self.results_folder,
            modelrun_id,
            model_name,
            "decision_{}".format(decision_iteration),
            "output_{}_timestep_{}.{}".format(output_name, timestep, self.ext),
        )
        return path

    def _parse_results_path(self, path):
        """Return result metadata for a given result path

        On the pattern of:
            results/<modelrun_name>/<model_name>/
                decision_<id>/
                    output_<output_name>_timestep_<timestep>.<ext>

        Parameters
        ----------
        path : str

        Returns
        -------
        tuple : (timestep, decision_iteration, model_name, output_name)
            `decision_iteration` is None for a ``decision_none`` folder
        """
        # split to last directories and filename
        model_name, decision_str, output_str = path.split(os.sep)[-3:]

        # trim "decision_"; _get_results_path writes "none" when no decision
        # iteration was given, which cannot be parsed as an integer
        decision_label = decision_str[len("decision_"):]
        if decision_label == "none":
            decision_iteration = None
        else:
            decision_iteration = int(decision_label)

        # trim "output_"
        output_str_trimmed = output_str[len("output_"):]

        # trim extension - only from the end, so that an output name which
        # happens to contain ".<ext>" is left intact
        suffix = ".{}".format(self.ext)
        if output_str_trimmed.endswith(suffix):
            output_str_trimmed = output_str_trimmed[: -len(suffix)]

        # pick (str) output and (integer) timestep - split on the last
        # occurrence, in case the output name itself contains "_timestep_"
        output_name, timestep_str = output_str_trimmed.rsplit("_timestep_", 1)
        timestep = int(timestep_str)

        return (timestep, decision_iteration, model_name, output_name)

    # endregion
class CSVDataStore(FileDataStore):
    """CSV text file data store

    DataArray and list-of-dict data are stored as ``.csv`` files, bare
    numpy.ndarray data as ``.txt.gz`` files.
    """

    def __init__(self, base_folder):
        super().__init__(base_folder)
        self.ext = "csv"
        self.coef_ext = "txt.gz"

    def _read_data_array(self, path, spec, timestep=None, timesteps=None):
        """Read DataArray from file

        Raises
        ------
        SmifDataNotFoundError
            If there is no file at `path`
        """
        try:
            dataframe = pandas.read_csv(path)
        except FileNotFoundError as ex:
            msg = "Could not find data for {} at {}"
            raise SmifDataNotFoundError(msg.format(spec.name, path)) from ex

        dataframe, spec = DataStore.filter_on_timesteps(
            dataframe, spec, path, timestep, timesteps
        )
        data_array = DataStore.dataframe_to_data_array(dataframe, spec, path)
        return data_array

    def _write_data_array(self, path, data_array):
        """Write DataArray to file"""
        dataframe = data_array.as_df()
        # move the index into regular columns so that it is written out
        dataframe.reset_index().to_csv(path, index=False)

    def _read_list_of_dicts(self, path):
        """Read file to list[dict]

        A file which pandas reports as empty is read as an empty list.
        """
        try:
            data = pandas.read_csv(path).to_dict("records")
        except pandas.errors.EmptyDataError:
            data = []
        return data

    def _write_list_of_dicts(self, path, data):
        """Write list[dict] to file"""
        pandas.DataFrame.from_records(data).to_csv(path, index=False)

    def _read_ndarray(self, path):
        """Read numpy.ndarray

        Raises
        ------
        FileNotFoundError
            If numpy raises any OSError while loading `path`
        """
        try:
            return np.loadtxt(path)
        except OSError as ex:
            raise FileNotFoundError(path) from ex

    def _write_ndarray(self, path, data, header=None):
        """Write numpy.ndarray

        Parameters
        ----------
        path : str
        data : numpy.ndarray
        header : str, optional
            Written as a comment at the top of the file; omitted if None
        """
        # numpy.savetxt expects a string header, so map None to "no header"
        np.savetxt(path, data, header="" if header is None else header)
class ParquetDataStore(FileDataStore):
    """Binary file data store

    DataArray and list-of-dict data are stored as ``.parquet`` files (through
    the pyarrow engine), bare numpy.ndarray data as ``.npy`` files.
    """

    def __init__(self, base_folder):
        super().__init__(base_folder)
        self.ext = "parquet"
        self.coef_ext = "npy"

    def _read_data_array(self, path, spec, timestep=None, timesteps=None):
        """Read DataArray from file

        Raises
        ------
        SmifDataNotFoundError
            If reading `path` fails with an ArrowIOError or OSError
        """
        try:
            frame = pandas.read_parquet(path, engine="pyarrow")
        except (pa.lib.ArrowIOError, OSError) as ex:
            raise SmifDataNotFoundError(
                "Could not find data for {} at {}".format(spec.name, path)
            ) from ex

        frame, spec = DataStore.filter_on_timesteps(
            frame, spec, path, timestep, timesteps
        )
        return DataStore.dataframe_to_data_array(frame, spec, path)

    def _write_data_array(self, path, data_array):
        """Write DataArray to file"""
        data_array.as_df().to_parquet(path, engine="pyarrow", compression="gzip")

    def _read_list_of_dicts(self, path):
        """Read file to list[dict]

        Raises
        ------
        SmifDataNotFoundError
            If reading `path` fails with an ArrowIOError
        """
        try:
            frame = pandas.read_parquet(path, engine="pyarrow")
            return frame.to_dict("records")
        except pa.lib.ArrowIOError as ex:
            raise SmifDataNotFoundError(
                "Unable to read file at {}".format(path)
            ) from ex

    def _write_list_of_dicts(self, path, data):
        """Write list[dict] to file"""
        if data:
            frame = pandas.DataFrame.from_records(data)
        else:
            # empty data is written as a frame with a single, row-less column
            frame = pandas.DataFrame(columns=["placeholder"])
        frame.to_parquet(path, engine="pyarrow")

    def _read_ndarray(self, path):
        """Read numpy.ndarray

        Raises
        ------
        FileNotFoundError
            If numpy raises any OSError while loading `path`
        """
        try:
            array = np.load(path)
        except OSError:
            raise FileNotFoundError(path)
        return array

    def _write_ndarray(self, path, data, header=None):
        """Write numpy.ndarray (`header` is accepted but not written)"""
        np.save(path, data)
def _nest_keys(intervention):
nested = {}
for key, value in intervention.items():
if key.endswith(("_value", "_unit")):
new_key, sub_key = key.rsplit(sep="_", maxsplit=1)
if new_key in nested:
if not isinstance(nested[new_key], dict):
msg = "Duplicate heading in csv data: {}"
raise ValueError(msg.format(new_key))
else:
nested[new_key].update({sub_key: value})
else:
nested[new_key] = {sub_key: value}
else:
if key in nested:
msg = "Duplicate heading in csv data: {}"
raise ValueError(msg.format(new_key))
else:
nested[key] = value
return nested
def _unnest_keys(intervention):
unnested = {}
for key, value in intervention.items():
if isinstance(value, dict):
for sub_key, sub_value in value.items():
unnested["{}_{}".format(key, sub_key)] = sub_value
else:
unnested[key] = value
return unnested