# Source code for smif.model.sos_model

# -*- coding: utf-8 -*-
"""This module coordinates the software components that make up the integration
framework.

A system of systems model contains simulation and scenario models,
and the dependencies between the models.

"""
import logging
from collections import defaultdict

import networkx
from smif.convert.area import get_register as get_region_register
from smif.convert.interval import get_register as get_interval_register
from smif.decision import Planning
from smif.intervention import InterventionRegister
from smif.model import CompositeModel, Model, element_after, element_before
from smif.model.model_set import ModelSet
from smif.model.scenario_model import ScenarioModel
from smif.model.sector_model import SectorModel

__author__ = "Will Usher, Tom Russell"
__copyright__ = "Will Usher, Tom Russell"
__license__ = "mit"


class SosModel(CompositeModel):
    """Consists of the collection of models joined via dependencies

    This class is populated at runtime by the :class:`SosModelBuilder` and
    called from :func:`smif.cli.run_model`. SosModel inherits from
    :class:`smif.model.CompositeModel`.

    Arguments
    ---------
    name : str
        The unique name of the SosModel

    """

    def __init__(self, name):
        # housekeeping
        super().__init__(name)
        self.logger = logging.getLogger(__name__)

        # settings handed to ModelSet when interdependent models are iterated
        self.max_iterations = 25
        self.convergence_relative_tolerance = 1e-05
        self.convergence_absolute_tolerance = 1e-08

        # models - includes types of SectorModel and ScenarioModel
        self.dependency_graph = networkx.DiGraph()

        # systems, interventions and (system) state
        self.timesteps = []
        self.interventions = InterventionRegister()
        self.initial_conditions = []
        self.planning = Planning([])
        # nested mapping: _state[timestep][model name] -> state
        self._state = defaultdict(dict)

        # scenario data and results
        # nested mapping: _results[timestep][model name] -> results
        self._results = defaultdict(dict)

    def as_dict(self):
        """Serialize the SosModel object

        Returns
        -------
        dict
            Configuration with the keys ``name``, ``description``,
            ``scenario_sets``, ``sector_models``, ``dependencies``,
            ``max_iterations``, ``convergence_absolute_tolerance`` and
            ``convergence_relative_tolerance``
        """
        # one entry per (sink model, sink input) dependency
        dependencies = [
            {
                'source_model': dep.source_model.name,
                'source_model_output': dep.source.name,
                'sink_model': model.name,
                'sink_model_input': name,
            }
            for model in self.models.values()
            for name, dep in model.deps.items()
        ]

        return {
            'name': self.name,
            'description': self.description,
            'scenario_sets': [
                scenario.scenario_set
                for scenario in self.scenario_models.values()
            ],
            'sector_models': list(self.sector_models.keys()),
            'dependencies': dependencies,
            'max_iterations': self.max_iterations,
            'convergence_absolute_tolerance':
                self.convergence_absolute_tolerance,
            'convergence_relative_tolerance':
                self.convergence_relative_tolerance,
        }

    def add_model(self, model):
        """Adds a model to the system-of-systems model

        The model is stored in ``self.models`` under its name, replacing any
        model previously stored under the same name.

        Arguments
        ---------
        model : :class:`smif.model.Model`
            Any model object, e.g. a sector model wrapper or a scenario model
        """
        assert isinstance(model, Model)
        self.logger.info("Loading model: %s", model.name)
        self.models[model.name] = model

    @property
    def results(self):
        """Get nested dict of model results

        Returns
        -------
        dict
            Nested dictionary in the format
            results[str:model][str:parameter]
        """
        # convert from defaultdict to plain dict (shallow copy)
        return dict(self._results)

    def simulate(self, timestep, data=None):
        """Run the SosModel

        Checks the dependencies, determines a run order and then simulates
        each model (or set of interdependent models) in turn, feeding the
        results of earlier models into later ones.

        Arguments
        ---------
        timestep
            The timestep passed on to each contained model's ``simulate``
        data : dict, optional
            Data passed on to ``_get_parameter_values`` and used as the
            source for any input listed in ``free_inputs``

        Returns
        -------
        results : dict
            Nested dict keyed by model name, parameter name
        """
        self.check_dependencies()
        run_order = self._get_model_sets_in_run_order()
        self.logger.info("Determined run order as %s",
                         [x.name for x in run_order])

        results = {}
        for model in run_order:
            # get data for model
            # TODO settle and test data dict structure/object between
            # simple/composite models
            sim_data = {}
            for input_name, dep in model.deps.items():
                input_ = model.model_inputs[input_name]
                if input_ in self.free_inputs:
                    # pick external dependencies from data
                    param_data = data[dep.source_model.name][dep.source.name]
                else:
                    # pick internal dependencies from results
                    param_data = \
                        results[dep.source_model.name][dep.source.name]
                sim_data[input_.name] = dep.convert(param_data, input_)

            sim_data = self._get_parameter_values(model, sim_data, data)
            sim_results = model.simulate(timestep, sim_data)

            # a ModelSet may report results for several models at once
            for model_name, model_results in sim_results.items():
                results[model_name] = model_results

        return results

    def check_dependencies(self):
        """For each contained model, compare dependency list against
        list of available models and build the dependency graph

        Raises
        ------
        NotImplementedError
            If any input is not linked to a dependency, or if a contained
            model is itself a :class:`SosModel`
        """
        if self.free_inputs.names:
            msg = "A SosModel must have all inputs linked to dependencies. " \
                  "Define dependencies for %s"
            # interpolate the names so the error reads as a single message
            raise NotImplementedError(
                msg % ", ".join(self.free_inputs.names))

        for model in self.models.values():
            if isinstance(model, SosModel):
                msg = "Nesting of SosModels not yet supported"
                raise NotImplementedError(msg)

            self.dependency_graph.add_node(model, name=model.name)

            for sink, dependency in model.deps.items():
                provider = dependency.source_model
                msg = "Dependency '%s' provided by '%s'"
                self.logger.debug(msg, sink, provider.name)

                # keyword attributes are accepted by networkx 1.x and 2.x,
                # the positional attribute dict only by 1.x
                self.dependency_graph.add_edge(provider,
                                               model,
                                               source=dependency.source,
                                               sink=sink)

    def get_decisions(self, model, timestep):
        """Gets the interventions that correspond to the decisions

        Only planned interventions whose ``build_date`` is not later than
        ``timestep`` and whose name is one of ``model.intervention_names``
        are returned.

        Parameters
        ----------
        model: :class:`smif.sector_model.SectorModel`
            The instance of the sector model wrapper to run
        timestep: int
            The current model year

        Returns
        -------
        list
            Interventions looked up in ``self.interventions``

        TODO: Move into DecisionManager class
        """
        self.logger.debug("Finding decisions for %i", timestep)
        current_decisions = []
        for decision in self.planning.planned_interventions:
            if decision['build_date'] > timestep:
                continue
            name = decision['name']
            if name in model.intervention_names:
                msg = "Adding decision '%s' to instruction list"
                self.logger.debug(msg, name)
                intervention = self.interventions.get_intervention(name)
                current_decisions.append(intervention)

        # TODO: also include rule-based and optimised interventions from
        # self.planning once those are available

        return current_decisions

    def get_state(self, model, timestep):
        """Gets the state to pass to SectorModel.simulate

        Returns
        -------
        The state stored for ``model`` at ``timestep``, or an empty list
        (after logging a warning) if none has been stored
        """
        # .get() avoids inserting an empty entry into the defaultdict
        # merely because a timestep was queried
        timestep_state = self._state.get(timestep, {})
        if model.name not in timestep_state:
            self.logger.warning("Found no state for %s in timestep %s",
                                model.name, timestep)
            return []
        return timestep_state[model.name]

    def set_state(self, model, from_timestep, state):
        """Sets state output from model ready for next timestep
        """
        # NOTE(review): timestep_after is documented to return None when
        # there is no later timestep, in which case the state is stored
        # under the key None - confirm this is intended
        for_timestep = self.timestep_after(from_timestep)
        self._state[for_timestep][model.name] = state

    def set_data(self, model, timestep, results):
        """Sets results output from model as data available to other/future
        models

        Stores only latest estimated results (i.e. not holding on to
        iterations here while trying to solve interdependencies)
        """
        self._results[timestep][model.name] = results

    def _get_model_sets_in_run_order(self):
        """Returns a list of :class:`Model` in a runnable order.

        If a set contains more than one model, there is an interdependency
        and we attempt to run the models to convergence.

        Returns
        -------
        list
            A list of `smif.model.Model` objects
        """
        graph = self.dependency_graph

        if networkx.is_directed_acyclic_graph(graph):
            # topological sort gives a single list from directed graph,
            # currently ignoring opportunities to run independent models in
            # parallel; list of Models (typically ScenarioModel and
            # SectorModel)
            return list(networkx.topological_sort(graph))

        # contract the strongly connected components (subgraphs which
        # contain cycles) into single nodes, producing the 'condensation'
        # of the graph, where each node maps to one or more sector models
        condensation = networkx.condensation(graph)

        # the 'members' attribute of each contracted node refers back to
        # the original dependency graph; read it through the helper, which
        # exists in both networkx 1.x and 2.x
        members = networkx.get_node_attributes(condensation, 'members')

        # topological sort of the condensation gives an ordering of the
        # contracted nodes
        ordered_sets = []
        for node_id in networkx.topological_sort(condensation):
            models = members[node_id]
            if len(models) == 1:
                # take the only member without mutating the set
                ordered_sets.append(next(iter(models)))
            else:
                ordered_sets.append(ModelSet(
                    models,
                    max_iterations=self.max_iterations,
                    relative_tolerance=self.convergence_relative_tolerance,
                    absolute_tolerance=self.convergence_absolute_tolerance))

        return ordered_sets

    def timestep_before(self, timestep):
        """Returns the timestep previous to a given timestep, or None

        Arguments
        ---------
        timestep : str

        Returns
        -------
        str
        """
        return element_before(timestep, self.timesteps)

    def timestep_after(self, timestep):
        """Returns the timestep after a given timestep, or None

        Arguments
        ---------
        timestep : str

        Returns
        -------
        str
        """
        return element_after(timestep, self.timesteps)

    @property
    def intervention_names(self):
        """Names (id-like keys) of the interventions of all sector models

        Returns
        -------
        list
        """
        return [
            intervention.name
            for model in self.sector_models.values()
            for intervention in model.interventions
        ]

    @property
    def sector_models(self):
        """Sector model objects contained in the SosModel

        Returns
        -------
        dict
            A dict of sector model objects
        """
        return {
            name: model
            for name, model in self.models.items()
            if isinstance(model, SectorModel)
        }

    @property
    def scenario_models(self):
        """Scenario model objects contained in the SosModel

        Returns
        -------
        dict
            A dict of scenario model objects
        """
        return {
            name: model
            for name, model in self.models.items()
            if isinstance(model, ScenarioModel)
        }
class SosModelBuilder(object):
    """Constructs a system-of-systems model

    Builds a :class:`SosModel`.

    Arguments
    ---------
    name: str, default='global'
        The unique name of the SosModel

    Examples
    --------
    Call :py:meth:`SosModelBuilder.construct` to populate
    a :py:class:`SosModel` object and :py:meth:`SosModelBuilder.finish`
    to return the system-of-systems model.

    >>> builder = SosModelBuilder('test_model')
    >>> builder.construct(config_data)
    >>> sos_model = builder.finish()

    """

    def __init__(self, name='global'):
        self.sos_model = SosModel(name)

        self.region_register = get_region_register()
        self.interval_register = get_interval_register()

        self.logger = logging.getLogger(__name__)

    def construct(self, sos_model_config):
        """Set up the whole SosModel

        Models are loaded before dependencies are added, because adding a
        dependency looks up both of its models by name.

        Parameters
        ----------
        sos_model_config : dict
            A valid system-of-systems model configuration dictionary. The
            keys ``name``, ``description``, ``sector_models``,
            ``scenario_sets`` and ``dependencies`` are required;
            ``max_iterations``, ``convergence_absolute_tolerance`` and
            ``convergence_relative_tolerance`` are optional.
        """
        self.sos_model.name = sos_model_config['name']
        self.sos_model.description = sos_model_config['description']
        self.set_max_iterations(sos_model_config)
        self.set_convergence_abs_tolerance(sos_model_config)
        self.set_convergence_rel_tolerance(sos_model_config)

        self.load_models(sos_model_config['sector_models'])
        self.load_scenario_models(sos_model_config['scenario_sets'])

        self.add_dependencies(sos_model_config['dependencies'])

    def add_dependencies(self, dependency_list):
        """Add dependencies between models

        Arguments
        ---------
        dependency_list : list
            A list of dicts of dependency configuration data

        Raises
        ------
        KeyError
            If a dependency names a model that has not been loaded, or
            lacks one of the four configuration keys

        Examples
        --------
        >>> dependencies = [{'source_model': 'raininess',
                             'source_model_output': 'raininess',
                             'sink_model': 'water_supply',
                             'sink_model_input': 'raininess'}]
        >>> builder.add_dependencies(dependencies)
        """
        models = self.sos_model.models
        self.logger.debug("Available models: %s", models.keys())

        for dep in dependency_list:
            sink_model_object = models[dep['sink_model']]
            source_model_object = models[dep['source_model']]
            source_model_output = dep['source_model_output']
            sink_model_input = dep['sink_model_input']

            self.logger.debug("Adding dependency linking %s.%s to %s.%s",
                              source_model_object.name, source_model_output,
                              sink_model_object.name, sink_model_input)

            # the dependency is registered on the sink (consuming) model
            sink_model_object.add_dependency(source_model_object,
                                             source_model_output,
                                             sink_model_input)

    def _set_optional(self, config_data, key):
        """Copy ``config_data[key]`` onto the attribute of the same name of
        the SosModel, unless the key is absent or its value is None
        """
        value = config_data.get(key)
        if value is not None:
            setattr(self.sos_model, key, value)

    def set_max_iterations(self, config_data):
        """Set the maximum iterations for iterating
        :class:`smif.model.model_set.ModelSet` to convergence

        The model's current value is kept if ``max_iterations`` is missing
        from ``config_data`` or is None.
        """
        self._set_optional(config_data, 'max_iterations')

    def set_convergence_abs_tolerance(self, config_data):
        """Set the absolute tolerance for iterating
        :class:`smif.model.model_set.ModelSet` to convergence

        The model's current value is kept if
        ``convergence_absolute_tolerance`` is missing from ``config_data``
        or is None.
        """
        self._set_optional(config_data, 'convergence_absolute_tolerance')

    def set_convergence_rel_tolerance(self, config_data):
        """Set the relative tolerance for iterating
        :class:`smif.model.model_set.ModelSet` to convergence

        The model's current value is kept if
        ``convergence_relative_tolerance`` is missing from ``config_data``
        or is None.
        """
        self._set_optional(config_data, 'convergence_relative_tolerance')

    def load_models(self, model_list):
        """Loads the sector models into the system-of-systems model

        Parameters
        ----------
        model_list : list
            A list of SectorModel objects
        """
        self.logger.info("Loading models")
        for model in model_list:
            self.sos_model.add_model(model)

    def load_scenario_models(self, scenario_list):
        """Loads the scenario models into the system-of-systems model

        Parameters
        ----------
        scenario_list : list
            A list of ScenarioModel objects
        """
        self.logger.info("Loading scenarios")
        for scenario in scenario_list:
            self.sos_model.add_model(scenario)

    def finish(self):
        """Returns the configured system-of-systems model

        No validation is performed here; dependencies are checked when
        :py:meth:`SosModel.check_dependencies` is called.
        """
        return self.sos_model