# -*- coding: utf-8 -*-
# Copyright 2024, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of sertit-utils project
# https://github.com/sertit/sertit-utils
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Geometry tools
You can use this only if you have installed sertit[full] or sertit[vectors]
"""
import logging
import geopandas as gpd
import numpy as np
import shapely
from shapely import ops
from shapely.errors import GeometryTypeError
from shapely.geometry import Polygon, box
from tqdm import tqdm
from sertit import vectors
from sertit.logs import SU_NAME
from sertit.types import AnyPolygonType
LOGGER = logging.getLogger(SU_NAME)
[docs]
def from_polygon_to_bounds(polygon: AnyPolygonType) -> (float, float, float, float):
"""
Convert a :code:`shapely.polygon` to its bounds, sorted as :code:`left, bottom, right, top`.
Args:
polygon (MultiPolygon): polygon to convert
Returns:
(float, float, float, float): left, bottom, right, top
Example:
>>> poly = Polygon(((0., 0.), (0., 1.), (1., 1.), (1., 0.), (0., 0.)))
>>> from_polygon_to_bounds(poly)
(0.0, 0.0, 1.0, 1.0)
"""
left = polygon.bounds[0] # xmin
bottom = polygon.bounds[1] # ymin
right = polygon.bounds[2] # xmax
top = polygon.bounds[3] # ymax
assert left < right
assert bottom < top
return left, bottom, right, top
[docs]
def from_bounds_to_polygon(
left: float, bottom: float, right: float, top: float
) -> Polygon:
"""
Convert the bounds to a :code:`shapely.polygon`.
Args:
left (float): Left coordinates
bottom (float): Bottom coordinates
right (float): Right coordinates
top (float): Top coordinates
Returns:
Polygon: Polygon corresponding to the bounds
Example:
>>> poly = from_bounds_to_polygon(0.0, 0.0, 1.0, 1.0)
>>> print(poly)
'POLYGON ((1 0, 1 1, 0 1, 0 0, 1 0))'
"""
return box(min(left, right), min(top, bottom), max(left, right), max(top, bottom))
[docs]
def get_wider_exterior(vector: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Get the wider exterior of a MultiPolygon as a Polygon
Args:
vector (gpd.GeoDataFrame): Polygon to simplify
Returns:
vector: gpd.GeoDataFrame: Wider exterior
Example:
>>> # Open a raw footprint
>>> footprint_raw = vectors.read("footprint_raw.geojson")
geometry
0 MULTIPOLYGON (((491053.524 5616778.498, 491262...
1 MULTIPOLYGON (((491314.496 5616444.620, 491295...
2 MULTIPOLYGON (((490783.440 5616102.457, 490923...
>>>
>>> # Get the wider exterior
>>> get_wider_exterior(footprint_raw)
geometry
0 POLYGON ((491053.524 5616778.498, 491262.302 5...
"""
vector = vector.explode(index_parts=True)
# Get the footprint max (discard small holes stored in other polygons)
wider = vector[vector.area == np.max(vector.area)]
# Only select the exterior of this footprint(sometimes some holes persist)
if not wider.empty:
poly = Polygon(list(wider.exterior.iat[0].coords))
wider = gpd.GeoDataFrame(geometry=[poly], crs=wider.crs)
# Resets index as we only got one polygon left which should have index 0
wider.reset_index(inplace=True)
return wider
[docs]
def make_valid(gdf: gpd.GeoDataFrame, verbose=False) -> gpd.GeoDataFrame:
"""
Repair geometries from a dataframe.
Better to use :code:`gpd.make_valid` if you can.
Args:
gdf (gpd.GeoDataFrame): GeoDataFrame to repair
verbose (bool): Verbose invalid geometries
Returns:
gpd.GeoDataFrame: Repaired geometries
Example:
>>> # Open a raw vector with invalid geometries
>>> raw = vectors.read("raw.geojson")
geometry
0 MULTIPOLYGON (((491053.524 5616778.498, 491262...
1 MULTIPOLYGON (((491314.496 5616444.620, 491295...
2 MULTIPOLYGON (((490783.440 5616102.457, 490923...
>>>
>>> # Get the valid geometries
>>> make_valid(raw)
geometry
1 MULTIPOLYGON (((491314.496 5616444.620, 491295...
"""
try:
geos_logger = logging.getLogger("shapely.geos")
previous_level = geos_logger.level
if verbose:
logging.debug("Invalid geometries:\n" f"\t{gdf[~gdf.is_valid]}")
else:
geos_logger.setLevel(logging.CRITICAL)
# Discard self-intersection and null geometries
from shapely.validation import make_valid
gdf.geometry = gdf.geometry.apply(make_valid)
if not verbose:
geos_logger.setLevel(previous_level)
except ImportError:
import shapely
LOGGER.warning(
f"'make_valid' not available in 'shapely' (version {shapely.__version__} < 1.8). "
f"The obtained vector may be broken !"
)
return gdf
[docs]
def fill_polygon_holes(
gpd_results: gpd.GeoDataFrame, threshold: float = None
) -> gpd.GeoDataFrame:
"""
Fill holes over a given threshold on the hole area (in meters) for all polygons of a GeoDataFrame.
If the threshold is set to None, every hole is filled.
Args:
gpd_results (gpd.GeoDataFrame): Geodataframe filled whith drilled polygons
threshold (float): Holes area threshold, in meters. If set to None, every hole is filled.
Returns:
gpd.GeoDataFrame: GeoDataFrame with filled holes
Example:
>>> # Open a polygon with holes
>>> holes = vectors.read("holes.geojson")
>>> # This polygons has interiors features, it has holes
>>> holes.interiors
3 [LINEARRING (491328.9981955575 5616655.8234532...
dtype: object
>>> no_holes = fill_polygon_holes(holes)
Processing objects: 100%|██████████| 1/1 [00:00<00:00, 897.18it/s]
>>> no_holes.interiors
0 []
dtype: object
"""
def _fill_polygon(polygon: Polygon, threshold: float = None):
"""
Function used in apply in fill_polygon_holes
"""
if threshold is not None:
if threshold > 0 and len(polygon.interiors) > 0:
new_interiors = list(polygon.interiors)
for interior in polygon.interiors:
interior_poly = Polygon(np.array(interior.coords))
if interior_poly.area < threshold:
new_interiors.remove(interior)
return Polygon(polygon.exterior, new_interiors)
else:
return polygon
else:
return Polygon(polygon.exterior)
# Ensure the geometries are valid
gpd_results = make_valid(gpd_results)
# Check if vector is projected, if not convert it
with vectors.utm_crs(gpd_results) as utm_results:
# Explode the multipolygons
utm_results = utm_results.explode(index_parts=False)
# Keep only the exterior
tqdm.pandas(desc="Processing objects")
utm_results.geometry = utm_results.geometry.progress_apply(
_fill_polygon, args=(threshold,)
)
# Dissolve and explode to remove polygons in polygons
utm_results = utm_results.dissolve()
gpd_results = utm_results.explode(index_parts=False)
# Write back to file
return gpd_results
[docs]
def line_merge(lines: gpd.GeoDataFrame, **kwargs) -> gpd.GeoDataFrame:
"""
:code:`shapely.line_merge` algorithm applied to a GeoDataFrame.
See the corresponding documentation for more insights about the details of this function.
Args:
lines (gpd.GeoDataFrame): MultiLineString as a GeoDataFrame.
Returns:
gpd.GeoDataFrame: GeoDataFrame composed of (Multi)LineStrings formed by combining the lines of the input GeoDataFrame
Example:
>>> import geopandas as gpd
>>> from sertit import geometry
>>> lines = gpd.read("my_lines.shp")
>>> poly = gpd.read("my_poly.shp")
geometry
0 POLYGON ((491460.248 5616687.073, 491460.248 5...
>>> geometry.split(poly, splitter=lines)
The lines are discontinuous and don't intersect totally the input polygon: nothing is splitted
geometry
0 POLYGON ((491460.248 5616687.073, 491460.248 5...
>>> lines = geometry.line_merge(lines)
>>> geometry.split(poly, splitter=lines)
The lines are now merged into one and now intersect totally the input polygon: split is done
geometry
0 POLYGON ((491460.248 5616687.073, 491460.248 5...
0 POLYGON ((491055.017 5616255.823, 491053.998 5...
"""
merge_lines = shapely.line_merge(lines.dissolve().geometry.values, **kwargs)
return gpd.GeoDataFrame(geometry=merge_lines, crs=lines.crs).explode(
ignore_index=True
)
[docs]
def split(polygons: gpd.GeoDataFrame, splitter: gpd.GeoDataFrame):
"""
Split polygons with polygons or lines.
:code:`shapely.ops.split` algorithm applied to GeoDataFrames.
Be careful: lines have to cut the whole polygon to work!
Use :code:`geometry.line_merge: to merge your lines if needed.
Args:
polygons (gpd.GeoDataFrame): Polygons to split
splitter (gpd.GeoDataFrame): Splitter to split the polygons
Returns:
gpd.GeoDataFrame: Split GeoDataFrame
Example:
>>> import geopandas as gpd
>>> from sertit import geometry
>>> lines = gpd.read("my_lines.shp")
>>> poly = gpd.read("my_poly.shp")
geometry
0 POLYGON ((491460.248 5616687.073, 491460.248 5...
>>> split_poly = geometry.split(poly, splitter=lines)
geometry
0 POLYGON ((491460.248 5616687.073, 491460.248 5...
0 POLYGON ((491055.017 5616255.823, 491053.998 5...
"""
out = polygons.dropna(axis=1).geometry
for _, split in splitter.iterrows():
# Compute the boundary of the splitter polygon (to get a LineString)
if split.geometry.area > 0:
boundary = split.geometry.boundary
else:
boundary = split.geometry
# Explode to prevent FeatureCollections
try:
# LineStrings
out = (
out.map(lambda geom: ops.split(geom, boundary).geoms).explode().dropna()
)
except GeometryTypeError:
# MultiLineStrings
for line in boundary.geoms:
out = (
out.map(lambda geom: ops.split(geom, line).geoms).explode().dropna()
)
return gpd.GeoDataFrame(geometry=out.explode(), crs=polygons.crs)
[docs]
def intersects(
input: gpd.GeoDataFrame, other: gpd.GeoDataFrame, buffer_on_input: float = None
) -> gpd.GeoDataFrame:
"""
Select the polygons of the input GeoDataFrame that intersects the other one and return them.
A buffer can be added on the input GeoDataFrame to be sure the intersection is ok.
For example, when dealing with polygons and points on their borders, sometimes the points fall a tiny bit outside the polygon boundary and is not considred as intersecting.
:code:`gpd.intersects` algorithm applied to whole GeoDataFrames.
Args:
input (gpd.GeoDataFrame): Input GeoDataFrame from which the polygons will be selected
other (gpd.GeoDataFrame): Other GeoDataFrame from that will intersect the first one
buffer_on_input (float): Buffer size on thhe input GeoDataFrame
Returns:
gpd.GeoDataFrame: Polygons of the input that intersects the other GeoDataFrame
Examples:
>>> water = vectors.read("water.geojson")
>>> lakes = vectors.read("lakes.geojson")
>>> intersects(water, lakes)
geometry
2 POLYGON ((490733.035 5616749.035, 490936.972 5...
3 POLYGON ((491254.800 5616242.894, 491175.035 5...
>>> water = vectors.read("water.geojson")
>>> samples = vectors.read("samples.geojson")
>>>
>>> # Some samples are on the polygon boundary but for some reason fall a tiny bit outside and don't intersect the water
>>> intersects(water, samples)
geometry
2 POLYGON ((490733.035 5616749.035, 490936.972 5...
>>>
>>> # Some samples are on the polygon boundary but for some reason fall a tiny bit outside and don't intersect the water
>>> # Use the buffer to handle this case. Use it carefully!
>>> intersects(water, samples, buffer_on_input=0.1)
geometry
2 POLYGON ((490733.035 5616749.035, 490936.972 5...
3 POLYGON ((491254.800 5616242.894, 491175.035 5...
"""
if buffer_on_input is not None:
input_to_intersect = buffer(input, buffer_on_input)
else:
input_to_intersect = input
return input[
input_to_intersect.geometry.map(lambda x: x.intersects(other.geometry).any())
]
[docs]
def buffer(vector: gpd.GeoDataFrame, buffer_m: float, **kwargs) -> gpd.GeoDataFrame:
"""
Add a buffer on a vector.
:code:`gpd.buffer` algorithm returning a GeoDataFrame instead of a GeoSeries.
Args:
vector (gpd.GeoDataFrame): Input vector
buffer_m (int): Buffer size in meters.
**kwargs: Other buffer arguments
Returns:
gpd.GeoDataFrame: Buffered vector
Example:
>>> lines = vectors.read(r"lines.shp")
>>> lines.area
0 0.0
1 0.0
dtype: float64
>>> lines_buffer = buffer(lines, 10)
>>> lines_buffer.area
0 5695.213033
1 5898.723719
dtype: float64
"""
vector_bfd = vector.copy()
vector_bfd.geometry = vector.buffer(buffer_m, **kwargs)
return vector_bfd
[docs]
def nearest_neighbors(
src_gdf: gpd.GeoDataFrame,
candidates_gdf: gpd.GeoDataFrame,
method: str,
k_neighbors: int = None,
radius: float = None,
**kwargs,
) -> (np.ndarray, np.ndarray):
"""
For each point in src_gdf, find the closest point in candidates_gdf and return them with their distances (in the crs coordinates).
Closest points are:
- if method == :code:`k_neighbors`: the k closest neighbors
- if method == :code:`radius`: the neighbors inside this radius (in the crs coordinates, better done with projected geometries)
Args:
src_gdf (gpd.GeoDataFrame): Source geodataframe
candidates_gdf (gpd.GeoDataFrame): Candidates geodataframe
method (str): 'k_neighbors' or 'radius'
k_neighbors (int): Number of neighbors to be looked for
radius (float): Radius in which to find the neighbors
**kwargs: Other args for the query
Returns:
(np.ndarray, np.ndarray): closest samples, distances
Examples:
>>> from sertit import geometry, vectors
>>> src = vectors.read("src.shp")
>>> candidates = vectors.read("candidates.shp")
>>> # There is only one point in the neighborhood of each src, the others are further than 100m
>>>
>>> # Radius method
>>> nearest_neighbors(src, candidates, method="radius", radius=100)
[array([13]) array([12]) array([0])], [array([39.62574458]) array([50.37121574]) array([90.98648454])]
>>>
>>> # k_neighbors method
>>> nearest_neighbors(src, candidates, method="k_neighbors", k_neighbors=1)
[array([13]) array([12]) array([0])], [array([39.62574458]) array([50.37121574]) array([90.98648454])]
"""
# Parse coordinates from points and insert them into a numpy array as RADIANS
src = [(val.xy[0][0], val.xy[1][0]) for val in src_gdf.geometry.values]
candidates = [
(val.xy[0][0], val.xy[1][0]) for val in candidates_gdf.geometry.values
]
# Find the nearest points
# -----------------------
# closest ==> index in right_gdf that corresponds to the closest point
# dist ==> distance between the nearest neighbors (in the crs coordinates)
if method == "k_neighbors":
closest_samples, distances = _get_k_nearest(
src_points=src, candidates=candidates, k_neighbors=k_neighbors, **kwargs
)
else:
closest_samples, distances = _get_radius_nearest(
src_points=src, candidates=candidates, radius=radius, **kwargs
)
return closest_samples, distances
def _get_k_nearest(src_points: list, candidates: list, k_neighbors: int, **kwargs):
"""
For each point in src_gdf, find the nearest k points in candidates_gdf and return them with their distance.
Args:
src_points (list): Source points
candidates (list): Candidate points
k_neighbors (int): Number of neighbors to be looked for
**kwargs: Other args for the query
Returns:
(np.ndarray, np.ndarray): closest samples, distances
"""
try:
from sklearn.neighbors import BallTree
except ModuleNotFoundError as ex:
raise ModuleNotFoundError(
"Please install 'sklearn' the 'geometry.nearest_neighbors' function."
) from ex
# Create tree from the candidate points
tree = BallTree(candidates, leaf_size=15)
# Find the closest points and distances
closest_dist, closest = tree.query(
src_points, k=min(k_neighbors, len(candidates)), **kwargs
)
# Return indices and distances
return closest, closest_dist
def _get_radius_nearest(
src_points: list, candidates: list, radius: float, **kwargs
) -> (np.ndarray, np.ndarray):
"""
For each point in src_gdf, find the points in candidates_gdf inside the given radius and return them with their distance.
Args:
src_points (list): Source points
candidates (list): Candidate points
radius (float): Radius for the search
**kwargs: Other args for the query
Returns:
(np.ndarray, np.ndarray): closest samples, distances
"""
try:
from sklearn.neighbors import BallTree
except ModuleNotFoundError as ex:
raise ModuleNotFoundError(
"Please install 'sklearn' the 'geometry.nearest_neighbors' function."
) from ex
# Create tree from the candidate points
tree = BallTree(candidates, leaf_size=15)
# Find the closest points and distances
closest, closest_dist = tree.query_radius(
src_points, r=radius, return_distance=True, **kwargs
)
# Return indices and distances
return closest, closest_dist