Source code for smif.data_layer.validate

# -*- coding: utf-8 -*-
"""Validate the correct format and presence of the config data
for the system-of-systems model
"""
import itertools

from smif.exception import (SmifDataError, SmifDataInputError,
                            SmifValidationError)

# Shared, module-level list of problems found so far: the validate_* functions
# in this module that do not raise append their findings here instead.
VALIDATION_ERRORS = []


def validate_sos_model_format(sos_model):
    """Fill in missing top-level keys of a sos_model config and reject unknown ones.

    The passed dict is modified in place: every expected key that is absent
    is added with an empty default value.

    Arguments
    ---------
    sos_model : dict
        System-of-systems model configuration

    Returns
    -------
    The same ``sos_model`` object that was passed in.

    Raises
    ------
    SmifDataError
        Wrapping one SmifValidationError per key that is not part of the
        expected set.
    """
    errors = []

    if not isinstance(sos_model, dict):
        # NOTE(review): this error is collected but never raised - non-dict
        # input is handed back unchanged. Kept as-is to preserve behaviour.
        msg = "Main config file should contain setup data, instead found: {}"
        errors.append(SmifValidationError(msg.format(sos_model)))
        return sos_model

    defaults = {
        'name': '',
        'description': '',
        'sector_models': [],
        'scenarios': [],
        'narratives': [],
        'model_dependencies': [],
        'scenario_dependencies': []
    }

    # Add default values to missing keys
    for key, default in defaults.items():
        sos_model.setdefault(key, default)

    # Report keys that should not be in the config
    for key in sos_model:
        if key in defaults:
            continue
        errors.append(
            SmifValidationError(
                'Invalid key `%s` in sos_model configuration `%s`.'
                % (key, sos_model['name'])))

    # Throw collection of errors
    if errors:
        raise SmifDataError(errors)

    return sos_model
def validate_sos_model_config(sos_model, sector_models, scenarios):
    """Check expected values for data loaded from master config file

    Runs the description, sector model, scenario, narrative and dependency
    checks (in that order) and gathers everything they report.

    Arguments
    ---------
    sos_model : dict
        System-of-systems model configuration
    sector_models : list
        Sector model configurations the sos_model may refer to
    scenarios : list
        Scenario configurations the sos_model may refer to

    Raises
    ------
    SmifDataError
        If any of the checks reported at least one error.
    """
    if not isinstance(sos_model, dict):
        # NOTE(review): this error is built but never raised or stored - the
        # function simply returns None for non-dict input. Kept as-is.
        errors = []
        msg = "Main config file should contain setup data, instead found: {}"
        errors.append(SmifValidationError(msg.format(sos_model)))
        return

    findings = (
        # check description
        _validate_description(sos_model),
        # check sector models
        _validate_sos_model_models(sos_model, sector_models),
        # check scenarios
        _validate_sos_model_scenarios(sos_model, scenarios),
        # check narratives
        _validate_sos_model_narratives(sos_model, sector_models),
        # check dependencies
        _validate_sos_model_deps(sos_model, sector_models, scenarios),
    )
    errors = [error for found in findings for error in found]

    if errors:
        raise SmifDataError(errors)
def _validate_sos_model_models(sos_model, sector_models): errors = [] if not sos_model['sector_models']: errors.append( SmifDataInputError( 'sector_models', 'At least one sector model must be selected.', 'A system-of-systems model requires to have at least one system ' + 'enabled to build a valid configuration.')) for sector_model in sos_model['sector_models']: if sector_model not in [sector_model['name'] for sector_model in sector_models]: errors.append( SmifDataInputError( 'sector_models', '%s must have a valid sector_model configuration.' % (sector_model), 'Smif refers to the sector_model-configurations to find ' + 'details about a selected sector_model.')) return errors def _validate_sos_model_scenarios(sos_model, scenarios): errors = [] for scenario in sos_model['scenarios']: if scenario not in [scenario['name'] for scenario in scenarios]: errors.append( SmifDataInputError( 'scenarios', '%s must have a valid scenario configuration.' % (scenario), 'Smif refers to the scenario-configurations to find ' + 'details about a selected scenario.')) return errors def _validate_sos_model_narratives(sos_model, sector_models): errors = [] for narrative in sos_model['narratives']: # Check provides are valid for model_name in narrative['provides']: # A narrative can only provides for enabled models if model_name not in sos_model['sector_models']: errors.append( SmifDataInputError( 'narratives', ('Narrative `%s` provides data for model `%s` that is not enabled ' + 'in this system-of-systems model.') % (narrative['name'], model_name), 'A narrative can only provide for enabled models.')) else: # A narrative can only provides parameters that exist in the model try: sector_model = _pick_sector_model(model_name, sector_models) except KeyError: msg = 'Narrative `{}` provides data for model `{}` that is not found.' 
errors.append( SmifDataInputError( 'models', msg.format(narrative['name'], model_name), 'A narrative can only provide for existing models.')) sector_model = {'parameters': []} parameters = [ parameter['name'] for parameter in sector_model['parameters'] ] for provide in narrative['provides'][model_name]: msg = 'Narrative `{}` provides data for non-existing model parameter `{}`' if provide not in parameters: errors.append( SmifDataInputError( 'narratives', msg.format(narrative['name'], provide), 'A narrative can only provide existing model parameters.' ) ) # Check if all variants are valid for variant in narrative['variants']: should_provide = list(itertools.chain(*narrative['provides'].values())) variant_provides = list(variant['data'].keys()) if sorted(variant_provides) != sorted(should_provide): msg = 'Narrative `{}`, variant `{}` provides incorrect data.' errors.append( SmifDataInputError( 'narratives', msg.format(narrative['name'], variant['name']), 'A variant can only provide data for parameters that are specified ' + 'by the narrative.')) return errors def _pick_sector_model(name, models): for model in models: if model['name'] == name: return model raise KeyError("Model '{}' not found in models".format(name)) def _validate_sos_model_deps(sos_model, sector_models, scenarios): errors = [] errors.extend(_validate_dependencies( sos_model, 'model_dependencies', sector_models, 'sector_models', sector_models, 'sector_models' )) errors.extend(_validate_dependencies( sos_model, 'scenario_dependencies', scenarios, 'scenarios', sector_models, 'sector_models' )) return errors def _validate_description(configuration): errors = [] if len(configuration['description']) > 255: errors.append( SmifDataInputError( 'description', 'Description must not contain more than 255 characters.', 'A description should briefly outline a `%s` configuration.' 
% (configuration['name']))) return errors def _validate_dependencies(configuration, conf_key, source, source_key, sink, sink_key): errors = [] for idx, dependency in enumerate(configuration[conf_key]): errors.extend(_validate_dependency_cycle( idx, dependency, conf_key)) errors.extend(_validate_dependency_in_sos_model( idx, dependency, configuration, conf_key, source_key, sink_key)) errors.extend(_validate_dependency( idx, dependency, conf_key, source, source_key, sink, sink_key)) return errors def _validate_dependency_cycle(idx, dependency, conf_key): errors = [] # Circular dependencies are not allowed is_current = 'timestep' not in dependency or dependency['timestep'] == 'CURRENT' if dependency['source'] == dependency['sink'] and is_current: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Circular dependencies are not allowed.' % (idx + 1), 'Smif does not support self-dependencies unless the dependency is on ' + 'output from a previous timestep.')) return errors def _validate_dependency_in_sos_model(idx, dependency, configuration, conf_key, source_key, sink_key): errors = [] # Source / Sink must be enabled in sos_model config if dependency['source'] not in configuration[source_key]: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Source `%s` is not enabled.' % (idx + 1, dependency['source']), 'Each dependency source must be enabled in the sos-model')) if dependency['sink'] not in configuration[sink_key]: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Sink `%s` is not enabled.' % (idx + 1, dependency['sink']), 'Each dependency sink must be enabled in the sos-model')) # Sink can only have a single dependency dep_sinks = [ (dependency['sink'], dependency['sink_input']) for dependency in configuration[conf_key] ] if dep_sinks.count((dependency['sink'], dependency['sink_input'])) > 1: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Sink input `%s` is driven by multiple sources.' 
% (idx + 1, dependency['sink_input']), 'A model input can only be driven by a single model output.')) return errors def _validate_dependency(idx, dependency, conf_key, source, source_key, sink, sink_key): errors = [] # Source and sink model configurations must exist source_model = [model for model in source if model['name'] == dependency['source']] sink_model = [model for model in sink if model['name'] == dependency['sink']] if not source_model: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Source `%s` does not exist.' % (idx + 1, dependency['source']), 'Each dependency source must have a `%s` configuration.' % (source_key))) if not sink_model: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Sink `%s` does not exist.' % (idx + 1, dependency['sink']), 'Each dependency sink must have a `%s` configuration.' % (sink_key))) if not sink_model or not source_model: # not worth doing further checks if source/sink does not exist return errors # Source_output and sink_input must exist if source_key == 'sector_models': source_model_outputs = [ output for output in source_model[0]['outputs'] if output['name'] == dependency['source_output'] ] if source_key == 'scenarios': source_model_outputs = [ output for output in source_model[0]['provides'] if output['name'] == dependency['source_output'] ] sink_model_inputs = [ input_ for input_ in sink_model[0]['inputs'] if input_['name'] == dependency['sink_input'] ] if not source_model_outputs: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Source output `%s` does not exist.' % (idx + 1, dependency['source_output']), 'Each dependency source output must exist in the `%s` configuration.' % (source_key))) if not sink_model_inputs: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Sink input `%s` does not exist.' % (idx + 1, dependency['sink_input']), 'Each dependency sink input must exist in the `%s` configuration.' 
% (sink_key))) if not source_model_outputs or not sink_model_inputs: # not worth doing further checks if source_output/sink_input does not exist return errors # Source_output and sink_input must have matching specs source_model_output = source_model_outputs[0] sink_model_input = sink_model_inputs[0] if 'dims' in source_model_output and 'dims' in sink_model_input: if sorted(source_model_output['dims']) != sorted(sink_model_input['dims']): errors.append( SmifDataInputError( conf_key, '(Dependency %s) Source `%s` has different dimensions than sink ' % ( idx + 1, source_model_output['name'] ) + '`%s` (%s != %s).' % ( sink_model_input['name'], source_model_output['dims'], sink_model_input['dims'] ), 'Dependencies must have matching dimensions.')) else: if 'dims' in source_model_output or 'dims' in sink_model_input: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Source `%s` has different dimensions than sink ' % ( idx + 1, source_model_output['name'] ) + '`%s` (%s != %s).' % ( sink_model_input['name'], source_model_output, sink_model_input ), 'Dependencies must have matching dimensions.')) if source_model_output['dtype'] != sink_model_input['dtype']: errors.append( SmifDataInputError( conf_key, '(Dependency %s) Source `%s` has a different dtype than sink ' % ( idx + 1, source_model_output['name'], ) + '`%s` (%s != %s).' % ( sink_model_input['name'], source_model_output['dtype'], sink_model_input['dtype']), 'Dependencies must have matching data types.')) return errors
def validate_path_to_timesteps(timesteps):
    """Check timesteps is a path to timesteps file

    Anything that is not a ``str`` is recorded as a SmifValidationError in
    VALIDATION_ERRORS; nothing is raised or returned.
    """
    if isinstance(timesteps, str):
        return

    complaint = "a timesteps file, instead got {}.".format(timesteps)
    VALIDATION_ERRORS.append(
        SmifValidationError(
            "Expected 'timesteps' in main config to specify " + complaint))
def validate_timesteps(timesteps, file_path):
    """Check timesteps is a list of integers

    Problems are recorded in VALIDATION_ERRORS as SmifValidationError
    instances; nothing is raised or returned.

    Arguments
    ---------
    timesteps
        The loaded timesteps data, expected to be a list of ints
    file_path
        Where the data was loaded from, only used in the error messages
    """
    if not isinstance(timesteps, list):
        msg = "Loading {}: expected a list of timesteps.".format(file_path)
        VALIDATION_ERRORS.append(SmifValidationError(msg))
        return

    msg = "Loading {}: timesteps should be integer years, instead got {}"
    for timestep in timesteps:
        if not isinstance(timestep, int):
            # Wrap the message like every other entry in VALIDATION_ERRORS,
            # so consumers never find a bare string in the list.
            VALIDATION_ERRORS.append(
                SmifValidationError(msg.format(file_path, timestep)))
def validate_time_intervals(intervals, file_path):
    """Check time intervals

    A non-list is recorded in VALIDATION_ERRORS (mentioning ``file_path``);
    otherwise each entry is checked with validate_time_interval.
    """
    if isinstance(intervals, list):
        for entry in intervals:
            validate_time_interval(entry)
        return

    msg = "Loading {}: expected a list of time intervals.".format(file_path)
    VALIDATION_ERRORS.append(SmifValidationError(msg))
def validate_time_interval(interval):
    """Check a single time interval

    The interval must be a dict containing 'id', 'start' and 'end'; every
    problem is recorded in VALIDATION_ERRORS as a SmifValidationError.
    """
    if not isinstance(interval, dict):
        msg = "Expected a time interval, instead got {}.".format(interval)
        VALIDATION_ERRORS.append(SmifValidationError(msg))
        return

    template = "Expected a value for '{}' in each " + \
        "time interval, only received {}"
    absent = [key for key in ("id", "start", "end") if key not in interval]
    for key in absent:
        VALIDATION_ERRORS.append(SmifValidationError(template.format(key, interval)))
def validate_sector_models_initial_config(sector_models):
    """Check list of sector models initial configuration

    Records a SmifValidationError in VALIDATION_ERRORS when the argument is
    not a list or is an empty list; otherwise each entry is checked with
    validate_sector_model_initial_config.
    """
    if not isinstance(sector_models, list):
        template = "Expected 'sector_models' in main config to " + \
            "specify a list of sector models to run, instead got {}."
        VALIDATION_ERRORS.append(SmifValidationError(template.format(sector_models)))
        return

    if not sector_models:
        VALIDATION_ERRORS.append(
            SmifValidationError("No 'sector_models' specified in main config file."))

    # check each sector model
    for config in sector_models:
        validate_sector_model_initial_config(config)
def validate_sector_model_initial_config(sector_model_config):
    """Check a single sector model initial configuration

    The config must be a dict containing 'name', 'config_dir', 'path' and
    'classname'; every problem is recorded in VALIDATION_ERRORS.
    """
    if not isinstance(sector_model_config, dict):
        template = "Expected a sector model config block, instead got {}"
        VALIDATION_ERRORS.append(
            SmifValidationError(template.format(sector_model_config)))
        return

    template = "Expected a value for '{}' in each " + \
        "sector model in main config file, only received {}"
    expected = ("name", "config_dir", "path", "classname")
    for key in (name for name in expected if name not in sector_model_config):
        VALIDATION_ERRORS.append(
            SmifValidationError(template.format(key, sector_model_config)))
def validate_dependency_spec(input_spec, model_name):
    """Check the input specification for a single sector model

    A non-list is recorded in VALIDATION_ERRORS (mentioning ``model_name``);
    otherwise each entry is checked with validate_dependency.
    """
    if isinstance(input_spec, list):
        for dependency in input_spec:
            validate_dependency(dependency)
        return

    template = "Expected a list of parameter definitions in '{}' model " + \
        "input specification, instead got {}"
    VALIDATION_ERRORS.append(
        SmifValidationError(template.format(model_name, input_spec)))
def validate_dependency(dep):
    """Check a dependency specification

    The specification must be a dict containing 'name', 'spatial_resolution',
    'temporal_resolution' and 'units'; every problem is recorded in
    VALIDATION_ERRORS.
    """
    if not isinstance(dep, dict):
        template = "Expected a dependency specification, instead got {}"
        VALIDATION_ERRORS.append(SmifValidationError(template.format(dep)))
        return

    template = "Expected a value for '{}' in each model dependency, only received {}"
    expected = ("name", "spatial_resolution", "temporal_resolution", "units")
    absent = [key for key in expected if key not in dep]
    for key in absent:
        VALIDATION_ERRORS.append(SmifValidationError(template.format(key, dep)))
def validate_scenario_data_config(scenario_data):
    """Check scenario data

    A non-list is recorded in VALIDATION_ERRORS; otherwise each entry is
    checked with validate_scenario.
    """
    if isinstance(scenario_data, list):
        for entry in scenario_data:
            validate_scenario(entry)
        return

    template = "Expected a list of scenario datasets in main model config, " + \
        "instead got {}"
    VALIDATION_ERRORS.append(SmifValidationError(template.format(scenario_data)))
def validate_scenario(scenario):
    """Check a single scenario specification

    The specification must be a dict containing 'parameter',
    'spatial_resolution', 'temporal_resolution', 'units' and 'file'; every
    problem is recorded in VALIDATION_ERRORS.
    """
    if not isinstance(scenario, dict):
        template = "Expected a scenario specification, instead got {}"
        VALIDATION_ERRORS.append(SmifValidationError(template.format(scenario)))
        return

    template = "Expected a value for '{}' in each scenario, only received {}"
    expected = (
        "parameter", "spatial_resolution", "temporal_resolution", "units", "file")
    for key in (name for name in expected if name not in scenario):
        VALIDATION_ERRORS.append(SmifValidationError(template.format(key, scenario)))
def validate_scenario_data(data, file_path):
    """Check a list of scenario observations

    A non-list is recorded in VALIDATION_ERRORS (mentioning ``file_path``);
    otherwise each entry is checked with validate_scenario_datum.
    """
    if isinstance(data, list):
        for observation in data:
            validate_scenario_datum(observation, file_path)
        return

    template = "Expected a list of scenario data in {}"
    VALIDATION_ERRORS.append(SmifValidationError(template.format(file_path)))
def validate_scenario_datum(datum, file_path):
    """Check a single scenario datum

    The datum must be a dict containing 'region', 'interval', 'year' and
    'value'; every problem is recorded in VALIDATION_ERRORS. ``file_path`` is
    accepted but not used in the messages.
    """
    if not isinstance(datum, dict):
        template = "Expected a scenario data point, instead got {}"
        VALIDATION_ERRORS.append(SmifValidationError(template.format(datum)))
        return

    template = "Expected a value for '{}' in each data point in a scenario, " + \
        "only received {}"
    absent = [key for key in ("region", "interval", "year", "value")
              if key not in datum]
    for key in absent:
        VALIDATION_ERRORS.append(SmifValidationError(template.format(key, datum)))
def validate_initial_conditions(data, file_path):
    """Check a list of initial condition observations

    A non-list is recorded in VALIDATION_ERRORS (mentioning ``file_path``);
    otherwise each entry is checked with validate_initial_condition.
    """
    if isinstance(data, list):
        for observation in data:
            validate_initial_condition(observation, file_path)
        return

    template = "Expected a list of initial conditions in {}"
    VALIDATION_ERRORS.append(SmifValidationError(template.format(file_path)))
def validate_initial_condition(datum, file_path):
    """Check a single initial condition datum

    The datum must be a dict containing 'name' and 'build_date'; every
    problem is recorded in VALIDATION_ERRORS, mentioning ``file_path``.
    """
    if not isinstance(datum, dict):
        template = "Expected a initial condition data point, instead got {} from {}"
        VALIDATION_ERRORS.append(
            SmifValidationError(template.format(datum, file_path)))
        return

    template = "Expected a value for '{}' in each data point in a initial condition, " + \
        "only received {} from {}"
    for key in (name for name in ("name", "build_date") if name not in datum):
        VALIDATION_ERRORS.append(
            SmifValidationError(template.format(key, datum, file_path)))
def validate_planning_config(planning):
    """Check planning options

    ``planning`` must contain 'pre_specified', 'rule_based' and
    'optimisation'. Every planning type present must have a 'use' setting
    and, when that setting is truthy, a non-empty list under 'files'. Every
    problem is recorded in VALIDATION_ERRORS.
    """
    missing_template = "No '{}' settings specified under 'planning' " + \
        "in main config file."
    for expected in ("pre_specified", "rule_based", "optimisation"):
        if expected in planning:
            continue
        VALIDATION_ERRORS.append(
            SmifValidationError(missing_template.format(expected)))

    # check each planning type
    for name, settings in planning.items():
        if "use" not in settings:
            template = "No 'use' settings specified for '{}' 'planning'"
            VALIDATION_ERRORS.append(SmifValidationError(template.format(name)))
            continue

        if not settings["use"]:
            continue

        has_files = (
            "files" in settings
            and isinstance(settings["files"], list)
            and len(settings["files"]) != 0
        )
        if not has_files:
            template = "No 'files' provided for the '{}' " + \
                "planning type in main config file."
            VALIDATION_ERRORS.append(SmifValidationError(template.format(name)))
def validate_region_sets_config(region_sets):
    """Check regions sets

    Each region set must contain 'name' and 'file'. All sets are checked for
    'name' first, then all sets for 'file'; every missing key is recorded in
    VALIDATION_ERRORS.
    """
    template = "Expected a value for '{}' in each " + \
        "region set in main config file, only received {}"
    for required in ("name", "file"):
        for entry in region_sets:
            if required in entry:
                continue
            VALIDATION_ERRORS.append(
                SmifValidationError(template.format(required, entry)))
def validate_interval_sets_config(interval_sets):
    """Check interval sets

    Each interval set must contain 'name' and 'file'. All sets are checked
    for 'name' first, then all sets for 'file'; every missing key is recorded
    in VALIDATION_ERRORS.
    """
    template = "Expected a value for '{}' in each " + \
        "interval set in main config file, only received {}"
    for required in ("name", "file"):
        for entry in interval_sets:
            if required in entry:
                continue
            VALIDATION_ERRORS.append(
                SmifValidationError(template.format(required, entry)))
def validate_interventions(data, path):
    """Validate the loaded data as required for model interventions

    Every intervention must contain the required keys, and every attribute
    other than 'name', 'sector' and 'location' must be a dict with both
    'value' and 'units'. Problems are recorded in VALIDATION_ERRORS as
    SmifValidationError instances; nothing is raised or returned.

    Arguments
    ---------
    data
        Iterable of intervention dicts
    path
        Where the data was loaded from, only used in the error messages
    """
    # check required keys
    required_keys = ["name", "location", "capital_cost", "operational_lifetime",
                     "economic_lifetime"]

    # except for some keys which are allowed simple values,
    # expect each attribute to be of the form {value: x, units: y}
    simple_keys = ["name", "sector", "location"]

    missing_fmt = "Loading interventions from {}, required " + \
        "a value for '{}' in each intervention, but only " + \
        "received {}"
    units_fmt = "Loading interventions from {3}, {0}.{1} was {2} but " + \
        "should have specified units, " + \
        "e.g. {{'value': {2}, 'units': 'm'}}"

    for intervention in data:
        for key in required_keys:
            if key not in intervention:
                VALIDATION_ERRORS.append(
                    SmifValidationError(missing_fmt.format(path, key, intervention)))

        for key, value in intervention.items():
            if key in simple_keys:
                continue
            has_units = (
                isinstance(value, dict) and "value" in value and "units" in value)
            if not has_units:
                # A missing 'name' is already reported above; it must not
                # turn this report into a KeyError.
                name = intervention.get("name", "<unnamed>")
                msg = units_fmt.format(name, key, value, path)
                VALIDATION_ERRORS.append(SmifValidationError(msg))