# Source code for smif.convert.register

"""Register, ResolutionSet abstract classes to contain metadata and generate conversion
coefficients.

NDimensionalRegister is used in :class:`smif.convert.interval.IntervalAdaptor` and
:class:`smif.convert.region.RegionAdaptor`.
"""
import logging
from abc import ABCMeta, abstractmethod
from collections import OrderedDict, defaultdict
from typing import Dict, List

import numpy as np  # type: ignore


class ResolutionSet(metaclass=ABCMeta):
    """Abstract base for a named collection of resolution entries

    The entries live in :attr:`data`; iterating over the object or taking its
    length delegates to that attribute. Subclasses supply the entry-specific
    behaviour through the abstract members declared below.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._data = []
        self.description = ""
        self.name = ""

    def as_dict(self):
        """Return the ``name`` and ``description`` as a serialisable dict"""
        return dict(name=self.name, description=self.description)

    def __iter__(self):
        entries = self.data
        return iter(entries)

    def __len__(self):
        entries = self.data
        return len(entries)

    @property
    def data(self):
        """The entries held by this resolution set

        Returns
        -------
        list
        """
        return self._data

    @data.setter
    def data(self, data):
        self._data = data

    @abstractmethod
    def get_entry_names(self):
        """Return the names identifying each entry in the ResolutionSet

        Returns
        -------
        set
            The names which identify each entry in the ResolutionSet.
            NOTE(review): ``NDimensionalRegister.generate_coefficients`` calls
            ``.index()`` on this value, so implementations presumably return
            an ordered sequence rather than a set - confirm.
        """
        raise NotImplementedError()

    @abstractmethod
    def intersection(self, bounds):
        """Return the subset of entries which intersect with `bounds`"""
        raise NotImplementedError()

    @abstractmethod
    def get_proportion(self, entry_a, entry_b):
        """Calculate the proportion of `entry_a` and `entry_b`

        Arguments
        ---------
        entry_a : string
            Name of an entry in `ResolutionSet`
        entry_b : string
            Name of an entry in `ResolutionSet`

        Returns
        -------
        float
            The proportion of `entry_a` and `entry_b`
        """
        raise NotImplementedError()

    @property
    @abstractmethod
    def coverage(self):
        """Coverage of the whole set; the meaning is defined by subclasses"""
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def get_bounds(entry):
        """Helper to be implemented: return the bounds of a register entry

        Arguments
        ---------
        entry
            An entry from a ResolutionSet

        Returns
        -------
        bounds
            The bounds of the entry
        """
        raise NotImplementedError()
class LogMixin(object):
    """Mixin exposing a ``logger`` attribute which is created on first use

    The default logger is named after this module and the concrete class of
    the object, and is cached on the instance as ``_logger``.
    """

    @property
    def logger(self):
        """Return the cached logger, creating it if none has been set yet"""
        if not hasattr(self, "_logger"):
            qualified_name = ".".join([__name__, self.__class__.__name__])
            self._logger = logging.getLogger(qualified_name)
        return self._logger

    @logger.setter
    def logger(self, logger):
        """Replace the logger used by this object"""
        self._logger = logger
class Register(LogMixin, metaclass=ABCMeta):
    """Abstract class which holds the ResolutionSets

    Arguments
    ---------
    axis : int, default=None
        The axis over which operations on the data array are performed
    """

    # Class-level data interface; unset by default.
    store = None

    def __init__(self, axis=None):
        self.axis = axis

    @property
    @abstractmethod
    def names(self):
        """Names held by the register (implemented by subclasses)"""
        raise NotImplementedError

    @abstractmethod
    def register(self, resolution_set: ResolutionSet):
        """Add a ResolutionSet to the register (implemented by subclasses)"""
        raise NotImplementedError

    @abstractmethod
    def get_coefficients(self, source: str, destination: str):
        """Return the coefficients for converting `source` to `destination`"""
        raise NotImplementedError

    def convert(
        self, data: np.ndarray, from_set_name: str, to_set_name: str
    ) -> np.ndarray:
        """Convert a list of data points for a given set to another set

        Deprecated: usage superseded by Adaptor.convert

        Parameters
        ----------
        data: numpy.ndarray
        from_set_name: str
        to_set_name: str

        Returns
        -------
        numpy.ndarray
        """
        coefficients = self.get_coefficients(from_set_name, to_set_name)
        converted = Register.convert_with_coefficients(data, coefficients, self.axis)

        self.logger.debug("Converting from %s to %s.", from_set_name, to_set_name)
        self.logger.debug("Converted value from %s to %s", data.sum(), converted.sum())
        return converted

    @staticmethod
    def convert_with_coefficients(
        data, coefficients: np.ndarray, axis=None
    ) -> np.ndarray:
        """Convert an array of data using given coefficients, along a given axis

        Deprecated: usage superseded by Adaptor.convert

        Parameters
        ----------
        data: numpy.ndarray
        coefficients: numpy.ndarray
        axis: integer, optional
            When given, the length of `data` along this axis must equal the
            first dimension of `coefficients`

        Returns
        -------
        numpy.ndarray
            ``coefficients.T . data`` when `axis` is 0, otherwise
            ``data . coefficients``

        Raises
        ------
        ValueError
            If `axis` is given and the first dimension of `coefficients` does
            not match the length of `data` along `axis`
        """
        if axis is not None:
            data_count = data.shape[axis]
            # Report the dimension which is actually compared. Indexing the
            # coefficients with `axis` would report the wrong size for axis=1
            # and raise IndexError for higher axes.
            coefficient_count = coefficients.shape[0]
            if coefficient_count != data_count:
                msg = (
                    "Size of coefficient array does not match source "
                    "resolution set from data matrix: {} != {}"
                )
                # Exceptions do not interpolate their arguments, so the
                # message is formatted explicitly.
                raise ValueError(msg.format(coefficient_count, data_count))

        if axis == 0:
            return np.dot(coefficients.T, data)
        # axis == 1, any other axis, and axis=None all multiply on the right
        return np.dot(data, coefficients)
class NDimensionalRegister(Register):
    """Register of N-dimensional ResolutionSets, keyed by set name

    Arguments
    ---------
    axis : int, default=None
        The axis over which operations on the data array are performed
    """

    def __init__(self, axis=None):
        super().__init__(axis)
        self._conversions = defaultdict(dict)
        self._register = OrderedDict()  # type: Dict[str, ResolutionSet]

    def register(self, resolution_set: ResolutionSet):
        """Store a ResolutionSet in the register under its name

        Parameters
        ----------
        resolution_set : :class:`smif.convert.ResolutionSet`

        Raises
        ------
        ValueError
            If a ResolutionSet of the same name already exists in the register
        """
        set_name = resolution_set.name
        if set_name in self._register:
            raise ValueError(
                "A ResolutionSet named {} has already been loaded".format(set_name)
            )

        self.logger.info(
            "Registering '%s' with %i items", set_name, len(resolution_set)
        )
        self._register[set_name] = resolution_set

    @property
    def names(self) -> List[str]:
        """Names of the registered sets, in registration order

        Returns
        -------
        sets: list[str]
        """
        return list(self._register)

    def get_entry(self, name: str) -> ResolutionSet:
        """Look up the ResolutionSet registered as `name`

        Arguments
        ---------
        name : str
            The unique identifier of a ResolutionSet in the register

        Returns
        -------
        smif.convert.ResolutionSet

        Raises
        ------
        ValueError
            If no ResolutionSet has been registered under `name`
        """
        if name in self._register:
            return self._register[name]
        raise ValueError("ResolutionSet '{}' not registered".format(name))

    def _write_coefficients(self, source, destination, data: np.ndarray):
        """Hand the coefficients to ``self.store``, or warn if it is not set"""
        if not self.store:
            self.logger.warning("Data interface not available to write coefficients")
            return
        self.store.write_coefficients(source, destination, data)

    def get_coefficients(self, source: str, destination: str) -> np.ndarray:
        """Get coefficients representing intersection of sets

        A warning is logged when the coverage of the two sets differs; the
        coefficients are generated either way.

        Arguments
        ---------
        source : string
            The name of the source set
        destination : string
            The name of the destination set

        Returns
        -------
        numpy.ndarray
        """
        from_set = self.get_entry(source)
        to_set = self.get_entry(destination)

        source_coverage = from_set.coverage
        target_coverage = to_set.coverage
        if source_coverage != target_coverage:
            self.logger.warning(
                "Coverage for '%s' is %d and does not match coverage "
                "for '%s' which is %d",
                from_set.name,
                source_coverage,
                to_set.name,
                target_coverage,
            )

        return self.generate_coefficients(from_set, to_set)

    def generate_coefficients(
        self, from_set: ResolutionSet, to_set: ResolutionSet
    ) -> np.ndarray:
        """Generate coefficients for converting between two :class:`ResolutionSet`s

        Coefficients for converting a single dimension will always be 2D, of
        shape (len(from_set), len(to_set)).

        Parameters
        ----------
        from_set : ResolutionSet
        to_set : ResolutionSet

        Returns
        -------
        numpy.ndarray
        """
        shape = (len(from_set), len(to_set))
        coefficients = np.zeros(shape, dtype=np.float64)
        self.logger.debug(
            "Coefficients array is of shape %s for %s to %s",
            coefficients.shape,
            from_set.name,
            to_set.name,
        )

        from_names = from_set.get_entry_names()
        for column, to_entry in enumerate(to_set):
            for source_idx in from_set.intersection(to_entry):
                source_entry = from_set.data[source_idx]
                proportion = from_set.get_proportion(source_idx, to_entry)
                self.logger.debug(
                    "%i percent of %s (#%s) is in %s (#%s)",
                    proportion * 100,
                    to_entry.name,
                    column,
                    source_entry.name,
                    source_idx,
                )
                # The row is found by entry name, not by the index which the
                # intersection returned.
                row = from_names.index(source_entry.name)
                coefficients[row, column] = proportion

        self.logger.debug("Generated %s", coefficients)
        return coefficients