# Source code for smif.cli

# -*- coding: utf-8 -*-
"""A command line interface to the system of systems framework

This command line interface implements a number of methods.

- `setup` creates an example project with the recommended folder structure
- `run` performs a simulation of an individual sector model, or the whole system
        of systems model
- `validate` performs a validation check of the configuration file
- `app` runs the graphical user interface, opening in a web browser

Folder structure
----------------

When configuring a project for the CLI, the folder structure below should be
used.  In this example, there is one system-of-systems model, combining a water
supply and an energy demand model::

    /config
        project.yaml
        /sector_models
            energy_demand.yml
            water_supply.yml
        /sos_models
            energy_water.yml
        /model_runs
            run_to_2050.yml
            short_test_run.yml
            ...
    /data
        /initial_conditions
            reservoirs.yml
        /interval_definitions
            annual_intervals.csv
        /interventions
            water_supply.yml
        /narratives
            high_tech_dsm.yml
        /region_definitions
            /oxfordshire
                regions.geojson
            /uk_nations_shp
                regions.shp
        /scenarios
            population.csv
            raininess.csv
        /water_supply
            /initial_system
    /models
        energy_demand.py
        water_supply.py
    /planning
        expected_to_2020.yaml
        national_infrastructure_pipeline.yml

The sector model implementations can be installed independently of the model run
configuration. The paths to python wrapper classes (implementing SectorModel)
should be specified in each ``sector_models/*.yml`` configuration.

The project.yaml file specifies the metadata shared by all elements of the
project; ``sos_models`` specify the combinations of ``sector_models`` and
``scenarios`` while individual ``model_runs`` specify the scenario, strategy
and narrative combinations to be used in each run of the models.

"""
from __future__ import print_function

import glob
import logging
import os
import sys
from argparse import ArgumentParser

import pkg_resources

import pandas
import smif
import smif.cli.log
from smif.controller import (
    copy_project_folder,
    execute_decision_step,
    execute_model_before_step,
    execute_model_run,
    execute_model_step,
)
from smif.controller.run import DAFNIRunScheduler, SubProcessRunScheduler
from smif.data_layer import Store
from smif.data_layer.file import (
    CSVDataStore,
    FileMetadataStore,
    ParquetDataStore,
    YamlConfigStore,
)
from smif.http_api import create_app

try:
    import _thread
except ImportError:
    import thread as _thread

try:
    import win32api

    USE_WIN32 = True
except ImportError:
    USE_WIN32 = False

__author__ = "Will Usher, Tom Russell"
__copyright__ = "Will Usher, Tom Russell"
__license__ = "mit"


def list_model_runs(args):
    """Print the name of every configured model run.

    When called with ``--complete``, compare each run's expected results
    against those actually present in the store and mark fully-complete
    runs with an asterisk.
    """
    store = _get_store(args)
    run_configs = store.read_model_runs()

    check_complete = args.complete
    if check_complete:
        print("Model runs with an asterisk (*) have complete results available\n")

    for config in run_configs:
        name = config["name"]
        if not check_complete:
            print(name)
            continue
        # A run is complete when every expected result is available
        expected = store.canonical_expected_results(name)
        available = store.canonical_available_results(name)
        marker = " *" if expected == available else ""
        print("{}{}".format(name, marker))
def list_available_results(args):
    """List the available results for a specified model run.

    Prints an indented tree: model run -> sos model -> sector models ->
    outputs -> decisions -> available timesteps. Result keys unpack as
    (timestep, decision, sector_model, output) tuples.
    """
    store = _get_store(args)
    # Expected results define the full tree; available results fill it in
    expected = store.canonical_expected_results(args.model_run)
    available = store.available_results(args.model_run)
    # Print run and sos model
    run = store.read_model_run(args.model_run)
    print("\nmodel run: {}".format(args.model_run))
    print("{}- sos model: {}".format(" " * 2, run["sos_model"]))
    # List of expected sector models
    sec_models = sorted({sec for _t, _d, sec, _out in expected})
    for sec_model in sec_models:
        print("{}- sector model: {}".format(" " * 4, sec_model))
        # List expected outputs for this sector model
        outputs = sorted({out for _t, _d, sec, out in expected if sec == sec_model})
        for output in outputs:
            print("{}- output: {}".format(" " * 6, output))
            # List available decisions for this sector model and output
            decs = sorted(
                {
                    d
                    for _t, d, sec, out in available
                    if sec == sec_model and out == output
                }
            )
            if len(decs) == 0:
                print("{}- no results".format(" " * 8))
            for dec in decs:
                base_str = "{}- decision {}:".format(" " * 8, dec)
                # List available time steps for this decision, sector model
                # and output
                ts = sorted(
                    {
                        t
                        for t, d, sec, out in available
                        if d == dec and sec == sec_model and out == output
                    }
                )
                # Every available result tuple carries a timestep, so a
                # listed decision always has at least one
                assert (
                    len(ts) > 0
                ), "If a decision is available, so is at least one time step"
                res_str = ", ".join([str(t) for t in ts])
                print("{} {}".format(base_str, res_str))
def list_missing_results(args):
    """List the missing results for a specified model run.

    Prints an indented tree of sector models and outputs, reporting the
    timesteps for which results are expected but not yet present.
    """
    store = _get_store(args)
    expected = store.canonical_expected_results(args.model_run)
    missing = store.canonical_missing_results(args.model_run)

    # Header: run name and its system-of-systems model
    run = store.read_model_run(args.model_run)
    print("\nmodel run: {}".format(args.model_run))
    print("{}- sos model: {}".format(" " * 2, run["sos_model"]))

    # Result keys unpack as (timestep, decision, sector_model, output)
    for model_name in sorted({sec for _t, _d, sec, _out in expected}):
        print("{}- sector model: {}".format(" " * 4, model_name))

        model_outputs = sorted(
            {out for _t, _d, sec, out in expected if sec == model_name}
        )
        for out_name in model_outputs:
            print("{}- output: {}".format(" " * 6, out_name))

            # Timesteps with no results for this sector model and output
            missing_steps = sorted(
                {
                    t
                    for t, d, sec, out in missing
                    if sec == model_name and out == out_name
                }
            )
            if not missing_steps:
                print("{}- no missing results".format(" " * 8))
            else:
                base_str = "{}- results missing for:".format(" " * 8)
                res_str = ", ".join(str(t) for t in missing_steps)
                print("{} {}".format(base_str, res_str))
def prepare_convert(args):
    """Convert all data for a model run between CSV and Parquet stores.

    If the configured source store holds CSV data the conversion target
    is a Parquet store, otherwise the target is a CSV store. Strategy,
    scenario, narrative, parameter-default, intervention and
    initial-condition data are converted in turn.
    """
    src_store = _get_store(args)

    # Target uses the opposite data store format from the source
    if isinstance(src_store.data_store, CSVDataStore):
        target_data_store = ParquetDataStore(args.directory)
    else:
        target_data_store = CSVDataStore(args.directory)
    tgt_store = Store(
        config_store=YamlConfigStore(args.directory),
        metadata_store=FileMetadataStore(args.directory),
        data_store=target_data_store,
        model_base_folder=(args.directory),
    )

    # The model run and its sos model determine which data to convert
    model_run = src_store.read_model_run(args.model_run)
    sos_model = src_store.read_sos_model(model_run["sos_model"])

    # Data tied to the model run / sos model
    src_store.convert_strategies_data(model_run["name"], tgt_store, args.noclobber)
    src_store.convert_scenario_data(model_run["name"], tgt_store)
    src_store.convert_narrative_data(sos_model["name"], tgt_store, args.noclobber)

    # Per-sector-model data: parameter defaults, interventions,
    # initial conditions
    for model_name in sos_model["sector_models"]:
        src_store.convert_model_parameter_default_data(
            model_name, tgt_store, args.noclobber
        )
        src_store.convert_interventions_data(model_name, tgt_store, args.noclobber)
        src_store.convert_initial_conditions_data(
            model_name, tgt_store, args.noclobber
        )
def csv2parquet(args):
    """Convert CSV to Parquet - assuming the CSV can be parsed as a dataframe

    ``args.path`` may be a single ``.csv`` file or a directory, which is
    searched recursively for ``*.csv`` files. Each file is written next to
    its source with a ``.parquet`` extension; with ``--noclobber``,
    existing parquet files are left untouched.
    """
    path = args.path
    # Treat path as a single file only if it actually ends with .csv
    # (a substring test would misfire on names like "my.csv.bak" or
    # directories containing ".csv")
    if path.endswith(".csv"):
        files = [path]
    else:
        files = glob.glob(os.path.join(path, "**", "*.csv"), recursive=True)

    for csv_path in files:
        # Swap only the extension - str.replace would rewrite the first
        # ".csv" occurrence anywhere in the path
        parquet_path = os.path.splitext(csv_path)[0] + ".parquet"
        if args.noclobber and os.path.exists(parquet_path):
            print("Skipping", csv_path)
            continue

        print("Converting", csv_path, flush=True)
        try:
            dataframe = pandas.read_csv(csv_path)
        except UnicodeDecodeError:
            # guess that cp1252 is next most common encoding we'll come across
            dataframe = pandas.read_csv(csv_path, encoding="cp1252")
        except pandas.errors.ParserError as ex:
            # nothing we can do with ParserError - usually a data problem
            print(ex)
            continue
        dataframe.to_parquet(parquet_path, engine="pyarrow", compression="gzip")
def prepare_scenario(args):
    """Update scenario configuration file to include multiple scenario
    variants. The initial scenario configuration file is overwritten.
    """
    store = _get_store(args)
    # variants_range holds two integers; treat the upper bound as inclusive
    first, last = args.variants_range
    store.prepare_scenario(args.scenario_name, range(first, last + 1))
def prepare_model_runs(args):
    """Generate multiple model runs according to a model run file referencing
    a scenario with multiple variants.

    Optional ``--start`` / ``--end`` arguments restrict the range of
    variants; otherwise all variants of the named scenario are used.

    Raises
    ------
    ValueError
        If a user-supplied bound is negative, out of range, or the upper
        bound is below the lower bound.
    """
    # Read model run and scenario using the Store class
    store = _get_store(args)
    nb_variants = len(store.read_scenario_variants(args.scenario_name))
    # Define default lower and upper of variant range
    var_start = 0
    var_end = nb_variants
    # Check if optional cli arguments specify range of variants
    # They are compared to None because they can be 0
    if args.start is not None:
        var_start = args.start
        if var_start < 0:
            raise ValueError("Lower bound of variant range must be >=0")
        if var_start > nb_variants:
            raise ValueError(
                "Lower bound of variant range greater" " than number of variants"
            )
    if args.end is not None:
        var_end = args.end
        if var_end < 0:
            raise ValueError("Upper bound of variant range must be >=0")
        # NOTE(review): a user-supplied end is capped at nb_variants - 1,
        # yet the default end is nb_variants - this asymmetry suggests the
        # upper bound is treated as an inclusive index by
        # Store.prepare_model_runs; confirm against the store implementation
        if var_end > nb_variants - 1:
            raise ValueError(
                "Upper bound of variant range cannot be greater"
                " than {:d}".format(nb_variants - 1)
            )
    if var_end < var_start:
        raise ValueError(
            "Upper bound of variant range must be >= lower"
            " bound of variant range"
        )
    store.prepare_model_runs(
        args.model_run_name, args.scenario_name, var_start, var_end
    )
def before_step(args):
    """Prepare a single model to run (call once before calling `smif step`)

    Parameters
    ----------
    args
        Parsed CLI arguments carrying ``modelrun`` and ``model`` names.
    """
    # Delegate to the controller with the configured data store
    execute_model_before_step(args.modelrun, args.model, _get_store(args))
def step(args):
    """Run a single model for a single timestep

    Parameters
    ----------
    args
        Parsed CLI arguments carrying ``modelrun``, ``model``,
        ``timestep`` and ``decision``.
    """
    # Delegate to the controller with the configured data store
    execute_model_step(
        args.modelrun, args.model, args.timestep, args.decision, _get_store(args)
    )
def decide(args):
    """Run a decision step for a model run

    Parameters
    ----------
    args
        Parsed CLI arguments carrying ``modelrun`` and ``decision``.
    """
    # Delegate to the controller with the configured data store
    execute_decision_step(args.modelrun, args.decision, _get_store(args))
def run(args):
    """Run the model runs as requested. Check if results exist and asks
    user for permission to overwrite

    Parameters
    ----------
    args
        Parsed CLI arguments; ``modelrun`` is either a run name or, with
        ``--batchfile``, a path to a file listing run names.
    """
    logger = logging.getLogger(__name__)
    msg = "{:s}, {:s}, {:s}".format(args.modelrun, args.interface, args.directory)

    # The smif logger may provide profiling hooks; fall back to plain info
    try:
        logger.profiling_start("run_model_runs", msg)
    except AttributeError:
        logger.info("START run_model_runs %s", msg)

    # A batchfile is a newline-separated list of model run names
    if args.batchfile:
        with open(args.modelrun, "r") as batch:
            model_run_ids = batch.read().splitlines()
    else:
        model_run_ids = [args.modelrun]

    execute_model_run(model_run_ids, _get_store(args), args.warm, args.dry_run)

    try:
        logger.profiling_stop("run_model_runs", msg)
        if not args.dry_run:
            logger.summary()
    except AttributeError:
        logger.info("STOP run_model_runs %s", msg)
def _get_store(args):
    """Construct a store as configured by arguments.

    Parameters
    ----------
    args
        Parsed CLI arguments with ``interface`` (one of "local_csv",
        "local_binary") and ``directory`` attributes.

    Returns
    -------
    smif.data_layer.Store

    Raises
    ------
    ValueError
        If ``args.interface`` is not a recognised store type.
    """
    if args.interface == "local_csv":
        store = Store(
            config_store=YamlConfigStore(args.directory),
            metadata_store=FileMetadataStore(args.directory),
            data_store=CSVDataStore(args.directory),
            model_base_folder=args.directory,
        )
    elif args.interface == "local_binary":
        store = Store(
            config_store=YamlConfigStore(args.directory),
            metadata_store=FileMetadataStore(args.directory),
            data_store=ParquetDataStore(args.directory),
            model_base_folder=args.directory,
        )
    else:
        raise ValueError(
            "Store interface type {} not recognised.".format(args.interface)
        )
    return store


def _run_server(args):
    """Configure and start the smif app web server (blocks until stopped).

    Raises
    ------
    ValueError
        If the scheduler choice is unrecognised, or the "dafni" scheduler
        is combined with a non-CSV store interface.
    """
    app_folder = pkg_resources.resource_filename("smif", "app/dist")

    # The DAFNI scheduler is only supported with the local CSV store
    if args.scheduler == "dafni" and args.interface != "local_csv":
        msg = "Scheduler implementation {0}, is not valid when combined with {1}."
        raise ValueError(msg.format(args.scheduler, args.interface))

    if args.scheduler == "default":
        model_scheduler = SubProcessRunScheduler()
    elif args.scheduler == "dafni":
        model_scheduler = DAFNIRunScheduler(args.username, args.password)
    else:
        raise ValueError(
            "Scheduler implementation {} not recognised.".format(args.scheduler)
        )

    app = create_app(
        static_folder=app_folder,
        template_folder=app_folder,
        data_interface=_get_store(args),
        scheduler=model_scheduler,
    )

    print("    Opening smif app\n")
    print("    Copy/paste this URL into your web browser to connect:")
    print("        http://localhost:" + str(args.port) + "\n")
    # add flush to ensure that text is printed before server thread starts
    print("    Close your browser then type Control-C here to quit.", flush=True)
    app.run(host="0.0.0.0", port=args.port, threaded=True)
def run_app(args):
    """Run the client/server application

    Starts the web app server (blocking). On Windows, installs a console
    CTRL-C handler first so the process can be interrupted cleanly.

    Parameters
    ----------
    args
    """
    # avoid one of two error messages from 'forrtl error(200)' when running
    # on windows cmd - seems related to scipy's underlying Fortran
    os.environ["FOR_DISABLE_CONSOLE_CTRL_HANDLER"] = "T"

    if USE_WIN32:
        # Set handler for CTRL-C. Necessary to avoid `forrtl: error (200):
        # program aborting...` crash on CTRL-C if we're running from Windows
        # cmd.exe
        # hook_sigint is bound as a default argument so the interrupt
        # function is captured at definition time
        def handler(dw_ctrl_type, hook_sigint=_thread.interrupt_main):
            """Handler for CTRL-C interrupt"""
            if dw_ctrl_type == 0:  # CTRL-C
                hook_sigint()
                return 1  # don't chain to the next handler
            return 0  # chain to the next handler

        win32api.SetConsoleCtrlHandler(handler, 1)

    # Create backend server process
    _run_server(args)
def setup_project_folder(args):
    """Set up a sample project in the configured project directory."""
    copy_project_folder(args.directory)
def parse_arguments():
    """Parse command line arguments

    Builds the top-level parser plus one subparser per command; each
    subparser stores its handler function in ``args.func``.

    Returns
    =======
    :class:`argparse.ArgumentParser`
    """
    parser = ArgumentParser(description="Command line tools for smif")
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version="smif " + smif.__version__,
        help="show the current version of smif",
    )

    # Arguments shared by every subcommand (verbosity, store interface,
    # project directory)
    parent_parser = ArgumentParser(add_help=False)
    parent_parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        help="show messages: -v to see messages reporting on "
        + "progress, -vv to see debug messages.",
    )
    parent_parser.add_argument(
        "-i",
        "--interface",
        default="local_csv",
        choices=["local_csv", "local_binary"],
        help="Select the data interface (default: %(default)s)",
    )
    parent_parser.add_argument(
        "-d", "--directory", default=".", help="Path to the project directory"
    )

    subparsers = parser.add_subparsers(help="available commands")

    # SETUP
    parser_setup = subparsers.add_parser(
        "setup", help="Setup the project folder", parents=[parent_parser]
    )
    parser_setup.set_defaults(func=setup_project_folder)

    # LIST
    parser_list = subparsers.add_parser(
        "list", help="List available model runs", parents=[parent_parser]
    )
    parser_list.set_defaults(func=list_model_runs)
    parser_list.add_argument(
        "-c",
        "--complete",
        help="Show which model runs have complete results",
        action="store_true",
    )

    # RESULTS
    parser_available_results = subparsers.add_parser(
        "available_results", help="List available results", parents=[parent_parser]
    )
    parser_available_results.set_defaults(func=list_available_results)
    parser_available_results.add_argument(
        "model_run", help="Name of the model run to list available results"
    )

    parser_missing_results = subparsers.add_parser(
        "missing_results", help="List missing results", parents=[parent_parser]
    )
    parser_missing_results.set_defaults(func=list_missing_results)
    parser_missing_results.add_argument(
        "model_run", help="Name of the model run to list missing results"
    )

    # PREPARE
    parser_convert = subparsers.add_parser(
        "prepare-convert",
        help="Convert data from one format to another",
        parents=[parent_parser],
    )
    parser_convert.set_defaults(func=prepare_convert)
    parser_convert.add_argument("model_run", help="Name of the model run")
    parser_convert.add_argument(
        "-nc",
        "--noclobber",
        help="Do not convert existing data files",
        action="store_true",
    )

    parser_prepare_scenario = subparsers.add_parser(
        "prepare-scenario",
        help="Prepare scenario configuration file with multiple variants",
        parents=[parent_parser],
    )
    parser_prepare_scenario.set_defaults(func=prepare_scenario)
    parser_prepare_scenario.add_argument("scenario_name", help="Name of the scenario")
    parser_prepare_scenario.add_argument(
        "variants_range",
        nargs=2,
        type=int,
        help="Two integers delimiting the range of variants",
    )

    parser_prepare_model_runs = subparsers.add_parser(
        "prepare-run",
        help="Prepare model runs based on scenario variants",
        parents=[parent_parser],
    )
    parser_prepare_model_runs.set_defaults(func=prepare_model_runs)
    parser_prepare_model_runs.add_argument("scenario_name", help="Name of the scenario")
    parser_prepare_model_runs.add_argument(
        "model_run_name", help="Name of the template model run"
    )
    parser_prepare_model_runs.add_argument(
        "-s", "--start", type=int, help="Lower bound of the range of variants"
    )
    parser_prepare_model_runs.add_argument(
        "-e", "--end", type=int, help="Upper bound of the range of variants"
    )

    # CONVERT
    parser_convert_format = subparsers.add_parser(
        "csv2parquet",
        help="Convert CSV to Parquet. Pass a filename or a directory to "
        + "search recursively",
        parents=[parent_parser],
    )
    parser_convert_format.set_defaults(func=csv2parquet)
    parser_convert_format.add_argument("path", help="Path to file")
    parser_convert_format.add_argument(
        "-nc",
        "--noclobber",
        help="Skip converting data files which already exist as parquet",
        action="store_true",
    )

    # APP
    parser_app = subparsers.add_parser(
        "app", help="Open smif app", parents=[parent_parser]
    )
    parser_app.set_defaults(func=run_app)
    parser_app.add_argument(
        "-p",
        "--port",
        type=int,
        default=5000,
        help="The port over which to serve the app",
    )
    parser_app.add_argument(
        "-s",
        "--scheduler",
        default="default",
        choices=["default", "dafni"],
        help="The module scheduling implementation to use",
    )
    parser_app.add_argument(
        "-u",
        "--username",
        help="The username for logging in to the dafni JobSubmissionAPI, "
        "only needed with the dafni job scheduler",
    )
    parser_app.add_argument(
        "-pw",
        "--password",
        help="The password for logging in to the dafni JobSubmissionAPI, "
        "only needed with the dafni job scheduler",
    )

    # RUN
    parser_run = subparsers.add_parser(
        "run", help="Run a modelrun", parents=[parent_parser]
    )
    parser_run.set_defaults(func=run)
    parser_run.add_argument(
        "-w",
        "--warm",
        action="store_true",
        help="Use intermediate results from the last modelrun "
        "and continue from where it had left",
    )
    parser_run.add_argument(
        "-b",
        "--batchfile",
        action="store_true",
        help="Use a batchfile instead of a modelrun name (a "
        "list of modelrun names)",
    )
    parser_run.add_argument("modelrun", help="Name of the model run to run")
    parser_run.add_argument(
        "-n",
        "--dry-run",
        action="store_true",
        help="Do not execute individual models, print steps instead",
    )

    # BEFORE STEP
    parser_before_step = subparsers.add_parser(
        "before_step",
        help="Initialise a model before stepping through",
        parents=[parent_parser],
    )
    parser_before_step.set_defaults(func=before_step)
    parser_before_step.add_argument("modelrun", help="Name of the model run")
    parser_before_step.add_argument(
        "-m", "--model", required=True, help="The individual model to run."
    )

    # DECIDE
    parser_decide = subparsers.add_parser(
        "decide", help="Run a decision step", parents=[parent_parser]
    )
    parser_decide.set_defaults(func=decide)
    parser_decide.add_argument("modelrun", help="Name of the model run")
    parser_decide.add_argument(
        "-dn",
        "--decision",
        type=int,
        default=0,
        help="The decision step to run: either 0 to start a run, or "
        "n+1 where n is the maximum previous decision iteration "
        "for which all steps have been simulated",
    )

    # STEP
    parser_step = subparsers.add_parser(
        "step", help="Run a model step", parents=[parent_parser]
    )
    parser_step.set_defaults(func=step)
    parser_step.add_argument("modelrun", help="Name of the model run")
    parser_step.add_argument(
        "-m", "--model", required=True, help="The individual model to run."
    )
    parser_step.add_argument(
        "-t", "--timestep", type=int, required=True, help="The single timestep to run."
    )
    parser_step.add_argument(
        "-dn", "--decision", type=int, required=True, help="The decision step to run."
    )

    return parser
def confirm(prompt=None, response=False):
    """Prompts for a yes or no response from the user

    Arguments
    ---------
    prompt : str, default=None
    response : bool, default=False

    Returns
    -------
    bool
        True for yes and False for no.

    Notes
    -----
    `response` should be set to the default value assumed by the caller when
    user simply types ENTER.

    Examples
    --------
    >>> confirm(prompt='Create Directory?', response=True)
    Create Directory? [y]|n:
    True
    >>> confirm(prompt='Create Directory?', response=False)
    Create Directory? [n]|y:
    False
    >>> confirm(prompt='Create Directory?', response=False)
    Create Directory? [n]|y: y
    True
    """
    if prompt is None:
        prompt = "Confirm"

    # Show the default answer in brackets
    default, other = ("y", "n") if response else ("n", "y")
    prompt = "{} [{}]|{}: ".format(prompt, default, other)

    while True:
        answer = input(prompt)
        if not answer:
            # ENTER accepts the default
            return response
        if answer in ("y", "Y"):
            return True
        if answer in ("n", "N"):
            return False
        print("please enter y or n.")
def main(arguments=None):
    """Parse args and run

    Entry point: parses CLI arguments, configures logging, installs a
    terse exception hook (full traceback only with --verbose), then
    dispatches to the subcommand handler stored in ``args.func``.
    """
    parser = parse_arguments()
    args = parser.parse_args(arguments)

    try:
        smif.cli.log.setup_logging(args.verbose)
    except AttributeError:
        # verbose is only set on subcommands - so `smif` or `smif -h` would error
        pass

    # debug_hook is bound as a default argument to capture the original
    # sys.excepthook before it is replaced below
    def exception_handler(
        exception_type, exception, traceback, debug_hook=sys.excepthook
    ):
        if args.verbose:
            debug_hook(exception_type, exception, traceback)
        else:
            print("{}: {}".format(exception_type.__name__, exception), file=sys.stderr)

    sys.excepthook = exception_handler
    # argparse Namespace supports `in`; func is only set when a subcommand
    # was chosen
    if "func" in args:
        args.func(args)
    else:
        parser.print_help()