Source code for smif.controller.job.serial_job_scheduler
"""Job Schedulers are used to run job graphs.

Runs a job graph serially by calling execute_model_before_step or
execute_model_step for each job, in dependency order
"""
import itertools
import logging
import traceback
from collections import defaultdict
import networkx
from smif.controller.execute_step import (execute_model_before_step,
execute_model_step)
from smif.model import ModelOperation
class SerialJobScheduler(object):
    """Run JobGraphs produced by a :class:`~smif.controller.modelrun.ModelRun`

    Jobs are executed one at a time, in the calling process, following a
    topological ordering of the job graph.

    Arguments
    ---------
    store: optional
        Passed through unchanged to ``execute_model_step`` and
        ``execute_model_before_step`` for every job
    """

    def __init__(self, store=None):
        # Maps job_graph_id -> status string. Ids that were never written read
        # back as 'unstarted'.
        self._status = defaultdict(lambda: 'unstarted')
        # Source of job graph ids: 0, 1, 2, ...
        self._id_counter = itertools.count()
        self.logger = logging.getLogger(__name__)
        self.store = store

    def add(self, job_graph, dry_run=False):
        """Add a JobGraph to the SerialJobScheduler and run directly

        The call blocks until every job has run or one of them has raised.

        Arguments
        ---------
        job_graph: :class:`networkx.DiGraph`
            Each node carries the job attributes read by ``_run_job``
        dry_run: boolean, optional
            If True, print job steps without running

        Returns
        -------
        tuple
            ``(job_graph_id, error)`` where ``error`` is ``None`` if the
            graph ran to completion, otherwise the exception that stopped it
        """
        job_graph_id = self._next_id()
        try:
            self._run(job_graph, job_graph_id, dry_run)
        except Exception as ex:
            # Failures are reported to the caller via the return value rather
            # than propagated; the traceback is still written to stderr.
            self._status[job_graph_id] = 'failed'
            traceback.print_exc()
            return job_graph_id, ex
        return job_graph_id, None

    def kill(self, job_graph_id):
        """Kill a job_graph that is already running - not implemented

        Parameters
        ----------
        job_graph_id: int

        Raises
        ------
        NotImplementedError
            Always
        """
        raise NotImplementedError

    def get_status(self, job_graph_id):
        """Get job graph status

        Parameters
        ----------
        job_graph_id: int

        Returns
        -------
        dict: A message containing the status

        Notes
        -----
        Possible statuses:

        unstarted:
            Job graph has not yet started (also reported for unknown ids)
        running:
            Job graph is running
        done:
            Job graph was completed successfully
        failed:
            Job graph stopped because a job raised an exception
        """
        # Use .get() rather than indexing: indexing the defaultdict would
        # insert an entry for every id that is merely queried.
        return {'status': self._status.get(job_graph_id, 'unstarted')}

    def _run(self, job_graph, job_graph_id, dry_run=False):
        """Run a job graph

        - sort the jobs into a single list
        - run each job in turn, updating the graph status before and after

        If a job raises, the exception propagates and no stop marker is
        emitted for the graph.
        """
        label = 'graph_' + str(job_graph_id)
        self._profiling_start(label)
        self._status[job_graph_id] = 'running'

        for job_node_id, job in self._get_run_order(job_graph):
            self._run_job(job_node_id, job, dry_run)

        self._status[job_graph_id] = 'done'
        self._profiling_stop(label)

    def _run_job(self, job_node_id, job, dry_run=False):
        """Run a single job

        Arguments
        ---------
        job_node_id
            Identifier of the job node, used for logging
        job: dict
            Node attributes; must provide 'operation', 'modelrun_name' and
            'model', plus 'current_timestep' and 'decision_iteration' for
            SIMULATE jobs
        dry_run: boolean, optional
            Passed through to the execute function

        Raises
        ------
        ValueError
            If the job's operation is not SIMULATE or BEFORE_MODEL_RUN
        """
        # NOTE(review): the original comment said this line exists to satisfy
        # a CLI test - confirm before changing the message or level.
        self.logger.info("Job %s", job_node_id)

        # str() keeps the label valid for non-string node ids, matching _run
        label = 'job_' + str(job_node_id)
        self._profiling_start(label)

        operation = job['operation']
        if operation == ModelOperation.SIMULATE:
            execute_model_step(
                job['modelrun_name'],
                job['model'].name,
                job['current_timestep'],
                job['decision_iteration'],
                self.store,
                dry_run
            )
        elif operation == ModelOperation.BEFORE_MODEL_RUN:
            execute_model_before_step(
                job['modelrun_name'],
                job['model'].name,
                self.store,
                dry_run
            )
        else:
            raise ValueError("Model operation not recognised", job)

        self._profiling_stop(label)

    def _profiling_start(self, label):
        """Emit a start marker for ``label``, via the profiling hook if present
        """
        try:
            self.logger.profiling_start('SerialJobScheduler._run()', label)
        except AttributeError:
            # Logger has no profiling hook: fall back to a plain info message
            self.logger.info('START SerialJobScheduler._run():%s', label)

    def _profiling_stop(self, label):
        """Emit a stop marker for ``label``, via the profiling hook if present
        """
        try:
            self.logger.profiling_stop('SerialJobScheduler._run()', label)
        except AttributeError:
            # Logger has no profiling hook: fall back to a plain info message
            self.logger.info('STOP SerialJobScheduler._run():%s', label)

    def _next_id(self):
        """Return the next unused job graph id
        """
        return next(self._id_counter)

    @staticmethod
    def _get_run_order(graph):
        """Returns a list of jobs in a runnable order.

        Arguments
        ---------
        graph: :class:`networkx.DiGraph`

        Returns
        -------
        list
            A list of ``(node_id, node_attributes)`` tuples

        Raises
        ------
        NotImplementedError
            If the graph contains a cycle
        """
        try:
            # topological sort gives a single list from directed graph,
            # ignoring opportunities to run independent models in parallel.
            # The list is built inside the try block because the sort may be
            # evaluated lazily, in which case a cycle is only reported while
            # iterating.
            return [
                (node_id, graph.nodes[node_id])
                for node_id in networkx.topological_sort(graph)
            ]
        except networkx.NetworkXUnfeasible as ex:
            raise NotImplementedError("Job graphs must not contain cycles") from ex