Source code for smif.controller.run.dafni_run_scheduler

"""Schedulers are used to run models.

The defaults provided allow model runs to be scheduled as subprocesses, or individual models to
be called in series.

Future implementations may interface with common schedulers to queue up models to run in
parallel and/or in a distributed fashion.

The DAFNIRunScheduler currently only works while connected to the RAL VPN. It posts all of
the model run information to the DAFNI API so that a DAFNI worker can start the model run.
"""
import json
import os
import time
from collections import defaultdict

import requests

from minio import Minio
from ruamel.yaml import YAML  # type: ignore

if "BACKEND_NISMOD_MINIO_SECRETS_FILE" in os.environ:
    MINIO_CREDENTIALS_FILE = os.environ["BACKEND_NISMOD_MINIO_SECRETS_FILE"]
else:
    MINIO_CREDENTIALS_FILE = ""
if "BACKEND_SECRET_KEY" in os.environ:
    SECRET_KEY = os.environ["BACKEND_SECRET_KEY"]
else:
    SECRET_KEY = ""
if "BACKEND_ACCESS_KEY" in os.environ:
    ACCESS_KEY = os.environ["BACKEND_ACCESS_KEY"]
else:
    ACCESS_KEY = ""
if "BACKEND_JOBSUBMISSION_API" in os.environ:
    JOBSUBMISSION_API_URL = os.environ["BACKEND_JOBSUBMISSION_API"]
else:
    JOBSUBMISSION_API_URL = ""
if "BACKEND_MINIO_IP" in os.environ:
    MINIO_IP = os.environ["BACKEND_MINIO_IP"]
else:
    MINIO_IP = ""

URL_AUTH = JOBSUBMISSION_API_URL + "auth/obtain_token/"
URL_JOBS = JOBSUBMISSION_API_URL + "nismod-model/jobs"
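
# A sketch of the environment this module reads, with hypothetical placeholder
# values (not real endpoints or credentials):
#
#   export BACKEND_NISMOD_MINIO_SECRETS_FILE=/run/secrets/minio-credentials.json
#   export BACKEND_SECRET_KEY=a-backend-secret-key
#   export BACKEND_ACCESS_KEY=a-backend-access-key
#   export BACKEND_JOBSUBMISSION_API=https://dafni.example/api/
#   export BACKEND_MINIO_IP=minio.example:9000
#
# With BACKEND_JOBSUBMISSION_API set as above, URL_AUTH and URL_JOBS resolve to
# https://dafni.example/api/auth/obtain_token/ and
# https://dafni.example/api/nismod-model/jobs respectively.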


class DAFNIRunScheduler(object):
    """Schedule smif model runs through the DAFNI job submission API and report
    whether a modelrun is running, is done or has failed.
    """

    def __init__(self, username, password):
        self._status = defaultdict(lambda: "unstarted")
        self._process = {}
        self._output = defaultdict(str)
        self._err = {}
        self.jobId = 0
        self.lock = False
        self.username = username
        self.password = password

        # Obtain a JWT from the DAFNI job submission API and check that the
        # jobs endpoint is reachable with it
        response = requests.post(
            URL_AUTH,
            json={"username": self.username, "password": self.password},
            allow_redirects=False,
        )
        response.raise_for_status()
        token = response.json()["token"]
        self.auth_header = {"Authorization": "JWT " + token}

        response = requests.get(URL_JOBS, headers=self.auth_header)
        response.raise_for_status()
    def add(self, model_run_name, args):
        """Add a model_run to the Modelrun scheduler.

        Parameters
        ----------
        model_run_name: str
            Name of the modelrun
        args: dict
            Arguments for the command-line interface

        Raises
        ------
        Exception
            When the modelrun was already started

        Notes
        -----
        DAFNI's queuing mechanism starts each model run in a separate container, so
        multiple modelruns can run concurrently without conflicts.
        """
        if self._status[model_run_name] != "running":
            self._output[model_run_name] = ""
            self._status[model_run_name] = "queing"

            yaml_files = self.get_yamls(model_run_name, args)
            model_run_id = model_run_name.replace("_", "-")

            minio_credentials = self.get_dict_from_json(MINIO_CREDENTIALS_FILE)
            minio_client = Minio(
                MINIO_IP,
                access_key=minio_credentials["accessKey"],
                secret_key=minio_credentials["secretKey"],
                secure=False,
            )

            # Clear out any existing bucket for this model run, then recreate it
            bucket_list = minio_client.list_buckets()
            for bucket in bucket_list:
                if bucket.name == model_run_id:
                    for obj in minio_client.list_objects(model_run_id, recursive=True):
                        minio_client.remove_object(model_run_id, obj.object_name)
                    minio_client.remove_bucket(model_run_id)
            minio_client.make_bucket(model_run_id)

            # Upload the model run configuration files, dropping the leading slash
            # from each path to form the object name
            for yml in yaml_files:
                local_path = args["directory"] + yml
                with open(local_path, "rb") as yml_data:
                    yml_stat = os.stat(local_path)
                    minio_client.put_object(
                        model_run_id, yml[1:], yml_data, yml_stat.st_size
                    )

            # Remove any existing job with the same name, then submit the new job
            response = requests.get(URL_JOBS, headers=self.auth_header)
            response.raise_for_status()
            for job in response.json():
                if job["job"]["job_name"] == model_run_id:
                    response = requests.delete(
                        URL_JOBS + "/" + str(job["job"]["id"]),
                        headers=self.auth_header,
                    )
                    response.raise_for_status()

            response = requests.post(
                URL_JOBS,
                json={
                    "job_name": model_run_id,
                    "model_name": model_run_name,
                    "minio_config_id": model_run_id,
                },
                headers=self.auth_header,
            )
            response.raise_for_status()
    def get_scheduler_type(self):
        return "dafni"
    def get_yamls(self, model_run_name, args):
        """List the configuration files needed to run `model_run_name`.

        Returns the model run, sos_model, sector_model and scenario YAML paths,
        relative to the project directory given in ``args["directory"]``.
        """
        yaml_files = ["/config/model_runs/" + model_run_name + ".yml"]
        with open(args["directory"] + yaml_files[0], "r") as f:
            doc = YAML(typ="safe").load(f.read())

        yaml_files.append("/config/sos_models/" + doc["sos_model"] + ".yml")
        with open(args["directory"] + yaml_files[1]) as sos_f:
            sos_doc = YAML(typ="safe").load(sos_f.read())

        for sector_model in sos_doc["sector_models"]:
            yaml_files.append("/config/sector_models/" + sector_model + ".yml")
        for scenario in sos_doc["scenarios"]:
            yaml_files.append("/config/scenarios/" + scenario + ".yml")
        return yaml_files
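    # get_yamls assumes the standard smif project layout under args["directory"];
    # the names in angle brackets below are placeholders:
    #
    #   <directory>/config/model_runs/<model_run_name>.yml
    #   <directory>/config/sos_models/<sos_model>.yml
    #   <directory>/config/sector_models/<sector_model>.yml
    #   <directory>/config/scenarios/<scenario>.yml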
    def get_dict_from_json(self, file_path):
        """Return the dictionary of values in the JSON file at `file_path`.

        Waits for the file to exist (for the case of reading vault secrets) to
        avoid the inherent race condition.
        """
        count = 0
        while True:
            try:
                with open(file_path) as file:
                    return json.load(file)
            except FileNotFoundError:
                count += 1
                if count > 3:
                    raise
                print(
                    "{} does not exist yet. Waiting 5 seconds and trying again. "
                    "Have tried {} times".format(file_path, count)
                )
                time.sleep(5)
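    # The credentials file read via get_dict_from_json is assumed to be a small
    # JSON document containing at least the "accessKey" and "secretKey" fields
    # used in add() and kill(); for example (placeholder values only):
    #
    #   {"accessKey": "a-minio-access-key", "secretKey": "a-minio-secret-key"}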
    def kill(self, model_run_name):
        """Stop a running modelrun, removing its DAFNI job and configuration bucket."""
        if self._status[model_run_name] == "running":
            self._status[model_run_name] = "stopped"

            minio_credentials = self.get_dict_from_json(MINIO_CREDENTIALS_FILE)
            minio_client = Minio(
                MINIO_IP,
                access_key=minio_credentials["accessKey"],
                secret_key=minio_credentials["secretKey"],
                secure=False,
            )
            model_run_id = model_run_name.replace("_", "-")

            # Remove the uploaded configuration files and their bucket
            for obj in minio_client.list_objects(model_run_id, recursive=True):
                minio_client.remove_object(model_run_id, obj.object_name)
            minio_client.remove_bucket(model_run_id)

            # Delete the job from the DAFNI job submission API
            response = requests.get(URL_JOBS, headers=self.auth_header)
            response.raise_for_status()
            for job in response.json():
                if job["job"]["job_name"] == model_run_id:
                    requests.delete(
                        URL_JOBS + "/" + str(job["job"]["id"]), headers=self.auth_header
                    )
    def get_status(self, model_run_name):
        """Get the status of a model run from the DAFNI job submission API.

        Returns
        -------
        dict
            A dict with 'status' and 'output' keys
        """
        response = requests.get(URL_JOBS, headers=self.auth_header)
        response.raise_for_status()
        model_run_id = model_run_name.replace("_", "-")
        if len(response.json()) > 0:
            for j in response.json():
                if j["job"]["job_name"] == model_run_id:
                    # Map the numeric DAFNI job status onto smif status strings
                    status = j["job"]["status"]
                    job_status = ["unstarted", "unstarted", "running", "done", "failed"]
                    self._status[model_run_name] = job_status[status]
                    break
        else:
            self._status[model_run_name] = "unstarted"
        return {
            "status": self._status[model_run_name],
            "output": self._output[model_run_name],
        }
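

# A minimal usage sketch, assuming valid DAFNI credentials and a smif project
# directory; the username, password, model run name and path are placeholders.
if __name__ == "__main__":
    scheduler = DAFNIRunScheduler("a-dafni-username", "a-dafni-password")
    scheduler.add("energy_demand_test_run", {"directory": "/path/to/smif/project"})

    # Poll the DAFNI job status until the run finishes or fails
    status = scheduler.get_status("energy_demand_test_run")["status"]
    while status not in ("done", "failed"):
        time.sleep(30)
        status = scheduler.get_status("energy_demand_test_run")["status"]
    print(status)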