Source code for smif.controller.run.subprocess_run_scheduler
"""Schedulers are used to run models.
The defaults provided allow model runs to be scheduled as subprocesses,
or individual models to be called in series.
Future implementations may interface with common schedulers to queue
up models to run in parallel and/or distributed.
Calls smif run ... on the selected model in a sub process, where ...
are the options set in the app for info/debug messages, warm start
and output format.
"""
import subprocess
from collections import defaultdict
from datetime import datetime
[docs]class SubProcessRunScheduler(object):
"""The scheduler can run instances of smif as a subprocess
and can provide information whether the modelrun is running,
is done or has failed.
"""
def __init__(self):
self._status = defaultdict(lambda: "unstarted")
self._process = {}
self._output = defaultdict(str)
self._err = {}
self.lock = False
[docs] def add(self, model_run_name, args):
"""Add a model_run to the Modelrun scheduler.
Parameters
----------
model_run_name: str
Name of the modelrun
args: dict
Arguments for the command-line interface
Exception
---------
Exception
When the modelrun was already started
Notes
-----
There is no queuing mechanism implemented, each `add`
will directly start a subprocess. This means that it
is possible to run multiple modelruns concurrently.
This may cause conflicts, it depends on the
implementation whether a certain sector model / wrapper
touches the filesystem or other shared resources.
"""
if self._status[model_run_name] != "running":
self._output[model_run_name] = ""
self._status[model_run_name] = "queing"
smif_call = (
"smif run "
+ "-" * (int(args["verbosity"]) > 0)
+ "v" * int(args["verbosity"])
+ " "
+ model_run_name
+ " "
+ "-d"
+ " "
+ args["directory"]
+ " "
+ "-w" * args["warm_start"]
+ " " * args["warm_start"]
+ "-i"
+ " "
+ args["output_format"]
)
self._process[model_run_name] = subprocess.Popen(
smif_call, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
format_args = {
"model_run_name": model_run_name,
"datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"pid": str(self._process[model_run_name].pid),
"smif_call": smif_call,
"colour": "\x1b[1;34m",
"reset": "\x1b[0m",
"space": " \x1b",
}
format_str = """\
{colour}Modelrun{reset} {model_run_name}
{colour}Time{reset} {datetime}
{colour}PID{reset} {pid}
{colour}Command{reset} {smif_call}
"""
format_str.replace(" ", "{space}")
output = format_str.format(**format_args)
output += "-" * 100 + "\n"
self._output[model_run_name] = output
self._status[model_run_name] = "running"
else:
raise Exception("Model is already running.")
[docs] def kill(self, model_run_name):
"""Kill a Modelrun that is already running
Parameters
----------
model_run_name: str
Name of the modelrun
"""
if self._status[model_run_name] == "running":
self._process[model_run_name].kill()
self._status[model_run_name] = "stopped"
[docs] def get_scheduler_type(self):
return "default"
[docs] def get_status(self, model_run_name):
"""Get the status from the Modelrun scheduler.
Parameters
----------
model_run_name: str
Name of the modelrun
Returns
-------
dict: A message containing the status, command-line
output and error that can be directly sent back over
the http api.
Notes
-----
Possible status:
unstarted:
Model run was not started
queing:
Model run is waiting to be executed
running:
Model run is running
stopped:
Model run was stopped (killed) by user
done:
Model run was completed succesfully
failed:
Model run completed running with an exit code
"""
if self._status[model_run_name] == "running":
if self.lock is False:
self.lock = True
for line in iter(self._process[model_run_name].stdout.readline, b""):
self._output[model_run_name] += line.decode()
self._process[model_run_name].stdout.flush()
self.lock = False
if self._process[model_run_name].poll() == 0:
self._status[model_run_name] = "done"
elif self._process[model_run_name].poll() == 1:
self._status[model_run_name] = "failed"
return {
"status": self._status[model_run_name],
"output": self._output[model_run_name],
}