Source code for smif.data_layer

# -*- coding: utf-8 -*-
"""Data access modules for loading system-of-systems model configuration
"""
from abc import ABCMeta, abstractmethod
import csv
import os
import fiona
from smif.data_layer.load import load, dump
from csv import DictReader


[docs]class DataInterface(metaclass=ABCMeta):
[docs] @abstractmethod def read_sos_model_runs(self): raise NotImplementedError()
[docs] @abstractmethod def write_sos_model_run(self, sos_model_run): raise NotImplementedError()
[docs] @abstractmethod def update_sos_model_run(self, sos_model_run_name, sos_model_run): raise NotImplementedError()
[docs] @abstractmethod def read_sos_models(self): raise NotImplementedError()
[docs] @abstractmethod def write_sos_model(self, sos_model): raise NotImplementedError()
[docs] @abstractmethod def update_sos_model(self, sos_model_name, sos_model): raise NotImplementedError()
[docs] @abstractmethod def read_sector_models(self): raise NotImplementedError()
[docs] @abstractmethod def read_sector_model(self, sector_model_name): raise NotImplementedError()
[docs] @abstractmethod def write_sector_model(self, sector_model): raise NotImplementedError()
[docs] @abstractmethod def update_sector_model(self, sector_model_name, sector_model): raise NotImplementedError()
[docs] @abstractmethod def read_region_definitions(self): raise NotImplementedError()
[docs] @abstractmethod def read_region_definition_data(self, region_definition_name): raise NotImplementedError()
[docs] @abstractmethod def write_region_definition(self, region_definition): raise NotImplementedError()
[docs] @abstractmethod def update_region_definition(self, region_definition): raise NotImplementedError()
[docs] @abstractmethod def read_interval_definitions(self): raise NotImplementedError()
[docs] @abstractmethod def read_interval_definition_data(self, interval_definition_name): raise NotImplementedError()
[docs] @abstractmethod def write_interval_definition(self, interval_definition): raise NotImplementedError()
[docs] @abstractmethod def update_interval_definition(self, interval_definition): raise NotImplementedError()
[docs] @abstractmethod def read_scenario_sets(self): raise NotImplementedError()
[docs] @abstractmethod def read_scenario_set(self, scenario_set_name): raise NotImplementedError()
[docs] @abstractmethod def read_scenario_data(self, scenario_name): raise NotImplementedError()
[docs] @abstractmethod def write_scenario_set(self, scenario_set): raise NotImplementedError()
[docs] @abstractmethod def update_scenario_set(self, scenario_set): raise NotImplementedError()
[docs] @abstractmethod def write_scenario(self, scenario): raise NotImplementedError()
[docs] @abstractmethod def update_scenario(self, scenario): raise NotImplementedError()
[docs] @abstractmethod def read_narrative_sets(self): raise NotImplementedError()
[docs] @abstractmethod def read_narrative_set(self, narrative_set_name): raise NotImplementedError()
[docs] @abstractmethod def read_narrative_data(self, narrative_name): raise NotImplementedError()
[docs] @abstractmethod def write_narrative_set(self, narrative_set): raise NotImplementedError()
[docs] @abstractmethod def update_narrative_set(self, narrative_set): raise NotImplementedError()
[docs] @abstractmethod def write_narrative(self, narrative): raise NotImplementedError()
[docs] @abstractmethod def update_narrative(self, narrative): raise NotImplementedError()
[docs]class DatafileInterface(DataInterface): """Read and write interface to YAML / CSV configuration files Project.yml Arguments --------- base_folder: str The path to the configuration and data files """ def __init__(self, base_folder): self.base_folder = base_folder self.file_dir = {} self.file_dir['project'] = os.path.join(base_folder, 'config') config_folders = { 'sos_model_runs': 'config', 'sos_models': 'config', 'sector_models': 'config', 'initial_conditions': 'data', 'interval_definitions': 'data', 'interventions': 'data', 'narratives': 'data', 'region_definitions': 'data', 'scenarios': 'data' } for category, folder in config_folders.items(): self.file_dir[category] = os.path.join(base_folder, folder, category)
[docs] def read_sos_model_runs(self): """Read all system-of-system model runs from Yaml files sos_model_runs.yml Returns ------- list A list of sos_model_run dicts """ sos_model_runs = [] sos_model_run_names = self._read_filenames_in_dir(self.file_dir['sos_model_runs'], '.yml') for sos_model_run_name in sos_model_run_names: sos_model_runs.append(self._read_yaml_file(self.file_dir['sos_model_runs'], sos_model_run_name)) return sos_model_runs
[docs] def write_sos_model_run(self, sos_model_run): """Write system-of-system model run to Yaml file Arguments --------- sos_model_run: dict A sos_model_run dictionary """ self._write_yaml_file(self.file_dir['sos_model_runs'], sos_model_run['name'], sos_model_run)
[docs] def update_sos_model_run(self, sos_model_run_name, sos_model_run): """Update system-of-system model run in Yaml file Arguments --------- sos_model_run_name: str A sos_model_run name sos_model_run: dict A sos_model_run dictionary """ if sos_model_run_name != sos_model_run['name']: os.remove(os.path.join(self.file_dir['sos_model_runs'], sos_model_run_name + '.yml')) self.write_sos_model_run(sos_model_run)
[docs] def read_sos_models(self): """Read all system-of-system models from Yaml files Returns ------- list A list of sos_models dicts """ sos_models = [] sos_model_names = self._read_filenames_in_dir(self.file_dir['sos_models'], '.yml') for sos_model_name in sos_model_names: sos_models.append(self._read_yaml_file(self.file_dir['sos_models'], sos_model_name)) return sos_models
[docs] def read_sos_model(self, sos_model_name): """Read a specific system-of-system model Returns ------- dict A sos model configuration dictionary """ filename = sos_model_name return self._read_yaml_file(self.file_dir['sos_models'], filename)
[docs] def write_sos_model(self, sos_model): """Write system-of-system model to Yaml file Existing configuration will be overwritten without warning Arguments --------- sos_model: dict A sos_model dictionary """ self._write_yaml_file(self.file_dir['sos_models'], sos_model['name'], sos_model)
[docs] def update_sos_model(self, sos_model_name, sos_model): """Update system-of-system model in Yaml file Arguments --------- sos_model_name: str A sos_model name sos_model: dict A sos_model dictionary """ if sos_model_name != sos_model['name']: os.remove(os.path.join(self.file_dir['sos_models'], sos_model_name + '.yml')) self.write_sos_model(sos_model)
[docs] def read_sector_models(self): """Read all sector models from Yaml files Returns ------- list A list of sector_model dicts """ return self._read_filenames_in_dir(self.file_dir['sector_models'], '.yml')
[docs] def read_sector_model(self, sector_model_name): """Read a sector model from a Yaml file Raises an exception when the file does not exists Arguments --------- sector_model_name: str Name of the sector_model (sector_model['name']) """ sector_model_config = self._read_yaml_file( self.file_dir['sector_models'], sector_model_name) return sector_model_config
[docs] def write_sector_model(self, sector_model): """Write sector model to a Yaml file Existing configuration will be overwritten without warning Arguments --------- sector_model: dict A sector_model dictionary """ self._write_yaml_file(self.file_dir['sector_models'], sector_model['name'], sector_model)
[docs] def update_sector_model(self, sector_model_name, sector_model): """Update sector model in Yaml file Arguments --------- sector_model_name: str A sector_model name sector_model: dict A sector_model dictionary """ if sector_model_name != sector_model['name']: os.remove(os.path.join(self.file_dir['sector_models'], sector_model_name + '.yml')) self.write_sector_model(sector_model)
[docs] def read_interventions(self, filename): """Read the interventions from filename Arguments --------- filename: str The name of the intervention yml file to read in """ filepath = self.file_dir['interventions'] return self._read_yaml_file(filepath, filename, extension='')
[docs] def read_initial_conditions(self, filename): """Read the initial conditions from filename Arguments --------- filename: str The name of the initial conditions yml file to read in """ filepath = self.file_dir['initial_conditions'] return self._read_yaml_file(filepath, filename, extension='')
[docs] def read_region_definitions(self): """Read region_definitions from project configuration Returns ------- list A list of region_definition dicts """ project_config = self._read_project_config() return project_config['region_definitions']
[docs] def read_region_definition_data(self, region_definition_name): """Read region_definition data file into a Fiona feature collection The file format must be possible to parse with GDAL, and must contain an attribute "name" to use as an identifier for the region_definition. Arguments --------- region_definition_name: str Name of the region_definition Returns ------- list A list of data from the specified file in a fiona formatted dict """ # Find filename for this region_definition_name filename = '' for region_definition in self.read_region_definitions(): if region_definition['name'] == region_definition_name: filename = region_definition['filename'] break # Read the region data from file filepath = os.path.join(self.file_dir['region_definitions'], filename) with fiona.drivers(): with fiona.open(filepath) as src: data = [f for f in src] return data
[docs] def write_region_definition(self, region_definition): """Write region_definition to project configuration Arguments --------- region_definition: dict A region_definition dict """ project_config = self._read_project_config() project_config['region_definitions'].append(region_definition) self._write_project_config(project_config)
[docs] def update_region_definition(self, region_definition_name, region_definition): """Update region_definition to project configuration Arguments --------- region_definition_name: str Name of the (original) entry region_definition: dict The updated region_definition dict """ project_config = self._read_project_config() # Create updated list project_config['region_definitions'] = [ entry for entry in project_config['region_definitions'] if (entry['name'] != region_definition['name'] and entry['name'] != region_definition_name) ] project_config['region_definitions'].append(region_definition) self._write_project_config(project_config)
[docs] def read_interval_definitions(self): """Read interval_definition sets from project configuration Returns ------- list A list of interval_definition set dicts """ project_config = self._read_project_config() return project_config['interval_definitions']
[docs] def read_interval_definition_data(self, interval_definition_name): """ Arguments --------- interval_definition_name: str Returns ------- dict Interval definition data Notes ----- Expects csv file to contain headings of `year`, `start`, `end` """ interval_defs = self.read_interval_definitions() filename = None while not filename: for interval_def in interval_defs: if interval_def['name'] == interval_definition_name: filename = interval_def['filename'] filepath = os.path.join(self.file_dir['interval_definitions'], filename) with open(filepath, 'r') as csvfile: reader = DictReader(csvfile) data = [] for row in reader: data.append(row) return data
[docs] def write_interval_definition(self, interval_definition): """Write interval_definition to project configuration Arguments --------- interval_definition: dict A interval_definition dict """ project_config = self._read_project_config() project_config['interval_definitions'].append(interval_definition) self._write_project_config(project_config)
[docs] def update_interval_definition(self, interval_definition_name, interval_definition): """Update interval_definition to project configuration Arguments --------- interval_definition_name: str Name of the (original) entry interval_definition: dict The updated interval_definition dict """ project_config = self._read_project_config() # Create updated list project_config['interval_definitions'] = [ entry for entry in project_config['interval_definitions'] if (entry['name'] != interval_definition['name'] and entry['name'] != interval_definition_name) ] project_config['interval_definitions'].append(interval_definition) self._write_project_config(project_config)
[docs] def read_scenario_sets(self): """Read scenario sets from project configuration Returns ------- list A list of scenario set dicts """ project_config = self._read_project_config() return project_config['scenario_sets']
[docs] def read_scenario_set(self, scenario_set_name): """Read all scenarios from a certain scenario_set Arguments --------- scenario_set_name: str Name of the scenario_set Returns ------- list A list of scenarios within the specified 'scenario_set_name' """ project_config = self._read_project_config() # Filter only the scenarios of the selected scenario_set_name filtered_scenario_data = [] for scenario_data in project_config['scenarios']: if scenario_data['scenario_set'] == scenario_set_name: filtered_scenario_data.append(scenario_data) return filtered_scenario_data
[docs] def read_scenario_definition(self, scenario_name): """Read scenario definition data Arguments --------- scenario_name: str Name of the scenario Returns ------- dict The scenario definition """ project_config = self._read_project_config() for scenario_data in project_config['scenarios']: if scenario_data['name'] == scenario_name: return scenario_data
[docs] def read_scenario_data(self, scenario_name): """Read scenario data file Arguments --------- scenario_name: str Name of the scenario Returns ------- dict A dict of lists of dicts containing the contents of `scenario_name` data file(s) associated with the scenario parameters. The keys of the dict are the parameter names """ data = {} # Find filenames for this scenario filename = None project_config = self._read_project_config() for scenario_data in project_config['scenarios']: if scenario_data['name'] == scenario_name: for param in scenario_data['parameters']: filename = param['filename'] # Read the scenario data from file filepath = os.path.join(self.file_dir['scenarios'], filename) data[param['name']] = self._get_data_from_csv(filepath) return data
[docs] def write_scenario_set(self, scenario_set): """Write scenario_set to project configuration Arguments --------- scenario_set: dict A scenario_set dict """ project_config = self._read_project_config() project_config['scenario_sets'].append(scenario_set) self._write_project_config(project_config)
[docs] def update_scenario_set(self, scenario_set_name, scenario_set): """Update scenario_set to project configuration Arguments --------- scenario_set_name: str Name of the (original) entry scenario_set: dict The updated scenario_set dict """ project_config = self._read_project_config() # Create updated list project_config['scenario_sets'] = [ entry for entry in project_config['scenario_sets'] if (entry['name'] != scenario_set['name'] and entry['name'] != scenario_set_name) ] project_config['scenario_sets'].append(scenario_set) self._write_project_config(project_config)
[docs] def write_scenario(self, scenario): """Write scenario to project configuration Arguments --------- scenario: dict A scenario dict """ project_config = self._read_project_config() project_config['scenarios'].append(scenario) self._write_project_config(project_config)
[docs] def update_scenario(self, scenario_name, scenario): """Update scenario to project configuration Arguments --------- scenario_name: str Name of the (original) entry scenario: dict The updated scenario dict """ project_config = self._read_project_config() # Create updated list project_config['scenarios'] = [ entry for entry in project_config['scenarios'] if (entry['name'] != scenario['name'] and entry['name'] != scenario_name) ] project_config['scenarios'].append(scenario) self._write_project_config(project_config)
[docs] def read_narrative_sets(self): """Read narrative sets from project configuration Returns ------- list A list of narrative set dicts """ project_config = self._read_project_config() return project_config['narrative_sets']
[docs] def read_narrative_set(self, narrative_set_name): """Read all narratives from a certain narrative_set Arguments --------- narrative_set_name: str Name of the narrative_set Returns ------- list A list of narratives within the specified 'narrative_set_name' """ project_config = self._read_project_config() # Filter only the narratives of the selected narrative_set_name filtered_narrative_data = [] for narrative_data in project_config['narratives']: if narrative_data['narrative_set'] == narrative_set_name: filtered_narrative_data.append(narrative_data) return filtered_narrative_data
[docs] def read_narrative_data(self, narrative_name): """Read narrative data file Arguments --------- narrative_name: str Name of the narrative Returns ------- list A list with dictionaries containing the contents of 'narrative_name' data file """ # Find filename for this narrative filename = '' project_config = self._read_project_config() for narrative in project_config['narratives']: if narrative['name'] == narrative_name: filename = narrative['filename'] break # Read the narrative data from file return load(os.path.join(self.file_dir['narratives'], filename))
[docs] def read_narrative_definition(self, narrative_name): """Read the narrative definition Arguments --------- narrative_name: str Name of the narrative Returns ------- dict """ definition = None project_config = self._read_project_config() for narrative in project_config['narratives']: if narrative['name'] == narrative_name: definition = narrative return definition
[docs] def write_narrative_set(self, narrative_set): """Write narrative_set to project configuration Arguments --------- narrative_set: dict A narrative_set dict """ project_config = self._read_project_config() project_config['narrative_sets'].append(narrative_set) self._write_project_config(project_config)
[docs] def update_narrative_set(self, narrative_set_name, narrative_set): """Update narrative_set to project configuration Arguments --------- narrative_set_name: str Name of the (original) entry narrative_set: dict The updated narrative_set dict """ project_config = self._read_project_config() # Create updated list project_config['narrative_sets'] = [ entry for entry in project_config['narrative_sets'] if (entry['name'] != narrative_set['name'] and entry['name'] != narrative_set_name) ] project_config['narrative_sets'].append(narrative_set) self._write_project_config(project_config)
[docs] def write_narrative(self, narrative): """Write narrative to project configuration Arguments --------- narrative: dict A narrative dict """ project_config = self._read_project_config() project_config['narratives'].append(narrative) self._write_project_config(project_config)
[docs] def update_narrative(self, narrative_name, narrative): """Update narrative to project configuration Arguments --------- narrative_name: str Name of the (original) entry narrative: dict The updated narrative dict """ project_config = self._read_project_config() # Create updated list project_config['narratives'] = [ entry for entry in project_config['narratives'] if (entry['name'] != narrative['name'] and entry['name'] != narrative_name) ] project_config['narratives'].append(narrative) self._write_project_config(project_config)
def _read_project_config(self): """Read the project configuration Returns ------- dict The project configuration """ return self._read_yaml_file(self.file_dir['project'], 'project') def _write_project_config(self, data): """Write the project configuration Argument -------- data: dict The project configuration """ self._write_yaml_file(self.file_dir['project'], 'project', data) def _read_filenames_in_dir(self, path, extension): """Returns the name of the Yaml files in a certain directory Arguments --------- path: str Path to directory extension: str Extension of files (such as: '.yml' or '.csv') Returns ------- list The list of files in `path` with extension """ files = [] for filename in os.listdir(path): if filename.endswith(extension): files.append(os.path.splitext(filename)[0]) return files def _get_data_from_csv(self, filepath): scenario_data = [] with open(filepath, 'r') as csvfile: reader = csv.DictReader(csvfile) scenario_data = [] for row in reader: scenario_data.append(row) return scenario_data def _read_yaml_file(self, path, filename, extension='.yml'): """Read a Data dict from a Yaml file Arguments --------- path: str Path to directory name: str Name of file extension: str, default='.yml' The file extension Returns ------- dict The data of the Yaml file `filename` in `path` """ filename = filename + extension filepath = os.path.join(path, filename) return load(filepath) def _write_yaml_file(self, path, filename, data, extension='.yml'): """Write a data dict to a Yaml file Arguments --------- path: str Path to directory name: str Name of file data: dict Data to be written to the file extension: str, default='.yml' The file extension """ filename = filename + extension filepath = os.path.join(path, filename) dump(data, filepath)
[docs]class DatabaseInterface(DataInterface): """ Read and write interface to Database """ def __init__(self, config_path): raise NotImplementedError()
[docs] def read_sos_model_runs(self): raise NotImplementedError()
[docs] def write_sos_model_run(self, sos_model_run): raise NotImplementedError()
[docs] def read_sos_models(self): raise NotImplementedError()
[docs] def write_sos_model(self, sos_model): raise NotImplementedError()
[docs] def read_sector_models(self): raise NotImplementedError()
[docs] def read_sector_model(self, sector_model_name): raise NotImplementedError()
[docs] def write_sector_model(self, sector_model): raise NotImplementedError()
[docs] def read_units(self): raise NotImplementedError()
[docs] def write_unit(self, unit): raise NotImplementedError()
[docs] def read_regions(self): raise NotImplementedError()
[docs] def read_region_data(self, region_name): raise NotImplementedError()
[docs] def write_region(self, region): raise NotImplementedError()
[docs] def read_intervals(self): raise NotImplementedError()
[docs] def read_interval_data(self, interval_name): raise NotImplementedError()
[docs] def write_interval(self, interval): raise NotImplementedError()
[docs] def read_scenario_sets(self): raise NotImplementedError()
[docs] def read_scenario_set(self, scenario_set_name): raise NotImplementedError()
[docs] def read_scenario_data(self, scenario_name): raise NotImplementedError()
[docs] def write_scenario_set(self, scenario_set): raise NotImplementedError()
[docs] def write_scenario(self, scenario): raise NotImplementedError()
[docs] def read_narrative_sets(self): raise NotImplementedError()
[docs] def read_narrative_set(self, narrative_set_name): raise NotImplementedError()
[docs] def read_narrative_data(self, narrative_name): raise NotImplementedError()
[docs] def write_narrative_set(self, narrative_set): raise NotImplementedError()
[docs] def write_narrative(self, narrative): raise NotImplementedError()