# Source code for smif.controller.job.serial_job_scheduler
"""Job Schedulers are used to run job graphs.
Runs a job graph by calling execute_model_step for each operation in order
"""
import itertools
import logging
import traceback
from collections import defaultdict
import networkx
from smif.controller.execute_step import execute_model_before_step, execute_model_step
from smif.model import ModelOperation
[docs]class SerialJobScheduler(object):
"""Run JobGraphs produced by a :class:`~smif.controller.modelrun.ModelRun`"""
def __init__(self, store=None):
self._status = defaultdict(lambda: "unstarted")
self._id_counter = itertools.count()
self.logger = logging.getLogger(__name__)
self.store = store
[docs] def add(self, job_graph, dry_run=False):
"""Add a JobGraph to the SerialJobScheduler and run directly
Arguments
---------
job_graph: :class:`networkx.graph`
dry_run: boolean, optional
If True, print job steps without running
"""
job_graph_id = self._next_id()
try:
self._run(job_graph, job_graph_id, dry_run)
except Exception as ex:
self._status[job_graph_id] = "failed"
traceback.print_exc()
return job_graph_id, ex
return job_graph_id, None
[docs] def kill(self, job_graph_id):
"""Kill a job_graph that is already running - not implemented
Parameters
----------
job_graph_id: int
"""
raise NotImplementedError
[docs] def get_status(self, job_graph_id):
"""Get job graph status
Parameters
----------
job_graph_id: int
Returns
-------
dict: A message containing the status
Notes
-----
Possible statuses:
unstarted:
Job graph has not yet started
running:
Job graph is running
done:
Job graph was completed succesfully
failed:
Job graph completed running with an exit code
"""
return {"status": self._status[job_graph_id]}
def _run(self, job_graph, job_graph_id, dry_run=False):
"""Run a job graph
- sort the jobs into a single list
- unpack model, data_handle and operation from each node
"""
try:
self.logger.profiling_start(
"SerialJobScheduler._run()", "graph_" + str(job_graph_id)
)
except AttributeError:
self.logger.info("START SerialJobScheduler._run():graph_%s", job_graph_id)
self._status[job_graph_id] = "running"
for job_node_id, job in self._get_run_order(job_graph):
self._run_job(job_node_id, job, dry_run)
self._status[job_graph_id] = "done"
try:
self.logger.profiling_stop(
"SerialJobScheduler._run()", "graph_" + str(job_graph_id)
)
except AttributeError:
self.logger.info("STOP SerialJobScheduler._run():graph_%s", job_graph_id)
def _run_job(self, job_node_id, job, dry_run=False):
self.logger.info("Job %s", job_node_id) # Call root logger to satisfy CLI test
try:
self.logger.profiling_start(
"SerialJobScheduler._run()", "job_" + job_node_id
)
except AttributeError:
self.logger.info("START SerialJobScheduler._run():job_%s", job_node_id)
if job["operation"] == ModelOperation.SIMULATE:
execute_model_step(
job["modelrun_name"],
job["model"].name,
job["current_timestep"],
job["decision_iteration"],
self.store,
dry_run,
)
elif job["operation"] == ModelOperation.BEFORE_MODEL_RUN:
execute_model_before_step(
job["modelrun_name"], job["model"].name, self.store, dry_run
)
else:
raise ValueError("Model operation not recognised", job)
try:
self.logger.profiling_stop(
"SerialJobScheduler._run()", "job_" + job_node_id
)
except AttributeError:
self.logger.info("STOP SerialJobScheduler._run():job_%s", job_node_id)
def _next_id(self):
return next(self._id_counter)
@staticmethod
def _get_run_order(graph):
"""Returns a list of jobs in a runnable order.
Returns
-------
list
A list of job nodes
"""
try:
# topological sort gives a single list from directed graph,
# ignoring opportunities to run independent models in parallel
run_order = networkx.topological_sort(graph)
# list of Models (typically ScenarioModel and SectorModel)
ordered_jobs = [(run, graph.nodes[run]) for run in run_order]
except networkx.NetworkXUnfeasible:
raise NotImplementedError("Job graphs must not contain cycles")
return ordered_jobs