# Source code for smif.data_layer.file.file_metadata_store

"""File-backed metadata store
"""
import copy
import json
import os
from functools import lru_cache
from logging import getLogger
from typing import Dict, List

import pandas  # type: ignore
from ruamel.yaml import YAML  # type: ignore
from smif.data_layer.abstract_metadata_store import MetadataStore
from smif.exception import SmifDataNotFoundError, SmifDataReadError

# Import fiona if available (optional dependency)
try:
    import fiona  # type: ignore
except ImportError:
    pass


class FileMetadataStore(MetadataStore):
    """Metadata store backed by files on disk.

    Units are kept in a plain-text file; dimension definitions are split
    between YAML config files and CSV or GDAL-compatible element files.
    """

    def __init__(self, base_folder):
        super().__init__()
        self.logger = getLogger(__name__)
        root = str(base_folder)  # tolerate pathlib.Path as well as str
        # plain-text file holding user-defined unit definitions
        self.units_path = os.path.join(root, "data", "user-defined-units.txt")
        # folder holding dimension element data (CSV / geographic files)
        self.data_folder = os.path.join(root, "data", "dimensions")
        # folder holding dimension YAML configs (which reference element files)
        self.config_folder = os.path.join(root, "config", "dimensions")

    # region Units
[docs] def read_unit_definitions(self) -> List[str]: try: with open(self.units_path, "r") as units_fh: return [line.strip() for line in units_fh] except FileNotFoundError: self.logger.warning( "Units file not found, expected at %s", str(self.units_path) ) return []
[docs] def write_unit_definitions(self, definitions: List[str]): with open(self.units_path, "w") as units_fh: units_fh.writelines(definitions)
    # endregion

    # region Dimensions
[docs] def read_dimensions(self, skip_coords=False) -> List[dict]: dim_names = _read_filenames_in_dir(self.config_folder, ".yml") return [self.read_dimension(name, skip_coords) for name in dim_names]
[docs] def read_dimension(self, dimension_name: str, skip_coords=False): dim = _read_yaml_file(self.config_folder, dimension_name) if skip_coords: del dim["elements"] else: dim["elements"] = self._read_dimension_file(dim["elements"]) return dim
[docs] def write_dimension(self, dimension: Dict): # write elements to csv file (by default, can handle any nested data) elements_filename = "{}.csv".format(dimension["name"]) elements = dimension["elements"] self._write_dimension_file(elements_filename, elements) # refer to elements by filename and add to config dimension_with_ref = copy.copy(dimension) dimension_with_ref["elements"] = elements_filename _write_yaml_file(self.config_folder, dimension["name"], dimension_with_ref)
[docs] def update_dimension(self, dimension_name: str, dimension: Dict): # look up elements filename and write elements old_dim = _read_yaml_file(self.config_folder, dimension_name) elements_filename = old_dim["elements"] elements = dimension["elements"] self._write_dimension_file(elements_filename, elements) # refer to elements by filename and update config dimension_with_ref = copy.copy(dimension) dimension_with_ref["elements"] = elements_filename _write_yaml_file(self.config_folder, dimension_name, dimension_with_ref)
[docs] def delete_dimension(self, dimension_name: str): # read to find filename old_dim = _read_yaml_file(self.config_folder, dimension_name) elements_filename = old_dim["elements"] # remove elements data os.remove(os.path.join(self.data_folder, elements_filename)) # remove description os.remove(os.path.join(self.config_folder, "{}.yml".format(dimension_name)))
@lru_cache(maxsize=32) def _read_dimension_file(self, filename: str) -> List[Dict]: filepath = os.path.join(self.data_folder, filename) filebasename, ext = os.path.splitext(filename) if ext == ".csv": dataframe = pandas.read_csv(filepath) data = dataframe.to_dict("records") if "interval" in data[0]: data = self._unstringify_interval(data) elif ext in (".geojson", ".shp"): data = self._read_spatial_file(filepath) else: msg = "Extension '{}' not recognised, expected one of ('.csv', " msg += "'.geojson', '.shp') when reading {}" raise SmifDataReadError(msg.format(ext, filepath)) return data def _write_dimension_file(self, filename: str, data: List[Dict]): # lru_cache may now be invalid, so clear it self._read_dimension_file.cache_clear() path = os.path.join(self.data_folder, filename) filebasename, ext = os.path.splitext(filename) if ext == ".csv": if "interval" in data[0]: data = self._stringify_interval(data) pandas.DataFrame.from_records(data).to_csv(path, index=False) elif ext in (".geojson", ".shp"): raise NotImplementedError("Writing spatial dimensions not yet supported") # self._write_spatial_file(filepath) else: msg = "Extension '{}' not recognised, expected one of ('.csv', " msg += "'.geojson', '.shp') when writing {}" raise SmifDataReadError(msg.format(ext, path)) return data def _stringify_interval(self, data: List[Dict]) -> List[Dict]: output = [] for item in data: output_item = copy.copy(item) try: output_item["interval"] = json.dumps(item["interval"]) except KeyError: self.logger.warning("Expected interval in element %s", item) output.append(output_item) return output def _unstringify_interval(self, data: List[Dict]) -> List[Dict]: output = [] for item in data: output_item = copy.copy(item) try: output_item["interval"] = json.loads(item["interval"]) except KeyError: self.logger.warning("Expected interval in element %s", item) output.append(output_item) return output # endregion @staticmethod def _read_spatial_file(filepath) -> List[Dict]: try: with 
fiona.Env(): return _read_spatial_data(filepath) except AttributeError: # older fiona versions with fiona.drivers(): return _read_spatial_data(filepath) except NameError as ex: msg = "Could not read spatial dimension definition '%s' " % (filepath) msg += "Please install fiona to read geographic data files. Try running: \n" msg += " pip install smif[spatial]\n" msg += "or:\n" msg += " conda install fiona shapely rtree\n" raise SmifDataReadError(msg) from ex except IOError as ex: msg = "Could not read spatial dimension definition '%s' " % (filepath) msg += "Please verify that the path is correct and " msg += "that the file is present on this location." raise SmifDataNotFoundError(msg) from ex
def _read_spatial_data(filepath): data = [] with fiona.open(filepath) as src: for feature in src: element = {"name": feature["properties"]["name"], "feature": feature} data.append(element) return data def _read_yaml_file(directory, name): """Parse yaml config file into plain data (lists, dicts and simple values) Parameters ---------- directory : str name : str file basename (without yml extension) """ path = os.path.join(directory, "{}.yml".format(name)) with open(path, "r") as file_handle: return YAML().load(file_handle) def _write_yaml_file(directory, name, data): """Write plain data to a file as yaml Parameters ---------- directory : str name : str file basename (without yml extension) data Data to write (should be lists, dicts and simple values) """ path = os.path.join(directory, "{}.yml".format(name)) with open(path, "w") as file_handle: yaml = YAML() yaml.default_flow_style = False yaml.allow_unicode = True return yaml.dump(data, file_handle) def _read_filenames_in_dir(path, extension): """Returns the name of the Yaml files in a certain directory Arguments --------- path: str Path to directory extension: str Extension of files (such as: '.yml' or '.csv') Returns ------- list The list of files in `path` with extension """ files = [] for filename in os.listdir(path): if filename.endswith(extension): basename, _ = os.path.splitext(filename) files.append(basename) return files