# -*- coding: utf-8 -*-
"""Validate the correct format and presence of the config data
for the system-of-systems model
"""
import itertools
from smif.exception import SmifDataError, SmifDataInputError, SmifValidationError
VALIDATION_ERRORS = []
[docs]def validate_sos_model_config(sos_model, sector_models, scenarios):
"""Check expected values for data loaded from master config file"""
errors = []
if not isinstance(sos_model, dict):
msg = "Main config file should contain setup data, instead found: {}"
err = SmifValidationError(msg.format(sos_model))
errors.append(err)
return
# check description
errors.extend(_validate_description(sos_model))
# check sector models
errors.extend(_validate_sos_model_models(sos_model, sector_models))
# check scenarios
errors.extend(_validate_sos_model_scenarios(sos_model, scenarios))
# check narratives
errors.extend(_validate_sos_model_narratives(sos_model, sector_models))
# check dependencies
errors.extend(_validate_sos_model_deps(sos_model, sector_models, scenarios))
if errors:
raise SmifDataError(errors)
def _validate_sos_model_models(sos_model, sector_models):
errors = []
if not sos_model["sector_models"]:
errors.append(
SmifDataInputError(
"sector_models",
"At least one sector model must be selected.",
"A system-of-systems model requires to have at least one system "
+ "enabled to build a valid configuration.",
)
)
for sector_model in sos_model["sector_models"]:
if sector_model not in [sector_model["name"] for sector_model in sector_models]:
errors.append(
SmifDataInputError(
"sector_models",
"%s must have a valid sector_model configuration." % (sector_model),
"Smif refers to the sector_model-configurations to find "
+ "details about a selected sector_model.",
)
)
return errors
def _validate_sos_model_scenarios(sos_model, scenarios):
errors = []
for scenario in sos_model["scenarios"]:
if scenario not in [scenario["name"] for scenario in scenarios]:
errors.append(
SmifDataInputError(
"scenarios",
"%s must have a valid scenario configuration." % (scenario),
"Smif refers to the scenario-configurations to find "
+ "details about a selected scenario.",
)
)
return errors
def _validate_sos_model_narratives(sos_model, sector_models):
errors = []
for narrative in sos_model["narratives"]:
# Check provides are valid
for model_name in narrative["provides"]:
# A narrative can only provides for enabled models
if model_name not in sos_model["sector_models"]:
errors.append(
SmifDataInputError(
"narratives",
(
"Narrative `%s` provides data for model `%s` that is not enabled "
+ "in this system-of-systems model."
)
% (narrative["name"], model_name),
"A narrative can only provide for enabled models.",
)
)
else:
# A narrative can only provides parameters that exist in the model
try:
sector_model = _pick_sector_model(model_name, sector_models)
except KeyError:
msg = (
"Narrative `{}` provides data for model `{}` that is not found."
)
errors.append(
SmifDataInputError(
"models",
msg.format(narrative["name"], model_name),
"A narrative can only provide for existing models.",
)
)
sector_model = {"parameters": []}
parameters = [
parameter["name"] for parameter in sector_model["parameters"]
]
for provide in narrative["provides"][model_name]:
msg = "Narrative `{}` provides data for non-existing model parameter `{}`"
if provide not in parameters:
errors.append(
SmifDataInputError(
"narratives",
msg.format(narrative["name"], provide),
"A narrative can only provide existing model parameters.",
)
)
# Check if all variants are valid
for variant in narrative["variants"]:
should_provide = list(itertools.chain(*narrative["provides"].values()))
variant_provides = list(variant["data"].keys())
if sorted(variant_provides) != sorted(should_provide):
msg = "Narrative `{}`, variant `{}` provides incorrect data."
errors.append(
SmifDataInputError(
"narratives",
msg.format(narrative["name"], variant["name"]),
"A variant can only provide data for parameters that are specified "
+ "by the narrative.",
)
)
return errors
def _pick_sector_model(name, models):
for model in models:
if model["name"] == name:
return model
raise KeyError("Model '{}' not found in models".format(name))
def _validate_sos_model_deps(sos_model, sector_models, scenarios):
errors = []
errors.extend(
_validate_dependencies(
sos_model,
"model_dependencies",
sector_models,
"sector_models",
sector_models,
"sector_models",
)
)
errors.extend(
_validate_dependencies(
sos_model,
"scenario_dependencies",
scenarios,
"scenarios",
sector_models,
"sector_models",
)
)
return errors
def _validate_description(configuration):
errors = []
if len(configuration["description"]) > 255:
errors.append(
SmifDataInputError(
"description",
"Description must not contain more than 255 characters.",
"A description should briefly outline a `%s` configuration."
% (configuration["name"]),
)
)
return errors
def _validate_dependencies(configuration, conf_key, source, source_key, sink, sink_key):
errors = []
for idx, dependency in enumerate(configuration[conf_key]):
errors.extend(_validate_dependency_cycle(idx, dependency, conf_key))
errors.extend(
_validate_dependency_in_sos_model(
idx, dependency, configuration, conf_key, source_key, sink_key
)
)
errors.extend(
_validate_dependency(
idx, dependency, conf_key, source, source_key, sink, sink_key
)
)
return errors
def _validate_dependency_cycle(idx, dependency, conf_key):
errors = []
# Circular dependencies are not allowed
is_current = "timestep" not in dependency or dependency["timestep"] == "CURRENT"
if dependency["source"] == dependency["sink"] and is_current:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Circular dependencies are not allowed." % (idx + 1),
"Smif does not support self-dependencies unless the dependency is on "
+ "output from a previous timestep.",
)
)
return errors
def _validate_dependency_in_sos_model(
idx, dependency, configuration, conf_key, source_key, sink_key
):
errors = []
# Source / Sink must be enabled in sos_model config
if dependency["source"] not in configuration[source_key]:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Source `%s` is not enabled."
% (idx + 1, dependency["source"]),
"Each dependency source must be enabled in the sos-model",
)
)
if dependency["sink"] not in configuration[sink_key]:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Sink `%s` is not enabled."
% (idx + 1, dependency["sink"]),
"Each dependency sink must be enabled in the sos-model",
)
)
# Sink can only have a single dependency
dep_sinks = [
(dependency["sink"], dependency["sink_input"])
for dependency in configuration[conf_key]
]
if dep_sinks.count((dependency["sink"], dependency["sink_input"])) > 1:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Sink input `%s` is driven by multiple sources."
% (idx + 1, dependency["sink_input"]),
"A model input can only be driven by a single model output.",
)
)
return errors
def _validate_dependency(idx, dependency, conf_key, source, source_key, sink, sink_key):
errors = []
# Source and sink model configurations must exist
source_model = [model for model in source if model["name"] == dependency["source"]]
sink_model = [model for model in sink if model["name"] == dependency["sink"]]
if not source_model:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Source `%s` does not exist."
% (idx + 1, dependency["source"]),
"Each dependency source must have a `%s` configuration." % (source_key),
)
)
if not sink_model:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Sink `%s` does not exist."
% (idx + 1, dependency["sink"]),
"Each dependency sink must have a `%s` configuration." % (sink_key),
)
)
if not sink_model or not source_model:
# not worth doing further checks if source/sink does not exist
return errors
# Source_output and sink_input must exist
if source_key == "sector_models":
source_model_outputs = [
output
for output in source_model[0]["outputs"]
if output["name"] == dependency["source_output"]
]
if source_key == "scenarios":
source_model_outputs = [
output
for output in source_model[0]["provides"]
if output["name"] == dependency["source_output"]
]
sink_model_inputs = [
input_
for input_ in sink_model[0]["inputs"]
if input_["name"] == dependency["sink_input"]
]
if not source_model_outputs:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Source output `%s` does not exist."
% (idx + 1, dependency["source_output"]),
"Each dependency source output must exist in the `%s` configuration."
% (source_key),
)
)
if not sink_model_inputs:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Sink input `%s` does not exist."
% (idx + 1, dependency["sink_input"]),
"Each dependency sink input must exist in the `%s` configuration."
% (sink_key),
)
)
if not source_model_outputs or not sink_model_inputs:
# not worth doing further checks if source_output/sink_input does not exist
return errors
# Source_output and sink_input must have matching specs
source_model_output = source_model_outputs[0]
sink_model_input = sink_model_inputs[0]
if "dims" in source_model_output and "dims" in sink_model_input:
if sorted(source_model_output["dims"]) != sorted(sink_model_input["dims"]):
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Source `%s` has different dimensions than sink "
% (idx + 1, source_model_output["name"])
+ "`%s` (%s != %s)."
% (
sink_model_input["name"],
source_model_output["dims"],
sink_model_input["dims"],
),
"Dependencies must have matching dimensions.",
)
)
else:
if "dims" in source_model_output or "dims" in sink_model_input:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Source `%s` has different dimensions than sink "
% (idx + 1, source_model_output["name"])
+ "`%s` (%s != %s)."
% (sink_model_input["name"], source_model_output, sink_model_input),
"Dependencies must have matching dimensions.",
)
)
if source_model_output["dtype"] != sink_model_input["dtype"]:
errors.append(
SmifDataInputError(
conf_key,
"(Dependency %s) Source `%s` has a different dtype than sink "
% (
idx + 1,
source_model_output["name"],
)
+ "`%s` (%s != %s)."
% (
sink_model_input["name"],
source_model_output["dtype"],
sink_model_input["dtype"],
),
"Dependencies must have matching data types.",
)
)
return errors
[docs]def validate_path_to_timesteps(timesteps):
"""Check timesteps is a path to timesteps file"""
if not isinstance(timesteps, str):
VALIDATION_ERRORS.append(
SmifValidationError(
"Expected 'timesteps' in main config to specify "
+ "a timesteps file, instead got {}.".format(timesteps)
)
)
[docs]def validate_timesteps(timesteps, file_path):
"""Check timesteps is a list of integers"""
if not isinstance(timesteps, list):
msg = "Loading {}: expected a list of timesteps.".format(file_path)
VALIDATION_ERRORS.append(SmifValidationError(msg))
else:
msg = "Loading {}: timesteps should be integer years, instead got {}"
for timestep in timesteps:
if not isinstance(timestep, int):
VALIDATION_ERRORS.append(msg.format(file_path, timestep))
[docs]def validate_time_intervals(intervals, file_path):
"""Check time intervals"""
if not isinstance(intervals, list):
msg = "Loading {}: expected a list of time intervals.".format(file_path)
VALIDATION_ERRORS.append(SmifValidationError(msg))
else:
for interval in intervals:
validate_time_interval(interval)
[docs]def validate_time_interval(interval):
"""Check a single time interval"""
if not isinstance(interval, dict):
msg = "Expected a time interval, instead got {}.".format(interval)
VALIDATION_ERRORS.append(SmifValidationError(msg))
return
required_keys = ["id", "start", "end"]
for key in required_keys:
if key not in interval:
fmt = (
"Expected a value for '{}' in each " + "time interval, only received {}"
)
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(key, interval)))
[docs]def validate_sector_models_initial_config(sector_models):
"""Check list of sector models initial configuration"""
if not isinstance(sector_models, list):
fmt = (
"Expected 'sector_models' in main config to "
+ "specify a list of sector models to run, instead got {}."
)
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(sector_models)))
else:
if len(sector_models) == 0:
VALIDATION_ERRORS.append(
SmifValidationError("No 'sector_models' specified in main config file.")
)
# check each sector model
for sector_model_config in sector_models:
validate_sector_model_initial_config(sector_model_config)
[docs]def validate_sector_model_initial_config(sector_model_config):
"""Check a single sector model initial configuration"""
if not isinstance(sector_model_config, dict):
fmt = "Expected a sector model config block, instead got {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(sector_model_config)))
return
required_keys = ["name", "config_dir", "path", "classname"]
for key in required_keys:
if key not in sector_model_config:
fmt = (
"Expected a value for '{}' in each "
+ "sector model in main config file, only received {}"
)
VALIDATION_ERRORS.append(
SmifValidationError(fmt.format(key, sector_model_config))
)
[docs]def validate_dependency_spec(input_spec, model_name):
"""Check the input specification for a single sector model"""
if not isinstance(input_spec, list):
fmt = (
"Expected a list of parameter definitions in '{}' model "
+ "input specification, instead got {}"
)
VALIDATION_ERRORS.append(
SmifValidationError(fmt.format(model_name, input_spec))
)
return
for dep in input_spec:
validate_dependency(dep)
[docs]def validate_dependency(dep):
"""Check a dependency specification"""
if not isinstance(dep, dict):
fmt = "Expected a dependency specification, instead got {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(dep)))
return
required_keys = ["name", "spatial_resolution", "temporal_resolution", "units"]
for key in required_keys:
if key not in dep:
fmt = "Expected a value for '{}' in each model dependency, only received {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(key, dep)))
[docs]def validate_scenario_data_config(scenario_data):
"""Check scenario data"""
if not isinstance(scenario_data, list):
fmt = (
"Expected a list of scenario datasets in main model config, "
+ "instead got {}"
)
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(scenario_data)))
return
for scenario in scenario_data:
validate_scenario(scenario)
[docs]def validate_scenario(scenario):
"""Check a single scenario specification"""
if not isinstance(scenario, dict):
fmt = "Expected a scenario specification, instead got {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(scenario)))
return
required_keys = [
"parameter",
"spatial_resolution",
"temporal_resolution",
"units",
"file",
]
for key in required_keys:
if key not in scenario:
fmt = "Expected a value for '{}' in each scenario, only received {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(key, scenario)))
[docs]def validate_scenario_data(data, file_path):
"""Check a list of scenario observations"""
if not isinstance(data, list):
fmt = "Expected a list of scenario data in {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(file_path)))
return
for datum in data:
validate_scenario_datum(datum, file_path)
[docs]def validate_scenario_datum(datum, file_path):
"""Check a single scenario datum"""
if not isinstance(datum, dict):
fmt = "Expected a scenario data point, instead got {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(datum)))
return
required_keys = ["region", "interval", "year", "value"]
for key in required_keys:
if key not in datum:
fmt = (
"Expected a value for '{}' in each data point in a scenario, "
+ "only received {}"
)
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(key, datum)))
[docs]def validate_initial_conditions(data, file_path):
"""Check a list of initial condition observations"""
if not isinstance(data, list):
fmt = "Expected a list of initial conditions in {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(file_path)))
return
for datum in data:
validate_initial_condition(datum, file_path)
[docs]def validate_initial_condition(datum, file_path):
"""Check a single initial condition datum"""
if not isinstance(datum, dict):
fmt = "Expected a initial condition data point, instead got {} from {}"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(datum, file_path)))
return
required_keys = ["name", "build_date"]
for key in required_keys:
if key not in datum:
fmt = (
"Expected a value for '{}' in each data point in a initial condition, "
+ "only received {} from {}"
)
VALIDATION_ERRORS.append(
SmifValidationError(fmt.format(key, datum, file_path))
)
[docs]def validate_planning_config(planning):
"""Check planning options"""
required_keys = ["pre_specified", "rule_based", "optimisation"]
for key in required_keys:
if key not in planning:
fmt = (
"No '{}' settings specified under 'planning' " + "in main config file."
)
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(key)))
# check each planning type
for key, planning_type in planning.items():
if "use" not in planning_type:
fmt = "No 'use' settings specified for '{}' 'planning'"
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(key)))
continue
if planning_type["use"]:
if (
"files" not in planning_type
or not isinstance(planning_type["files"], list)
or len(planning_type["files"]) == 0
):
fmt = (
"No 'files' provided for the '{}' "
+ "planning type in main config file."
)
VALIDATION_ERRORS.append(SmifValidationError(fmt.format(key)))
[docs]def validate_region_sets_config(region_sets):
"""Check regions sets"""
required_keys = ["name", "file"]
for key in required_keys:
for region_set in region_sets:
if key not in region_set:
fmt = (
"Expected a value for '{}' in each "
+ "region set in main config file, only received {}"
)
VALIDATION_ERRORS.append(
SmifValidationError(fmt.format(key, region_set))
)
[docs]def validate_interval_sets_config(interval_sets):
"""Check interval sets"""
required_keys = ["name", "file"]
for key in required_keys:
for interval_set in interval_sets:
if key not in interval_set:
fmt = (
"Expected a value for '{}' in each "
+ "interval set in main config file, only received {}"
)
VALIDATION_ERRORS.append(
SmifValidationError(fmt.format(key, interval_set))
)
[docs]def validate_interventions(data, path):
"""Validate the loaded data as required for model interventions"""
# check required keys
required_keys = [
"name",
"location",
"capital_cost",
"operational_lifetime",
"economic_lifetime",
]
# except for some keys which are allowed simple values,
# expect each attribute to be of the form {value: x, units: y}
simple_keys = ["name", "sector", "location"]
for intervention in data:
for key in required_keys:
if key not in intervention:
fmt = (
"Loading interventions from {}, required "
+ "a value for '{}' in each intervention, but only "
+ "received {}"
)
VALIDATION_ERRORS.append(
SmifValidationError(fmt.format(path, key, intervention))
)
for key, value in intervention.items():
if key not in simple_keys and (
not isinstance(value, dict)
or "value" not in value
or "units" not in value
):
fmt = (
"Loading interventions from {3}, {0}.{1} was {2} but "
+ "should have specified units, "
+ "e.g. {{'value': {2}, 'units': 'm'}}"
)
msg = fmt.format(intervention["name"], key, value, path)
VALIDATION_ERRORS.append(SmifValidationError(msg))