Source code for smif.convert.area

"""Handles conversion between the sets of regions used in the `SosModel`
"""
import logging
from collections import OrderedDict, defaultdict, namedtuple

import numpy as np
from rtree import index
from shapely.geometry import shape

from smif.convert.register import Register, ResolutionSet

__author__ = "Will Usher, Tom Russell"
__copyright__ = "Will Usher, Tom Russell"
__license__ = "mit"


[docs]def proportion_of_a_intersecting_b(shape_a, shape_b): """Calculate the proportion of shape a that intersects with shape b """ intersection = shape_a.intersection(shape_b) return intersection.area / shape_a.area
NamedShape = namedtuple('NamedShape', ['name', 'shape'])
[docs]class RegionSet(ResolutionSet): """Hold a set of regions, spatially indexed for ease of lookup when constructing conversion matrices. Parameters ---------- set_name : str Name to use as identifier for this set of regions fiona_shape_iter: iterable Iterable (probably a list or a reader handle) of fiona feature records e.g. the 'features' entry of a GeoJSON collection """ def __init__(self, set_name, fiona_shape_iter): self._name = set_name self.data = fiona_shape_iter self._idx = index.Index() for pos, region in enumerate(self._regions): self._idx.insert(pos, region.shape.bounds) @property def name(self): return self._name @name.setter def name(self, value): self._name = value @property def data(self): return self._regions @data.setter def data(self, value): self._regions = [] names = {} for region in value: name = region['properties']['name'] if name in names: raise AssertionError( "Region set must have uniquely named regions - %s duplicated", name) names[name] = True self._regions.append( NamedShape( name, shape(region['geometry']) ) )
[docs] def get_entry_names(self): return [region.name for region in self.data]
[docs] def intersection(self, bounds): """Return the subset of regions intersecting with a bounding box """ return [self._regions[pos] for pos in self._idx.intersection(bounds)]
def __getitem__(self, key): return self._regions[key] def __len__(self): return len(self._regions)
[docs]class RegionRegister(Register): """Holds the sets of regions used by the SectorModels and provides conversion between data values relating to compatible sets of regions. """ def __init__(self): self._register = OrderedDict() self._conversions = defaultdict(dict) self.logger = logging.getLogger(__name__) @property def names(self): """Names of registered region sets Returns ------- sets: list of str """ return list(self._register.keys())
[docs] def get_entry(self, name): """Returns the ResolutionSet of `name` Arguments --------- name : str The unique identifier of a ResolutionSet in the register Returns ------- smif.convert.area.RegionSet """ if name not in self._register: raise ValueError("Region set '{}' not registered".format(name)) return self._register[name]
[docs] def register(self, region_set): """Register a set of regions as a source/target for conversion """ if region_set.name in self._register: msg = "A region set named {} has already been loaded" raise ValueError(msg.format(region_set.name)) already_registered = self.names self._register[region_set.name] = region_set for other_set_name in already_registered: self._generate_coefficients(region_set, self._register[other_set_name])
[docs] def convert(self, data, from_set_name, to_set_name): """Convert a list of data points for a given set of regions to another set of regions. Parameters ---------- data: numpy.ndarray with dimension regions from_set_name: str to_set_name: str """ from_set = self.get_entry(from_set_name) from_set_names = from_set.get_entry_names() to_set = self.get_entry(to_set_name) to_set_names = to_set.get_entry_names() converted = np.zeros(len(to_set)) coefficents = self._conversions[from_set.name][to_set.name] for from_region_name, from_value in zip(from_set_names, data): for to_region_name, coef in coefficents[from_region_name]: to_region_idx = to_set_names.index(to_region_name) converted[to_region_idx] += coef*from_value return converted
def _generate_coefficients(self, set_a, set_b): # from a to b self._conversions[set_a.name][set_b.name] = self._conversion_coefficients(set_a, set_b) # from b to a self._conversions[set_b.name][set_a.name] = self._conversion_coefficients(set_b, set_a) @staticmethod def _conversion_coefficients(from_set, to_set): """Return a dict containing the proportions of to_regions intersecting with each given from_region:: { from_region.name: [ (to_region.name, proportion), # ... ], # ... } """ coefficients = defaultdict(list) for to_region in to_set: intersecting_from_regions = from_set.intersection(to_region.shape.bounds) for from_region in intersecting_from_regions: proportion = proportion_of_a_intersecting_b(from_region.shape, to_region.shape) coefficient_pair = (to_region.name, proportion) coefficients[from_region.name].append(coefficient_pair) return coefficients
__REGISTER = RegionRegister()
[docs]def get_register(): """Return single copy of RegionRegister """ return __REGISTER