# Source code for smif.controller.modelrun

"""The Model Run collects scenarios, timesteps, narratives, and
model collection into a package which can be built and passed to
the ModelRunner to run.

The ModelRunner is responsible for running a ModelRun, including passing
in the correct data to the model between timesteps and calling the
DecisionManager to obtain decisions.

ModelRun has attributes:
- name
- description
- sos_model
- timesteps
- scenarios
- narratives
- strategies
- status

"""
from logging import getLogger

import networkx as nx
from smif.decision.decision import DecisionManager
from smif.exception import SmifModelRunError, SmifTimestepResolutionError
from smif.metadata import RelativeTimestep
from smif.model import ModelOperation, ScenarioModel


class ModelRun(object):
    """Collects timesteps, scenarios, narratives and a SosModel together

    Attributes
    ----------
    name: str
        The unique name of the model run
    timestamp: :class:`datetime.datetime`
        An ISO8601 compatible timestamp of model run creation time
    description: str
        A friendly description of the model run
    sos_model: :class:`smif.model.sos_model.SosModel`
        The contained SosModel
    scenarios: dict
        For each scenario set, a mapping to a valid scenario within that set
    narratives: list
        A list of :class:`smif.parameters.Narrative` objects
    strategies: dict
    status: str
        One of "Empty", "Built", "Running" or "Successful"
    logger: logging.Logger
    results: dict
    initialised: bool
        Whether before_model_run jobs have already been added to a job graph;
        read and set to True by ``ModelRunner.build_job_graph``
    """

    def __init__(self):
        self.name = ""
        self.timestamp = None
        self.description = ""
        self.sos_model = None
        self._model_horizon = []
        self.scenarios = {}
        self.narratives = []
        self.strategies = None
        self.status = "Empty"
        self.logger = getLogger(__name__)
        self.results = {}
        # ModelRunner.build_job_graph reads this flag, so it must exist even for
        # model runs that are not created through from_dict
        self.initialised = False

    def as_dict(self):
        """Serialises :class:`smif.controller.modelrun.ModelRun`

        Returns a dictionary definition of a ModelRun which is equivalent to that
        required by `from_dict` to construct a new model run

        Returns
        -------
        dict
        """
        config = {
            "name": self.name,
            "description": self.description,
            "stamp": self.timestamp,
            # the property returns a copy, so mutating the returned config
            # cannot silently change this model run's horizon
            "timesteps": self.model_horizon,
            "sos_model": self.sos_model.name,
            "scenarios": self.scenarios,
            "narratives": self.narratives,
            "strategies": self.strategies,
        }
        return config

    @classmethod
    def from_dict(cls, config):
        """Create a :class:`smif.controller.modelrun.ModelRun` from a dictionary

        Arguments
        ---------
        config : dict
            Must contain the keys "name", "description", "stamp", "timesteps",
            "sos_model", "scenarios", "narratives" and "strategies"

        Returns
        -------
        ModelRun
            A validated model run with status "Built"

        Raises
        ------
        SmifModelRunError
            If the configuration fails :meth:`validate`
        """
        model_run = cls()
        model_run.name = config["name"]
        model_run.description = config["description"]
        model_run.timestamp = config["stamp"]
        model_run.initialised = False
        # setter de-duplicates and sorts the timesteps
        model_run.model_horizon = config["timesteps"]
        model_run.sos_model = config["sos_model"]
        model_run.scenarios = config["scenarios"]
        model_run.narratives = config["narratives"]
        model_run.strategies = config["strategies"]
        model_run.status = "Built"
        model_run.validate()
        return model_run

    def validate(self):
        """Validate that this ModelRun has been set up with sufficient data to run

        Raises
        ------
        SmifModelRunError
            If no SosModel is set, or if a selected scenario set is not one of
            the SosModel's scenario models
        """
        if self.sos_model is None:
            raise SmifModelRunError("No SosModel has been set for this ModelRun")

        scenarios = set(self.scenarios)
        model_scenarios = {scenario.name for scenario in self.sos_model.scenario_models}
        missing_scenarios = scenarios - model_scenarios
        if missing_scenarios:
            raise SmifModelRunError(
                "ScenarioSets {} are selected in the ModelRun "
                "configuration but not found in the SosModel "
                "configuration".format(missing_scenarios)
            )

    @property
    def model_horizon(self):
        """Returns the list of timesteps

        Returns
        =======
        list
            A list of timesteps, distinct and sorted in ascending order
        """
        return self._model_horizon.copy()

    @model_horizon.setter
    def model_horizon(self, value):
        self._model_horizon = sorted(set(value))

    def run(self, store, job_scheduler, warm_start_timestep=None, dry_run=False):
        """Builds all the objects and passes them to the ModelRunner

        The idea is that this will add ModelRuns to a queue for asychronous
        processing

        Arguments
        ---------
        store : :class:`smif.data_layer.Store`
        job_scheduler
            Scheduler to which the ModelRunner submits job graphs
        warm_start_timestep : optional
            If given, the model horizon is truncated to start at this timestep
            and existing results are kept (completed jobs are skipped) instead
            of being cleared from the store
        dry_run : bool, default=False
            Passed through to the ModelRunner / job scheduler

        Raises
        ------
        SmifModelRunError
            If the model run is not built, has no timesteps, or the warm start
            timestep is not part of the model horizon
        """
        self.logger.debug("Running model run %s", self.name)
        try:
            self.logger.profiling_start("modelrun.run", self.name)
        except AttributeError:
            # the logger may not provide profiling methods; fall back to info
            self.logger.info("START modelrun.run %s", self.name)

        if self.status != "Built":
            raise SmifModelRunError("Model is not yet built.")

        horizon = self.model_horizon
        if not horizon:
            raise SmifModelRunError("No timesteps specified for model run")

        # Either avoid rework (if warm_start) or else make sure to clear stale results
        warm_start = warm_start_timestep is not None
        if warm_start:
            try:
                idx = horizon.index(warm_start_timestep)
            except ValueError as err:
                raise SmifModelRunError(
                    "Warm start timestep {} is not in the model horizon {}".format(
                        warm_start_timestep, horizon
                    )
                ) from err
            self.model_horizon = horizon[idx:]
        else:
            self.logger.debug("Clearing results for %s", self.name)
            store.clear_results(self.name)

        self.status = "Running"
        modelrunner = ModelRunner(warm_start)
        modelrunner.solve_model(self, job_scheduler, store, dry_run)
        self.status = "Successful"

        try:
            self.logger.profiling_stop("modelrun.run", self.name)
        except AttributeError:
            self.logger.info("STOP modelrun.run %s", self.name)
class ModelRunner(object):
    """The ModelRunner orchestrates the simulation of a SoSModel over decision
    iterations and timesteps as provided by a DecisionManager.

    Arguments
    ---------
    warm_start : bool, default=False
        When True, ``solve_model`` removes simulate jobs that the store reports
        as completed from each job graph before submitting it
    """

    def __init__(self, warm_start=False):
        self.logger = getLogger(__name__)
        self.warm_start = warm_start

    def solve_model(self, model_run, job_scheduler, store, dry_run=False):
        """Solve a ModelRun

        This method steps through the model horizon, building a job graph and
        submitting this to the scheduler at each decision loop.

        Arguments
        ---------
        model_run : :class:`smif.controller.modelrun.ModelRun`
        job_scheduler
            Scheduler exposing ``add(job_graph, dry_run)`` -> ``(job_id, err)``
            and ``get_status(job_id)``
        store : :class:`smif.data_layer.Store`
        dry_run : bool, default=False
            Passed through to ``job_scheduler.add``

        Raises
        ------
        Exception
            The error returned by the job scheduler for a failed job graph
        """
        # Solve the model run: decision loop generates a series of bundles of
        # independent decision iterations, each with a number of timesteps to run
        self.logger.debug(
            "Solving the models over all timesteps: %s", model_run.model_horizon
        )

        # Initialise the decision manager (and hence decision modules)
        self.logger.debug("Initialising the decision manager")
        decision_manager = DecisionManager(
            store, model_run.model_horizon, model_run.name, model_run.sos_model
        )

        for bundle in decision_manager.decision_loop():
            # each iteration is independent at this point, so the following loop
            # is a candidate for running in parallel
            job_graph = self.build_job_graph(model_run, bundle)

            if self.warm_start:
                # filter graph to exclude already-available results
                complete_jobs = store.completed_jobs(model_run.name)
                job_graph = self.filter_job_graph(
                    model_run.name, job_graph, complete_jobs
                )

            job_id, err = job_scheduler.add(job_graph, dry_run)
            self.logger.debug("Running job %s", job_id)
            if err is not None:
                status = job_scheduler.get_status(job_id)
                self.logger.debug("Job %s %s", job_id, status["status"])
                raise err

    def build_job_graph(self, model_run, bundle):
        """Build a job graph

        Build and return the job graph for an entire bundle, including
        before_model_run jobs when the models were not yet initialised.

        Constraints:

        - Bundle must have keys: 'decision_iterations' and 'timesteps'
        - Running a bundle runs each (decision iteration, timestep) pair
          specified by the combinations of decision iterations and timesteps
        - (decision iteration, timestep) pairs must be unique over an entire
          model run
        - In a single bundle, timesteps must be a consecutive subset of the
          model horizon timesteps

        The first timestep in each decision iteration of a bundle is either:

        - the first timestep in the model horizon and initialised from the
          model run starting point with scenario data and initial-timestep
          interventions only
        - or another timestep, picking up from where some previous (timestep,
          decision iteration) left off, which is explicitly included in the
          bundle.

        If a bundle's timesteps start from a timestep after the first one in
        the model horizon, the bundle must provide 'decision_links', and bundle
        must start from the very next timestep available in the model horizon.
        Jobs need to be able to identify a point to pick up from, namely the
        (timestep, decision iteration) which identifies the immediately
        preceding simulation state.

        E.g. request running first two timesteps::

            {
                'decision_iterations': [0, 1],
                'timesteps': [0, 1]
            }

        Run first two timesteps again, with an updated decision::

            {
                'decision_iterations': [1, 2],
                'timesteps': [0, 1]
            }

        Results meet decision requirements, so run next two timesteps, linking
        this bundle's decision iterations to previous decision iterations::

            {
                'decision_iterations': [3, 4],
                'timesteps': [2, 3],
                'decision_links': {3: 1, 4: 2}
            }

        Arguments
        ---------
        model_run: :class:`smif.controller.modelrun.ModelRun`
        bundle: :class:`dict`

        Returns
        -------
        :class:`networkx.Graph`
            A populated job graph with edges showing dependencies between
            different operations and timesteps

        Raises
        ------
        SmifModelRunError
            If a bundle starts after the first timestep of the model horizon but
            does not provide a decision link for one of its decision iterations
        NotImplementedError
            If the resulting job graph contains a cycle
        """
        job_graph = nx.DiGraph()

        # Solve the model run: decision loop generates a series of bundles of
        # independent decision iterations, each with a number of timesteps to run
        for decision_iteration in bundle["decision_iterations"]:
            self.logger.info("Running decision iteration %s", decision_iteration)

            for timestep_index, timestep in enumerate(bundle["timesteps"]):
                self.logger.info("Running timestep %s", timestep)

                # one simulate job node per model
                job_graph.add_nodes_from(
                    self._make_simulate_job_nodes(
                        model_run.name,
                        model_run.sos_model.sector_models,
                        decision_iteration,
                        timestep,
                        model_run.model_horizon,
                    )
                )

                # edges to match within-timestep dependencies
                job_graph.add_edges_from(
                    self._make_current_simulate_job_edges(
                        model_run.name,
                        model_run.sos_model.model_dependencies,
                        timestep,
                        decision_iteration,
                    )
                )

                # connect any between-timestep dependencies
                if timestep_index == 0:
                    # first timestep in bundle
                    try:
                        relative = RelativeTimestep.PREVIOUS
                        previous_timestep = relative.resolve_relative_to(
                            timestep, model_run.model_horizon
                        )
                    except SmifTimestepResolutionError:
                        # no previous timestep, use scenarios to provide initial
                        # inter-timestep dependencies
                        job_graph.add_edges_from(
                            self._make_initial_previous_simulate_job_edges(
                                model_run.name,
                                model_run.sos_model.model_dependencies,
                                timestep,
                                decision_iteration,
                            )
                        )
                    else:
                        # connect to outputs from a previous bundle
                        previous_decision_iteration = (
                            self._get_previous_decision_iteration(
                                bundle, decision_iteration, timestep
                            )
                        )
                        job_graph.add_edges_from(
                            self._make_between_bundle_previous_simulate_job_edges(
                                model_run.name,
                                model_run.sos_model.model_dependencies,
                                timestep,
                                previous_timestep,
                                decision_iteration,
                                previous_decision_iteration,
                            )
                        )
                else:
                    # subsequent timestep in a bundle - connect to previous timestep
                    previous_timestep = bundle["timesteps"][timestep_index - 1]
                    job_graph.add_edges_from(
                        self._make_within_bundle_previous_simulate_job_edges(
                            model_run.name,
                            model_run.sos_model.model_dependencies,
                            timestep,
                            previous_timestep,
                            decision_iteration,
                        )
                    )

        # getattr default keeps model runs that never set the flag working
        if not getattr(model_run, "initialised", False):
            # one before_model_run job per model
            self.logger.info("Initialising each of the sector models")
            job_graph.add_nodes_from(
                self._make_before_model_run_job_nodes(
                    model_run.name,
                    model_run.sos_model.sector_models,
                    model_run.model_horizon,
                )
            )
            # must run before any simulate jobs
            for decision_iteration in bundle["decision_iterations"]:
                for timestep in bundle["timesteps"]:
                    job_graph.add_edges_from(
                        self._make_before_model_run_job_edges(
                            model_run.name,
                            model_run.sos_model.sector_models,
                            timestep,
                            decision_iteration,
                        )
                    )
            model_run.initialised = True

        if not nx.is_directed_acyclic_graph(job_graph):
            raise NotImplementedError(
                "SosModel dependency graphs must not contain within-timestep cycles"
            )

        return job_graph

    @staticmethod
    def filter_job_graph(modelrun_name, job_graph, complete_jobs):
        """Return a copy of ``job_graph`` without already-completed simulate jobs

        Arguments
        ---------
        modelrun_name : str
        job_graph : :class:`networkx.DiGraph`
            Not modified; a copy is filtered and returned
        complete_jobs : iterable
            ``(timestep, decision_iteration, model_name)`` tuples

        Returns
        -------
        :class:`networkx.DiGraph`
        """
        filtered = job_graph.copy()
        for timestep, decision_iteration, model_name in complete_jobs:
            job_id = ModelRunner._make_job_id(
                modelrun_name,
                model_name,
                ModelOperation.SIMULATE,
                timestep,
                decision_iteration,
            )
            if job_id in filtered.nodes:
                filtered.remove_node(job_id)
        return filtered

    @staticmethod
    def _get_previous_decision_iteration(bundle, decision_iteration, timestep):
        """Look up the decision iteration that ``decision_iteration`` continues from

        Raises
        ------
        SmifModelRunError
            If the bundle has no 'decision_links' entry for ``decision_iteration``
        """
        try:
            return bundle["decision_links"][decision_iteration]
        except KeyError as err:
            raise SmifModelRunError(
                "Bundle starts at timestep {}, which is not the first timestep in "
                "the model horizon, so it must provide 'decision_links' for "
                "decision iteration {}".format(timestep, decision_iteration)
            ) from err

    @staticmethod
    def _make_before_model_run_job_nodes(modelrun_name, models, horizon):
        """One (job_id, attributes) before_model_run node per model"""
        return [
            (
                ModelRunner._make_job_id(
                    modelrun_name, model.name, ModelOperation.BEFORE_MODEL_RUN
                ),
                {
                    "model": model,
                    "modelrun_name": modelrun_name,
                    "current_timestep": None,
                    "timesteps": horizon,
                    "decision_iteration": None,
                    "operation": ModelOperation.BEFORE_MODEL_RUN,
                },
            )
            for model in models
        ]

    @staticmethod
    def _make_before_model_run_job_edges(
        modelrun_name, models, timestep, decision_iteration
    ):
        """Edges from each model's before_model_run job to its simulate job"""
        edges = []
        for model in models:
            from_id = ModelRunner._make_job_id(
                modelrun_name, model.name, ModelOperation.BEFORE_MODEL_RUN
            )
            to_id = ModelRunner._make_job_id(
                modelrun_name,
                model.name,
                ModelOperation.SIMULATE,
                timestep,
                decision_iteration,
            )
            edges.append((from_id, to_id))
        return edges

    @staticmethod
    def _make_simulate_job_nodes(
        modelrun_name, models, decision_iteration, timestep, horizon
    ):
        """One (job_id, attributes) simulate node per model for a single
        (timestep, decision iteration) pair"""
        return [
            (
                ModelRunner._make_job_id(
                    modelrun_name,
                    model.name,
                    ModelOperation.SIMULATE,
                    timestep,
                    decision_iteration,
                ),
                {
                    "model": model,
                    "modelrun_name": modelrun_name,
                    "current_timestep": timestep,
                    "timesteps": horizon,
                    "decision_iteration": decision_iteration,
                    "operation": ModelOperation.SIMULATE,
                },
            )
            for model in models
        ]

    @staticmethod
    def _make_current_simulate_job_edges(
        modelrun_name, dependencies, timestep, decision_iteration
    ):
        """Edges for every dependency that is not on the previous timestep,
        linking source and sink simulate jobs at the same timestep"""
        edges = []
        for dependency in dependencies:
            if dependency.timestep != RelativeTimestep.PREVIOUS:
                from_id = ModelRunner._make_job_id(
                    modelrun_name,
                    dependency.source_model.name,
                    ModelOperation.SIMULATE,
                    timestep,
                    decision_iteration,
                )
                to_id = ModelRunner._make_job_id(
                    modelrun_name,
                    dependency.sink_model.name,
                    ModelOperation.SIMULATE,
                    timestep,
                    decision_iteration,
                )
                edges.append((from_id, to_id))
        return edges

    @staticmethod
    def _make_within_bundle_previous_simulate_job_edges(
        modelrun_name, dependencies, timestep, previous_timestep, decision_iteration
    ):
        """Edges for previous-timestep dependencies inside one decision iteration"""
        # same as the between-bundle case with one decision iteration at both ends
        return ModelRunner._make_between_bundle_previous_simulate_job_edges(
            modelrun_name,
            dependencies,
            timestep,
            previous_timestep,
            decision_iteration,
            decision_iteration,
        )

    @staticmethod
    def _make_between_bundle_previous_simulate_job_edges(
        modelrun_name,
        dependencies,
        timestep,
        previous_timestep,
        decision_iteration,
        previous_decision_iteration,
    ):
        """Edges for previous-timestep dependencies, from the source job at
        (previous_timestep, previous_decision_iteration) to the sink job at
        (timestep, decision_iteration)"""
        edges = []
        for dependency in dependencies:
            if dependency.timestep == RelativeTimestep.PREVIOUS:
                from_id = ModelRunner._make_job_id(
                    modelrun_name,
                    dependency.source_model.name,
                    ModelOperation.SIMULATE,
                    previous_timestep,
                    previous_decision_iteration,
                )
                to_id = ModelRunner._make_job_id(
                    modelrun_name,
                    dependency.sink_model.name,
                    ModelOperation.SIMULATE,
                    timestep,
                    decision_iteration,
                )
                edges.append((from_id, to_id))
        return edges

    @staticmethod
    def _make_initial_previous_simulate_job_edges(
        modelrun_name, dependencies, timestep, decision_iteration
    ):
        """Edges from scenario-model sources to their sinks at the same timestep,
        used when a timestep has no previous timestep in the horizon"""
        # NOTE(review): simulate nodes are only created for sector models, so
        # networkx adds any scenario job named here as an attribute-less node;
        # confirm whether model_dependencies ever has scenario sources
        edges = []
        for dependency in dependencies:
            if isinstance(dependency.source_model, ScenarioModel):
                from_id = ModelRunner._make_job_id(
                    modelrun_name,
                    dependency.source_model.name,
                    ModelOperation.SIMULATE,
                    timestep,
                    decision_iteration,
                )
                to_id = ModelRunner._make_job_id(
                    modelrun_name,
                    dependency.sink_model.name,
                    ModelOperation.SIMULATE,
                    timestep,
                    decision_iteration,
                )
                edges.append((from_id, to_id))
        return edges

    @staticmethod
    def _make_job_id(
        modelrun_name, model_name, operation, timestep=None, decision_iteration=None
    ):
        """Build a job id string; before_model_run ids omit timestep and
        decision iteration"""
        if operation == ModelOperation.BEFORE_MODEL_RUN:
            id_ = "%s_%s_%s" % (modelrun_name, operation.value, model_name)
        else:
            id_ = "%s_%s_%s_%s_%s" % (
                modelrun_name,
                operation.value,
                timestep,
                decision_iteration,
                model_name,
            )
        return id_