# -*- coding: utf-8 -*-
"""A command line interface to the system of systems framework
This command line interface implements a number of methods.
- `setup` creates an example project with the recommended folder structure
- `run` performs a simulation of an individual sector model, or the whole system
of systems model
- `validate` performs a validation check of the configuration file
- `app` runs the graphical user interface, opening in a web browser
Folder structure
----------------
When configuring a project for the CLI, the folder structure below should be
used. In this example, there is one system-of-systems model, combining a water
supply and an energy demand model::
/config
project.yaml
/sector_models
energy_demand.yml
water_supply.yml
/sos_models
energy_water.yml
/model_runs
run_to_2050.yml
short_test_run.yml
...
/data
/initial_conditions
reservoirs.yml
/interval_definitions
annual_intervals.csv
/interventions
water_supply.yml
/narratives
high_tech_dsm.yml
/region_definitions
/oxfordshire
regions.geojson
/uk_nations_shp
regions.shp
/scenarios
population.csv
raininess.csv
/water_supply
/initial_system
/models
energy_demand.py
water_supply.py
/planning
expected_to_2020.yaml
national_infrastructure_pipeline.yml
The sector model implementations can be installed independently of the model run
configuration. The paths to python wrapper classes (implementing SectorModel)
should be specified in each ``sector_model/*.yml`` configuration.
The project.yaml file specifies the metadata shared by all elements of the
project; ``sos_models`` specify the combinations of ``sector_models`` and
``scenarios`` while individual ``model_runs`` specify the scenario, strategy
and narrative combinations to be used in each run of the models.
"""
from __future__ import print_function
import glob
import logging
import os
import sys
from argparse import ArgumentParser
import pkg_resources
import pandas
import smif
import smif.cli.log
from smif.controller import (copy_project_folder, execute_decision_step,
execute_model_before_step, execute_model_run,
execute_model_step)
from smif.controller.run import DAFNIRunScheduler, SubProcessRunScheduler
from smif.data_layer import Store
from smif.data_layer.file import (CSVDataStore, FileMetadataStore,
ParquetDataStore, YamlConfigStore)
from smif.http_api import create_app
try:
import _thread
except ImportError:
import thread as _thread
try:
import win32api
USE_WIN32 = True
except ImportError:
USE_WIN32 = False
__author__ = "Will Usher, Tom Russell"
__copyright__ = "Will Usher, Tom Russell"
__license__ = "mit"
def list_model_runs(args):
    """List the model runs defined in the config, optionally indicating whether complete
    results exist.
    """
    store = _get_store(args)
    run_configs = store.read_model_runs()

    show_complete = args.complete
    if show_complete:
        print('Model runs with an asterisk (*) have complete results available\n')

    for config in run_configs:
        name = config['name']
        if not show_complete:
            print(name)
            continue
        # A run is complete when every expected result is actually available
        expected = store.canonical_expected_results(name)
        available = store.canonical_available_results(name)
        marker = ' *' if expected == available else ''
        print('{}{}'.format(name, marker))
def list_available_results(args):
    """List the available results for a specified model run.
    """
    store = _get_store(args)
    expected = store.canonical_expected_results(args.model_run)
    available = store.available_results(args.model_run)

    # Header: the model run and its system-of-systems model
    model_run = store.read_model_run(args.model_run)
    print('\nmodel run: {}'.format(args.model_run))
    print('{}- sos model: {}'.format('  ', model_run['sos_model']))

    # Results are (timestep, decision, sector_model, output) tuples; walk
    # the expected set as a sector-model -> output hierarchy
    for sec_model in sorted({sec for _t, _d, sec, _out in expected}):
        print('{}- sector model: {}'.format('    ', sec_model))

        for output in sorted({out for _t, _d, sec, out in expected
                              if sec == sec_model}):
            print('{}- output: {}'.format('      ', output))

            # Decisions for which any result of this output is available
            decisions = sorted({d for _t, d, sec, out in available
                                if sec == sec_model and out == output})
            if len(decisions) == 0:
                print('{}- no results'.format('        '))

            for decision in decisions:
                prefix = '{}- decision {}:'.format('        ', decision)
                # Time steps available for this decision / sector model / output
                timesteps = sorted({t for t, d, sec, out in available
                                    if d == decision and sec == sec_model
                                    and out == output})
                assert (len(timesteps) > 0), \
                    "If a decision is available, so is at least one time step"
                print('{} {}'.format(prefix, ', '.join(str(t) for t in timesteps)))
def list_missing_results(args):
    """List the missing results for a specified model run.
    """
    store = _get_store(args)
    expected = store.canonical_expected_results(args.model_run)
    missing = store.canonical_missing_results(args.model_run)

    # Header: the model run and its system-of-systems model
    model_run = store.read_model_run(args.model_run)
    print('\nmodel run: {}'.format(args.model_run))
    print('{}- sos model: {}'.format('  ', model_run['sos_model']))

    # Walk the expected results as a sector-model -> output hierarchy
    for sec_model in sorted({sec for _t, _d, sec, _out in expected}):
        print('{}- sector model: {}'.format('    ', sec_model))

        for output in sorted({out for _t, _d, sec, out in expected
                              if sec == sec_model}):
            print('{}- output: {}'.format('      ', output))

            # Time steps for which this output has no stored result
            timesteps = sorted({t for t, d, sec, out in missing
                                if sec == sec_model and out == output})
            if len(timesteps) == 0:
                print('{}- no missing results'.format('        '))
            else:
                prefix = '{}- results missing for:'.format('        ')
                print('{} {}'.format(prefix, ', '.join(str(t) for t in timesteps)))
def prepare_convert(args):
    """Convert the data for a model run from its current store format to the
    other file format (CSV sources convert to Parquet, otherwise to CSV).
    """
    src_store = _get_store(args)

    # Choose the opposite data store format to the source
    if isinstance(src_store.data_store, CSVDataStore):
        tgt_data_store = ParquetDataStore(args.directory)
    else:
        tgt_data_store = CSVDataStore(args.directory)
    tgt_store = Store(
        config_store=YamlConfigStore(args.directory),
        metadata_store=FileMetadataStore(args.directory),
        data_store=tgt_data_store,
        model_base_folder=args.directory
    )

    # Read the model run and its sos model configuration
    model_run = src_store.read_model_run(args.model_run)
    sos_model = src_store.read_sos_model(model_run['sos_model'])

    # Convert strategies, scenario and narrative data for the model run
    src_store.convert_strategies_data(model_run['name'], tgt_store, args.noclobber)
    src_store.convert_scenario_data(model_run['name'], tgt_store)
    src_store.convert_narrative_data(sos_model['name'], tgt_store, args.noclobber)

    # Convert initial conditions, default parameter and interventions data
    # for each sector model in the sos model
    for sector_model_name in sos_model['sector_models']:
        src_store.convert_model_parameter_default_data(
            sector_model_name, tgt_store, args.noclobber)
        src_store.convert_interventions_data(
            sector_model_name, tgt_store, args.noclobber)
        src_store.convert_initial_conditions_data(
            sector_model_name, tgt_store, args.noclobber)
def csv2parquet(args):
    """Convert CSV to Parquet - assuming the CSV can be parsed as a dataframe

    Accepts either a single ``.csv`` file path or a directory, which is
    searched recursively for ``*.csv`` files. Each Parquet file is written
    next to its source with the extension swapped. With ``--noclobber``,
    files whose Parquet counterpart already exists are skipped.

    Parameters
    ----------
    args
        Parsed arguments with ``path`` (str) and ``noclobber`` (bool)
    """
    path = args.path
    # Fix: use endswith, not substring match - a directory whose *name*
    # happens to contain ".csv" must still be treated as a directory
    if path.endswith(".csv"):
        files = [path]
    else:
        files = glob.glob(os.path.join(path, '**', '*.csv'), recursive=True)

    for csv_path in files:
        # Fix: swap only the extension; str.replace would also rewrite any
        # ".csv" occurring earlier in the path
        parquet_path = os.path.splitext(csv_path)[0] + ".parquet"
        if args.noclobber and os.path.exists(parquet_path):
            print("Skipping", csv_path)
            continue

        print("Converting", csv_path, flush=True)
        try:
            dataframe = pandas.read_csv(csv_path)
        except UnicodeDecodeError:
            # guess that cp1252 is next most common encoding we'll come across
            dataframe = pandas.read_csv(csv_path, encoding='cp1252')
        except pandas.errors.ParserError as ex:
            # nothing we can do with ParserError - usually a data problem
            print(ex)
            continue
        dataframe.to_parquet(parquet_path, engine='pyarrow', compression='gzip')
def prepare_scenario(args):
    """Update scenario configuration file to include multiple scenario variants.
    The initial scenario configuration file is overwritten.
    """
    store = _get_store(args)
    first, last = args.variants_range
    # range() excludes its upper bound, so extend it to include `last`
    store.prepare_scenario(args.scenario_name, range(first, last + 1))
def prepare_model_runs(args):
    """Generate multiple model runs according to a model run file referencing a scenario
    with multiple variants.
    """
    store = _get_store(args)
    nb_variants = len(store.read_scenario_variants(args.scenario_name))

    # Default to the full range of variants; the optional cli bounds are
    # compared against None because 0 is a legitimate value
    var_start, var_end = 0, nb_variants

    if args.start is not None:
        var_start = args.start
        if var_start < 0:
            raise ValueError('Lower bound of variant range must be >=0')
        if var_start > nb_variants:
            raise ValueError("Lower bound of variant range greater"
                             " than number of variants")

    if args.end is not None:
        var_end = args.end
        if var_end < 0:
            raise ValueError('Upper bound of variant range must be >=0')
        if var_end > nb_variants - 1:
            raise ValueError("Upper bound of variant range cannot be greater"
                             " than {:d}".format(nb_variants - 1))
        if var_end < var_start:
            raise ValueError("Upper bound of variant range must be >= lower"
                             " bound of variant range")

    store.prepare_model_runs(args.model_run_name, args.scenario_name,
                             var_start, var_end)
def before_step(args):
    """Prepare a single model to run (call once before calling `smif step`)

    Parameters
    ----------
    args
    """
    execute_model_before_step(args.modelrun, args.model, _get_store(args))
def step(args):
    """Run a single model for a single timestep

    Parameters
    ----------
    args
    """
    execute_model_step(
        args.modelrun, args.model, args.timestep, args.decision, _get_store(args))
def decide(args):
    """Run a decision step for a model run

    Parameters
    ----------
    args
    """
    execute_decision_step(args.modelrun, args.decision, _get_store(args))
def run(args):
    """Run the model runs as requested. Check if results exist and asks
    user for permission to overwrite

    Parameters
    ----------
    args
    """
    logger = logging.getLogger(__name__)
    msg = '{:s}, {:s}, {:s}'.format(args.modelrun, args.interface, args.directory)

    # The smif logger may expose profiling hooks; fall back to plain info
    try:
        logger.profiling_start('run_model_runs', msg)
    except AttributeError:
        logger.info('START run_model_runs %s', msg)

    if args.batchfile:
        # In batch mode, args.modelrun names a file listing one run per line
        with open(args.modelrun, 'r') as batchfile:
            model_run_ids = batchfile.read().splitlines()
    else:
        model_run_ids = [args.modelrun]

    store = _get_store(args)
    execute_model_run(model_run_ids, store, args.warm, args.dry_run)

    try:
        logger.profiling_stop('run_model_runs', msg)
        if not args.dry_run:
            logger.summary()
    except AttributeError:
        logger.info('STOP run_model_runs %s', msg)
def _get_store(args):
    """Construct a store as configured by arguments

    Raises
    ------
    ValueError
        If ``args.interface`` names an unknown store type
    """
    # Map interface name to the data store implementation
    data_stores = {
        'local_csv': CSVDataStore,
        'local_binary': ParquetDataStore,
    }
    if args.interface not in data_stores:
        raise ValueError("Store interface type {} not recognised.".format(args.interface))

    return Store(
        config_store=YamlConfigStore(args.directory),
        metadata_store=FileMetadataStore(args.directory),
        data_store=data_stores[args.interface](args.directory),
        model_base_folder=args.directory
    )
def _run_server(args):
    """Create the smif web app and serve it on the configured port

    Builds the Flask app over the configured store and run scheduler, then
    blocks in ``app.run``.

    Raises
    ------
    ValueError
        If the scheduler/interface combination or the scheduler name is not
        recognised
    """
    app_folder = pkg_resources.resource_filename('smif', 'app/dist')

    # The dafni scheduler is only valid with the local_csv interface
    if args.scheduler == 'dafni' and args.interface != 'local_csv':
        msg = "Scheduler implementation {0}, is not valid when combined with {1}."
        raise ValueError(msg.format(args.scheduler, args.interface))

    if args.scheduler == 'default':
        model_scheduler = SubProcessRunScheduler()
    elif args.scheduler == 'dafni':
        model_scheduler = DAFNIRunScheduler(args.username, args.password)
    else:
        # Fix: error message previously misspelt "implentation"
        raise ValueError("Scheduler implementation {} not recognised.".format(args.scheduler))

    app = create_app(
        static_folder=app_folder,
        template_folder=app_folder,
        data_interface=_get_store(args),
        scheduler=model_scheduler
    )

    print(" Opening smif app\n")
    print(" Copy/paste this URL into your web browser to connect:")
    print(" http://localhost:" + str(args.port) + "\n")
    # add flush to ensure that text is printed before server thread starts
    print(" Close your browser then type Control-C here to quit.", flush=True)
    app.run(host='0.0.0.0', port=args.port, threaded=True)
def run_app(args):
    """Run the client/server application

    Parameters
    ----------
    args
    """
    # avoid one of two error messages from 'forrtl error(200)' when running
    # on windows cmd - seems related to scipy's underlying Fortran
    os.environ['FOR_DISABLE_CONSOLE_CTRL_HANDLER'] = 'T'

    if USE_WIN32:
        # Set handler for CTRL-C. Necessary to avoid `forrtl: error (200):
        # program aborting...` crash on CTRL-C if we're running from Windows
        # cmd.exe
        def handler(dw_ctrl_type, hook_sigint=_thread.interrupt_main):
            """Handler for CTRL-C interrupt
            """
            if dw_ctrl_type != 0:
                return 0  # chain to the next handler
            hook_sigint()  # CTRL-C: raise KeyboardInterrupt in the main thread
            return 1  # don't chain to the next handler

        win32api.SetConsoleCtrlHandler(handler, 1)

    # Create backend server process
    _run_server(args)
def setup_project_folder(args):
    """Set up a sample project in the configured directory
    """
    copy_project_folder(args.directory)
def parse_arguments():
    """Parse command line arguments

    Builds the top-level parser plus one subparser per command; each
    subcommand binds its handler function via ``set_defaults(func=...)``.

    Returns
    =======
    :class:`argparse.ArgumentParser`
    """
    parser = ArgumentParser(description='Command line tools for smif')
    parser.add_argument('-V', '--version',
                        action='version',
                        version="smif " + smif.__version__,
                        help='show the current version of smif')

    # Options shared by every subcommand via parents=[parent_parser]
    parent_parser = ArgumentParser(add_help=False)
    parent_parser.add_argument('-v', '--verbose',
                               action='count',
                               help='show messages: -v to see messages reporting on ' +
                                    'progress, -vv to see debug messages.')
    parent_parser.add_argument('-i', '--interface',
                               default='local_csv',
                               choices=['local_csv', 'local_binary'],
                               help="Select the data interface (default: %(default)s)")
    parent_parser.add_argument('-d', '--directory',
                               default='.',
                               help="Path to the project directory")

    subparsers = parser.add_subparsers(help='available commands')

    # SETUP
    parser_setup = subparsers.add_parser(
        'setup', help='Setup the project folder', parents=[parent_parser])
    parser_setup.set_defaults(func=setup_project_folder)

    # LIST
    parser_list = subparsers.add_parser(
        'list', help='List available model runs', parents=[parent_parser])
    parser_list.set_defaults(func=list_model_runs)
    parser_list.add_argument('-c', '--complete',
                             help="Show which model runs have complete results",
                             action='store_true')

    # RESULTS
    parser_available_results = subparsers.add_parser(
        'available_results', help='List available results', parents=[parent_parser])
    parser_available_results.set_defaults(func=list_available_results)
    parser_available_results.add_argument(
        'model_run',
        help="Name of the model run to list available results"
    )

    parser_missing_results = subparsers.add_parser(
        'missing_results', help='List missing results', parents=[parent_parser])
    parser_missing_results.set_defaults(func=list_missing_results)
    parser_missing_results.add_argument(
        'model_run',
        help="Name of the model run to list missing results"
    )

    # PREPARE
    parser_convert = subparsers.add_parser(
        'prepare-convert', help='Convert data from one format to another',
        parents=[parent_parser])
    parser_convert.set_defaults(func=prepare_convert)
    parser_convert.add_argument(
        'model_run', help='Name of the model run')
    parser_convert.add_argument(
        '-nc', '--noclobber',
        help='Do not convert existing data files', action='store_true')

    parser_prepare_scenario = subparsers.add_parser(
        'prepare-scenario', help='Prepare scenario configuration file with multiple variants',
        parents=[parent_parser])
    parser_prepare_scenario.set_defaults(func=prepare_scenario)
    parser_prepare_scenario.add_argument(
        'scenario_name', help='Name of the scenario')
    parser_prepare_scenario.add_argument(
        'variants_range', nargs=2, type=int,
        help='Two integers delimiting the range of variants')

    parser_prepare_model_runs = subparsers.add_parser(
        'prepare-run', help='Prepare model runs based on scenario variants',
        parents=[parent_parser])
    parser_prepare_model_runs.set_defaults(func=prepare_model_runs)
    parser_prepare_model_runs.add_argument(
        'scenario_name', help='Name of the scenario')
    parser_prepare_model_runs.add_argument(
        'model_run_name', help='Name of the template model run')
    parser_prepare_model_runs.add_argument(
        '-s', '--start', type=int, help='Lower bound of the range of variants')
    parser_prepare_model_runs.add_argument(
        '-e', '--end', type=int, help='Upper bound of the range of variants')

    # CONVERT
    # Fix: help text previously misspelt "recurisvely"
    parser_convert_format = subparsers.add_parser(
        'csv2parquet', help='Convert CSV to Parquet. Pass a filename or a directory to ' +
        'search recursively', parents=[parent_parser])
    parser_convert_format.set_defaults(func=csv2parquet)
    parser_convert_format.add_argument(
        'path', help='Path to file')
    parser_convert_format.add_argument(
        '-nc', '--noclobber',
        help='Skip converting data files which already exist as parquet', action='store_true')

    # APP
    parser_app = subparsers.add_parser(
        'app', help='Open smif app', parents=[parent_parser])
    parser_app.set_defaults(func=run_app)
    parser_app.add_argument('-p', '--port',
                            type=int,
                            default=5000,
                            help="The port over which to serve the app")
    parser_app.add_argument('-s', '--scheduler',
                            default='default',
                            choices=['default', 'dafni'],
                            help="The module scheduling implementation to use")
    parser_app.add_argument('-u', '--username',
                            help="The username for logging in to the dafni JobSubmissionAPI, \
                            only needed with the dafni job scheduler")
    parser_app.add_argument('-pw', '--password',
                            help="The password for logging in to the dafni JobSubmissionAPI, \
                            only needed with the dafni job scheduler")

    # RUN
    parser_run = subparsers.add_parser(
        'run', help='Run a modelrun', parents=[parent_parser])
    parser_run.set_defaults(func=run)
    parser_run.add_argument('-w', '--warm',
                            action='store_true',
                            help="Use intermediate results from the last modelrun \
                            and continue from where it had left")
    parser_run.add_argument('-b', '--batchfile',
                            action='store_true',
                            help="Use a batchfile instead of a modelrun name (a \
                            list of modelrun names)")
    parser_run.add_argument('modelrun',
                            help="Name of the model run to run")
    parser_run.add_argument('-n', '--dry-run',
                            action='store_true',
                            help="Do not execute individual models, print steps instead")

    # BEFORE RUN
    parser_before_step = subparsers.add_parser(
        'before_step',
        help='Initialise a model before stepping through',
        parents=[parent_parser])
    parser_before_step.set_defaults(func=before_step)
    parser_before_step.add_argument('modelrun',
                                    help="Name of the model run")
    parser_before_step.add_argument('-m', '--model',
                                    required=True,
                                    help="The individual model to run.")

    # DECIDE
    parser_decide = subparsers.add_parser(
        'decide', help='Run a decision step', parents=[parent_parser])
    parser_decide.set_defaults(func=decide)
    parser_decide.add_argument('modelrun',
                               help="Name of the model run")
    parser_decide.add_argument('-dn', '--decision',
                               type=int,
                               default=0,
                               help="The decision step to run: either 0 to start a run, or "
                                    "n+1 where n is the maximum previous decision iteration "
                                    "for which all steps have been simulated")

    # STEP
    parser_step = subparsers.add_parser(
        'step', help='Run a model step', parents=[parent_parser])
    parser_step.set_defaults(func=step)
    parser_step.add_argument('modelrun',
                            help="Name of the model run")
    parser_step.add_argument('-m', '--model',
                            required=True,
                            help="The individual model to run.")
    parser_step.add_argument('-t', '--timestep',
                            type=int,
                            required=True,
                            help="The single timestep to run.")
    parser_step.add_argument('-dn', '--decision',
                            type=int,
                            required=True,
                            help="The decision step to run.")

    return parser
def confirm(prompt=None, response=False):
    """Prompt for a yes or no response from the user

    Arguments
    ---------
    prompt : str, default=None
    response : bool, default=False
        The value returned when the user simply presses ENTER; shown as
        the bracketed default in the prompt.

    Returns
    -------
    bool
        True for yes and False for no.

    Examples
    --------
    >>> confirm(prompt='Create Directory?', response=True)
    Create Directory? [y]|n:
    True
    >>> confirm(prompt='Create Directory?', response=False)
    Create Directory? [n]|y:
    False
    >>> confirm(prompt='Create Directory?', response=False)
    Create Directory? [n]|y: y
    True
    """
    if prompt is None:
        prompt = 'Confirm'

    # Show the default answer in brackets
    default, other = ('y', 'n') if response else ('n', 'y')
    prompt = '{} [{}]|{}: '.format(prompt, default, other)

    while True:
        answer = input(prompt)
        if not answer:
            return response
        if answer in ('y', 'Y'):
            return True
        if answer in ('n', 'N'):
            return False
        print('please enter y or n.')
def main(arguments=None):
    """Parse args and run
    """
    parser = parse_arguments()
    args = parser.parse_args(arguments)

    try:
        smif.cli.log.setup_logging(args.verbose)
    except AttributeError:
        # verbose is only set on subcommands - so `smif` or `smif -h` would error
        pass

    def exception_handler(exception_type, exception, traceback, debug_hook=sys.excepthook):
        # Full traceback in verbose mode, otherwise a one-line summary to stderr
        if args.verbose:
            debug_hook(exception_type, exception, traceback)
        else:
            print("{}: {}".format(exception_type.__name__, exception), file=sys.stderr)

    sys.excepthook = exception_handler

    # Subcommands bind a handler via set_defaults(func=...); a bare `smif`
    # invocation has none, so show the help text instead
    if 'func' in args:
        args.func(args)
    else:
        parser.print_help()