Source code for smif.convert.region
"""Handles conversion between the sets of regions used in the `SosModel`
"""
from collections import namedtuple
from rtree import index # type: ignore
from shapely.geometry import mapping, shape # type: ignore
from shapely.validation import explain_validity # type: ignore
from smif.convert.adaptor import Adaptor
from smif.convert.register import NDimensionalRegister, ResolutionSet
__author__ = "Will Usher, Tom Russell"
__copyright__ = "Will Usher, Tom Russell"
__license__ = "mit"
[docs]class RegionAdaptor(Adaptor):
"""Convert regions, assuming uniform distributions where necessary"""
[docs] def generate_coefficients(self, from_spec, to_spec):
"""Generate conversion coefficients for spatial dimensions
Assumes that the Coordinates elements contain a 'feature' key whose value corresponds
to a GDAL vector feature represented as a dict, for example as returned by a `fiona`
reader.
"""
# find dimensions to convert
from_dim, to_dim = self.get_convert_dims(from_spec, to_spec)
# get dimension coordinates
from_coords = from_spec.dim_coords(from_dim)
to_coords = to_spec.dim_coords(to_dim)
# create RegionSets from Coordinates
from_set = RegionSet(from_dim, from_coords.elements)
to_set = RegionSet(to_dim, to_coords.elements)
# register RegionSets
register = NDimensionalRegister()
register.register(from_set)
register.register(to_set)
# use NDimensionalRegister to get coefficients
coefficients = register.get_coefficients(from_dim, to_dim)
return coefficients
NamedShape = namedtuple("NamedShape", ["name", "shape"])
[docs]class RegionSet(ResolutionSet):
"""Hold a set of regions, spatially indexed for ease of lookup when
constructing conversion matrices.
Parameters
----------
set_name : str
Name to use as identifier for this set of regions
elements: iterable
Iterable (probably a list or a reader handle)
of fiona feature records e.g. the 'features' entry of
a GeoJSON collection
"""
def __init__(self, set_name, elements):
super().__init__()
self.name = set_name
self._regions = []
self.data = [e["feature"] for e in elements]
self._idx = index.Index()
for pos, region in enumerate(self._regions):
self._idx.insert(pos, region.shape.bounds)
@property
def data(self):
return self._regions
@data.setter
def data(self, value):
names = {}
for region in value:
name = region["properties"]["name"]
if name in names:
msg = "Region set must have uniquely named regions - {} duplicated"
raise AssertionError(msg.format(name))
names[name] = True
self._regions.append(NamedShape(name, shape(region["geometry"])))
[docs] def get_entry_names(self):
return [region.name for region in self.data]
[docs] def as_features(self):
"""Get the regions as a list of feature dictionaries
Returns
-------
list
A list of GeoJSON-style dicts
"""
return [
{
"type": "Feature",
"geometry": mapping(region.shape),
"properties": {"name": region.name},
}
for region in self._regions
]
[docs] def centroids_as_features(self):
"""Get the region centroids as a list of feature dictionaries
Returns
-------
list
A list of GeoJSON-style dicts, with Point features corresponding to
region centroids
"""
return [
{
"type": "Feature",
"geometry": mapping(region.shape.centroid),
"properties": {"name": region.name},
}
for region in self._regions
]
[docs] def intersection(self, to_entry):
"""Return the set of regions intersecting with the bounds of `to_entry`"""
bounds = to_entry.shape.bounds
return [x for x in self._idx.intersection(bounds)]
[docs] def get_proportion(self, from_idx, entry_b):
"""Calculate the proportion of shape a that intersects with shape b"""
entry_a = self.data[from_idx]
if self.check_valid_shape(entry_a.shape):
if self.check_valid_shape(entry_b.shape):
intersection = entry_a.shape.intersection(entry_b.shape)
return intersection.area / entry_a.shape.area
else:
raise RuntimeError("Shape {} is not valid".format(entry_b.name))
else:
raise RuntimeError(
"Shape {} from {} is not valid".format(entry_a.name, self.name)
)
[docs] def check_valid_shape(self, shape):
if not shape.is_valid:
validity = explain_validity(shape)
print("Shape is not valid. Explanation: %s", validity)
return False
else:
return True
[docs] @staticmethod
def get_bounds(entry):
return entry.shape.bounds
@property
def coverage(self):
return sum([x.shape.area for x in self.data])
def __getitem__(self, key):
return self._regions[key]
def __len__(self):
return len(self._regions)