Source code for smif.decision.decision

"""The decision module handles the three planning levels

Currently, only pre-specified planning is implemented.

The choices made in the three planning levels influence the set of interventions
and assets available within a model run.

The interventions available in a model run are stored in a dict keyed by name.

"""
__author__ = "Will Usher, Tom Russell"
__copyright__ = "Will Usher, Tom Russell"
__license__ = "mit"

import itertools
import os
from abc import ABCMeta, abstractmethod
from logging import getLogger
from types import MappingProxyType
from typing import Dict, List, Optional, Set, Tuple

from smif.data_layer.data_handle import ResultsHandle
from smif.data_layer.model_loader import ModelLoader
from smif.data_layer.store import Store
from smif.exception import SmifDataNotFoundError


[docs]class DecisionManager(object): """A DecisionManager is initialised with one or more model run strategies that refer to DecisionModules such as pre-specified planning, a rule-based models or multi-objective optimisation. These implementations influence the combination and ordering of decision iterations and model timesteps that need to be performed by the model runner. The DecisionManager presents a simple decision loop interface to the model runner, in the form of a generator which allows the model runner to iterate over the collection of independent simulations required at each step. The DecisionManager collates the output of the decision algorithms and writes the post-decision state through a ResultsHandle. This allows Models to access a given decision state (identified uniquely by timestep and decision iteration id). The :py:meth:`get_decisions` method passes a ResultsHandle down to a DecisionModule, allowing the DecisionModule to access model results from previous timesteps and decision iterations when making decisions Arguments --------- store: smif.data_layer.store.Store """ def __init__( self, store: Store, timesteps: List[int], modelrun_name: str, sos_model, decision: int = 0, ): self.logger = getLogger(__name__) self._store = store self._modelrun_name = modelrun_name self._sos_model = sos_model self._timesteps = timesteps self._decision_module = None self._register = {} # type: Dict for sector_model in sos_model.sector_models: self._register.update(self._store.read_interventions(sector_model.name)) self.planned_interventions = [] # type: List strategies = self._store.read_strategies(modelrun_name) self.logger.info("%s strategies found", len(strategies)) self.pre_spec_planning = self._set_up_pre_spec_planning( modelrun_name, strategies ) self._set_up_decision_modules(modelrun_name, strategies) def _set_up_pre_spec_planning(self, modelrun_name: str, strategies: List[Dict]): # Read in the historical interventions (initial conditions) directly initial_conditions = self._store.read_all_initial_conditions(modelrun_name) # Read in strategies planned_interventions = [] # type: List planned_interventions.extend(initial_conditions) for index, strategy in enumerate(strategies): # Extract pre-specified planning interventions if strategy["type"] == "pre-specified-planning": msg = "Adding %s planned interventions to pre-specified-planning %s" self.logger.info(msg, len(strategy["interventions"]), index) planned_interventions.extend(strategy["interventions"]) # Create a Pre-Specified planning decision module with all # the planned interventions self.planned_interventions = self._tuplize_state(planned_interventions) def _set_up_decision_modules(self, modelrun_name, strategies): strategy_types = [ x["type"] for x in strategies if x["type"] != "pre-specified-planning" ] if len(set(strategy_types)) > 1: msg = "Cannot use more the 2 type of strategy simultaneously" raise NotImplementedError(msg) for strategy in strategies: if strategy["type"] != "pre-specified-planning": loader = ModelLoader() # absolute path to be crystal clear for ModelLoader when loading python class strategy["path"] = os.path.normpath( os.path.join(self._store.model_base_folder, strategy["path"]) ) strategy["timesteps"] = self._timesteps # Pass a reference to the register of interventions strategy["register"] = MappingProxyType(self.available_interventions) strategy["name"] = strategy["classname"] + "_" + strategy["type"] self.logger.debug("Trying to load strategy: %s", strategy["name"]) decision_module = loader.load(strategy) self._decision_module = decision_module # type: DecisionModule @property def available_interventions(self) -> Dict[str, Dict]: """Returns a register of available interventions, i.e. those not planned""" planned_names = set(name for build_year, name in self.planned_interventions) edited_register = { name: self._register[name] for name in self._register.keys() - planned_names } return edited_register
[docs] def get_intervention(self, value): try: return self._register[value] except KeyError: msg = "" raise SmifDataNotFoundError(msg.format(value))
[docs] def decision_loop(self): """Generate bundles of simulation steps to run Each call to this method returns a dict: { 'decision_iterations': list of decision iterations (int), 'timesteps': list of timesteps (int), 'decision_links': (optional) dict of { decision iteration in current bundle: decision iteration of previous bundle } } A bundle is composed differently according to the implementation of the contained DecisionModule. For example: With only pre-specified planning, there is a single step in the loop, with a single decision iteration with timesteps covering the entire model horizon. With a rule based approach, there might be many steps in the loop, each with a single decision iteration and single timestep, moving on once some threshold is satisfied. With a genetic algorithm, there might be a configurable number of steps in the loop, each with multiple decision iterations (one for each member of the algorithm's population) and timesteps covering the entire model horizon. Implicitly, if the bundle returned in an iteration contains multiple decision iterations, they can be performed in parallel. If each decision iteration contains multiple timesteps, they can also be parallelised, so long as there are no temporal dependencies. Decision links are only required if the bundle timesteps do not start from the first timestep of the model horizon. """ self.logger.debug("Calling decision loop") if self._decision_module: while True: bundle = next(self._decision_module) if bundle is None: break self.logger.debug("Bundle returned: %s", bundle) self._get_and_save_bundle_decisions(bundle) yield bundle else: bundle = { "decision_iterations": [0], "timesteps": [x for x in self._timesteps], } self._get_and_save_bundle_decisions(bundle) yield bundle
def _get_and_save_bundle_decisions(self, bundle): """Iterate over bundle and write decisions Arguments --------- bundle : dict Returns ------- bundle : dict One definitive bundle across the decision modules """ for iteration, timestep in itertools.product( bundle["decision_iterations"], bundle["timesteps"] ): self.get_and_save_decisions(iteration, timestep)
[docs] def get_and_save_decisions(self, iteration, timestep): """Retrieves decisions for given timestep and decision iteration from each decision module and writes them to the store as state. Calls each contained DecisionModule for the given timestep and decision iteration in the `data_handle`, retrieving a list of decision dicts (keyed by intervention name and build year). These decisions are then written to a state file using the data store. Arguments --------- timestep : int iteration : int Notes ----- State contains all intervention names which are present in the system at the given ``timestep`` for the current ``iteration``. This must include planned interventions from a previous timestep that are still within their lifetime, and interventions picked by a decision module in the previous timesteps. After loading all historical interventions, and screening them to remove interventions from the previous timestep that have reached the end of their lifetime, new decisions are added to the list of current interventions. Finally, the new state is written to the store. """ results_handle = ResultsHandle( store=self._store, modelrun_name=self._modelrun_name, sos_model=self._sos_model, current_timestep=timestep, timesteps=self._timesteps, decision_iteration=iteration, ) # Decision module overrides pre-specified planning for obtaining state # from previous iteration pre_decision_state = set() if self._decision_module: previous_state = self._get_previous_state( self._decision_module, results_handle ) pre_decision_state.update(previous_state) msg = "Pre-decision state at timestep %s and iteration %s:\n%s" self.logger.debug(msg, timestep, iteration, pre_decision_state) new_decisions = set() if self._decision_module: decisions = self._get_decisions(self._decision_module, results_handle) new_decisions.update(decisions) self.logger.debug( "New decisions at timestep %s and iteration %s:\n%s", timestep, iteration, new_decisions, ) # Post decision state is the union of the pre decision state set, the # new decision set and the set of planned interventions post_decision_state = ( pre_decision_state | new_decisions | set(self.planned_interventions) ) post_decision_state = self.retire_interventions(post_decision_state, timestep) post_decision_state = self._untuplize_state(post_decision_state) self.logger.debug( "Post-decision state at timestep %s and iteration %s:\n%s", timestep, iteration, post_decision_state, ) self._store.write_state( post_decision_state, self._modelrun_name, timestep, iteration )
[docs] def retire_interventions( self, state: List[Tuple[int, str]], timestep: int ) -> List[Tuple[int, str]]: alive = [] for intervention in state: build_year = int(intervention[0]) data = self._register[intervention[1]] lifetime = data["technical_lifetime"]["value"] if self.buildable(build_year, timestep) and self.within_lifetime( build_year, timestep, lifetime ): alive.append(intervention) return alive
def _get_decisions( self, decision_module: "DecisionModule", results_handle: ResultsHandle ) -> List[Tuple[int, str]]: decisions = decision_module.get_decision(results_handle) return self._tuplize_state(decisions) def _get_previous_state( self, decision_module: "DecisionModule", results_handle: ResultsHandle ) -> List[Tuple[int, str]]: state_dict = decision_module.get_previous_state(results_handle) return self._tuplize_state(state_dict) @staticmethod def _tuplize_state(state: List[Dict]) -> List[Tuple[int, str]]: return [(x["build_year"], x["name"]) for x in state] @staticmethod def _untuplize_state(state: List[Tuple[int, str]]) -> List[Dict]: return [{"build_year": x[0], "name": x[1]} for x in state]
[docs] def buildable(self, build_year, timestep) -> bool: """Interventions are deemed available if build_year is less than next timestep For example, if `a` is built in 2011 and timesteps are [2005, 2010, 2015, 2020] then buildable returns True for timesteps 2010, 2015 and 2020 and False for 2005. Arguments --------- build_year: int The build year of the intervention timestep: int The current timestep """ if not isinstance(build_year, (int, float)): msg = "Build Year should be an integer but is a {}" raise TypeError(msg.format(type(build_year))) if timestep not in self._timesteps: raise ValueError("Timestep not in model timesteps") index = self._timesteps.index(timestep) if index == len(self._timesteps) - 1: next_year = timestep + 1 else: next_year = self._timesteps[index + 1] if int(build_year) < next_year: return True else: return False
[docs] @staticmethod def within_lifetime(build_year, timestep, lifetime) -> bool: """Interventions are deemed active if build_year + lifetime >= timestep Arguments --------- build_year : int timestep : int lifetime : int Returns ------- bool """ if not isinstance(build_year, (int, float)): msg = "Build Year should be an integer but is a {}" raise TypeError(msg.format(type(build_year))) try: build_year = int(build_year) except ValueError: raise ValueError( "A build year must be a valid integer. Received {}.".format(build_year) ) try: lifetime = int(lifetime) except ValueError: lifetime = float("inf") if lifetime < 0: msg = "The value of lifetime cannot be negative" raise ValueError(msg) if timestep <= build_year + lifetime: return True else: return False
[docs]class DecisionModule(metaclass=ABCMeta): """Abstract class which provides the interface to user defined decision modules. These mechanisms could include a Rule-based Approach or Multi-objective Optimisation. This class provides two main methods, ``__next__`` which is normally called implicitly as a call to the class as an iterator, and ``get_decision()`` which takes as arguments a smif.data_layer.data_handle.ResultsHandle object. Arguments --------- timesteps : list A list of planning timesteps register : dict Reference to a dict of iterventions """ """Current iteration of the decision module """ def __init__(self, timesteps: List[int], register: MappingProxyType): self.timesteps = timesteps self._register = register self.logger = getLogger(__name__) self._decisions = set() # type: Set def __next__(self) -> List[Dict]: return self._get_next_decision_iteration() @property def first_timestep(self) -> int: return min(self.timesteps) @property def last_timestep(self) -> int: return max(self.timesteps)
[docs] @abstractmethod def get_previous_state(self, results_handle: ResultsHandle) -> List[Dict]: """Return the state of the previous timestep""" raise NotImplementedError
[docs] def available_interventions(self, state: List[Dict]) -> List: """Return the collection of available interventions Available interventions are the subset of interventions that have not been implemented in a prior iteration or timestep Returns ------- List """ return [ name for name in self._register.keys() - set([x["name"] for x in state]) ]
[docs] def get_intervention(self, name): """Return an intervention dict Returns ------- dict """ try: return self._register[name] except KeyError: msg = ( "Intervention '{}' is not found in the list of available interventions" ) raise SmifDataNotFoundError(msg.format(name))
@abstractmethod def _get_next_decision_iteration(self) -> List[Dict]: """Implement to return the next decision iteration Within a list of decision-iteration/timestep pairs, the assumption is that all decision iterations can be run in parallel (otherwise only one will be returned) and within a decision interation, all timesteps may be run in parallel as long as there are no inter-timestep state dependencies Returns ------- dict Yields a dictionary keyed by ``decision iterations`` whose values contain a list of iteration integers, ``timesteps`` whose values contain a list of timesteps run in each decision iteration and the optional ``decision_links`` which link the decision interation of the current bundle to that of the previous bundle:: { 'decision_iterations': list of decision iterations (int), 'timesteps': list of timesteps (int), 'decision_links': (optional) dict of { decision iteration in current bundle: decision iteration of previous bundle } } """ raise NotImplementedError
[docs] @abstractmethod def get_decision(self, results_handle: ResultsHandle) -> List[Dict]: """Return decisions for a given timestep and decision iteration Parameters ---------- results_handle : smif.data_layer.data_handle.ResultsHandle Returns ------- list of dict Examples -------- >>> register = {'intervention_a': {'capital_cost': {'value': 1234}}} >>> dm = DecisionModule([2010, 2015], register) >>> dm.get_decision(results_handle) [{'name': 'intervention_a', 'build_year': 2010}]) """ raise NotImplementedError
[docs]class RuleBased(DecisionModule): """Rule-base decision modules""" def __init__(self, timesteps, register): super().__init__(timesteps, register) self.satisfied = False # type: bool self.current_timestep = self.first_timestep # type: int self.current_iteration = 0 # type: int # keep internal account of max iteration reached per timestep self._max_iteration_by_timestep = {self.first_timestep: 0} self.logger = getLogger(__name__)
[docs] def get_previous_iteration_timestep(self) -> Optional[Tuple[int, int]]: """Returns the timestep, iteration pair that describes the previous iteration Returns ------- tuple Contains (timestep, iteration) """ if self.current_iteration > 1: iteration = self.current_iteration - 1 if self.current_timestep == self.first_timestep: timestep = self.current_timestep elif iteration == self._max_iteration_by_timestep[self.previous_timestep]: timestep = self.previous_timestep elif iteration >= self._max_iteration_by_timestep[self.previous_timestep]: timestep = self.current_timestep else: return None return timestep, iteration
[docs] def get_previous_state(self, results_handle: ResultsHandle) -> List[Dict]: timestep_iteration = self.get_previous_iteration_timestep() if timestep_iteration: timestep, iteration = timestep_iteration return results_handle.get_state(timestep, iteration) else: return []
@property def next_timestep(self): index_current_timestep = self.timesteps.index(self.current_timestep) try: return self.timesteps[index_current_timestep + 1] except IndexError: return None @property def previous_timestep(self): index_current_timestep = self.timesteps.index(self.current_timestep) if index_current_timestep > 0: return self.timesteps[index_current_timestep - 1] else: return None def _get_next_decision_iteration(self): if self.satisfied and (self.current_timestep == self.last_timestep): return None elif self.satisfied and (self.current_timestep < self.last_timestep): self._max_iteration_by_timestep[ self.current_timestep ] = self.current_iteration self.satisfied = False self.current_timestep = self.next_timestep self.current_iteration += 1 return self._make_bundle() else: self.current_iteration += 1 return self._make_bundle()
[docs] def get_previous_year_iteration(self): iteration = self._max_iteration_by_timestep[self.previous_timestep] return iteration
def _make_bundle(self): bundle = { "decision_iterations": [self.current_iteration], "timesteps": [self.current_timestep], } if self.current_timestep > self.first_timestep: bundle["decision_links"] = { self.current_iteration: self._max_iteration_by_timestep[ self.previous_timestep ] } return bundle
[docs] def get_decision(self, results_handle) -> List[Dict]: return []