# Source code for sertit.geometry

```# -*- coding: utf-8 -*-
# Copyright 2024, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of sertit-utils project
#     https://github.com/sertit/sertit-utils
#
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# Unless required by applicable law or agreed to in writing, software
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
"""
Geometry tools

You can use this only if you have installed sertit[full] or sertit[vectors]
"""
import logging

import geopandas as gpd
import numpy as np
import shapely
from shapely import ops
from shapely.errors import GeometryTypeError
from shapely.geometry import Polygon, box
from tqdm import tqdm

from sertit import vectors
from sertit.logs import SU_NAME
from sertit.types import AnyPolygonType

LOGGER = logging.getLogger(SU_NAME)

[docs]
def from_polygon_to_bounds(polygon: AnyPolygonType) -> (float, float, float, float):
"""
Convert a :code:`shapely.polygon` to its bounds, sorted as :code:`left, bottom, right, top`.

Args:
polygon (MultiPolygon): polygon to convert

Returns:
(float, float, float, float): left, bottom, right, top

Example:
>>> poly = Polygon(((0., 0.), (0., 1.), (1., 1.), (1., 0.), (0., 0.)))
>>> from_polygon_to_bounds(poly)
(0.0, 0.0, 1.0, 1.0)
"""
left = polygon.bounds[0]  # xmin
bottom = polygon.bounds[1]  # ymin
right = polygon.bounds[2]  # xmax
top = polygon.bounds[3]  # ymax

assert left < right
assert bottom < top

return left, bottom, right, top

[docs]
def from_bounds_to_polygon(
left: float, bottom: float, right: float, top: float
) -> Polygon:
"""
Convert the bounds to a :code:`shapely.polygon`.

Args:
left (float): Left coordinates
bottom (float): Bottom coordinates
right (float): Right coordinates
top (float): Top coordinates

Returns:
Polygon: Polygon corresponding to the bounds

Example:
>>> poly = from_bounds_to_polygon(0.0, 0.0, 1.0, 1.0)
>>> print(poly)
'POLYGON ((1 0, 1 1, 0 1, 0 0, 1 0))'

"""
return box(min(left, right), min(top, bottom), max(left, right), max(top, bottom))

[docs]
def get_wider_exterior(vector: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Get the wider exterior of a MultiPolygon as a Polygon

Args:
vector (gpd.GeoDataFrame): Polygon to simplify

Returns:
vector: gpd.GeoDataFrame: Wider exterior

Example:
>>> # Open a raw footprint
geometry
0         MULTIPOLYGON (((491053.524 5616778.498, 491262...
1         MULTIPOLYGON (((491314.496 5616444.620, 491295...
2         MULTIPOLYGON (((490783.440 5616102.457, 490923...
>>>
>>> # Get the wider exterior
>>> get_wider_exterior(footprint_raw)
geometry
0      POLYGON ((491053.524 5616778.498, 491262.302 5...
"""
vector = vector.explode(index_parts=True)

# Get the footprint max (discard small holes stored in other polygons)
wider = vector[vector.area == np.max(vector.area)]

# Only select the exterior of this footprint(sometimes some holes persist)
if not wider.empty:
poly = Polygon(list(wider.exterior.iat[0].coords))
wider = gpd.GeoDataFrame(geometry=[poly], crs=wider.crs)

# Resets index as we only got one polygon left which should have index 0
wider.reset_index(inplace=True)

return wider

[docs]
def make_valid(gdf: gpd.GeoDataFrame, verbose=False) -> gpd.GeoDataFrame:
"""
Repair geometries from a dataframe.

Better to use :code:`gpd.make_valid` if you can.

Args:
gdf (gpd.GeoDataFrame): GeoDataFrame to repair
verbose (bool): Verbose invalid geometries

Returns:
gpd.GeoDataFrame: Repaired geometries

Example:
>>> # Open a raw  vector with invalid geometries
geometry
0         MULTIPOLYGON (((491053.524 5616778.498, 491262...
1         MULTIPOLYGON (((491314.496 5616444.620, 491295...
2         MULTIPOLYGON (((490783.440 5616102.457, 490923...
>>>
>>> # Get the valid geometries
>>> make_valid(raw)
geometry
1         MULTIPOLYGON (((491314.496 5616444.620, 491295...
"""
try:
geos_logger = logging.getLogger("shapely.geos")
previous_level = geos_logger.level
if verbose:
logging.debug("Invalid geometries:\n" f"\t{gdf[~gdf.is_valid]}")
else:
geos_logger.setLevel(logging.CRITICAL)

# Discard self-intersection and null geometries
from shapely.validation import make_valid

gdf.geometry = gdf.geometry.apply(make_valid)

if not verbose:
geos_logger.setLevel(previous_level)
except ImportError:
import shapely

LOGGER.warning(
f"'make_valid' not available in 'shapely' (version {shapely.__version__} < 1.8). "
f"The obtained vector may be broken !"
)

return gdf

[docs]
def simplify_footprint(
footprint: gpd.GeoDataFrame, resolution: float, max_nof_vertices: int = 50
) -> gpd.GeoDataFrame:
"""
Simplify footprint.

Set a number of maximum vertices and this function will try to simplify the footprint to have less than this number of vertices.
The tolerance will grow to try to respect this number of vertices.

This function will loop over a number of pixels of tolerence [1, 2, 4, 8, 16, 32, 64] (tolerance of gpd.simplify == resolution * tol_pix)
If in the end, the number of vertices is still too high, a warning will be emitted.

Args:
footprint (gpd.GeoDataFrame): Footprint to be simplified
resolution (float): Corresponding resolution
max_nof_vertices (int): Maximum number of vertices of the wanted footprint

Returns:
gpd.GeoDataFrame: Simplified footprint

Examples:
>>> # Open a raw footprint
>>> len(footprint_raw.get_coordinates())
64757
>>>
>>> # Get the simplified footprint
>>> simplified = simplify_footprint(footprint_raw, 20)
>>> len(simplified.get_coordinates())
29
"""
# Number of pixels of tolerance
tolerance = [1, 2, 4, 8, 16, 32, 64]

# Process only if given footprint is too complex (too many vertices)
def simplify_geom(value):
nof_vertices = len(value.exterior.coords)
if nof_vertices > max_nof_vertices:
for tol in tolerance:
# Simplify footprint
value = value.simplify(
tolerance=tol * resolution, preserve_topology=True
)

# Check if OK
nof_vertices = len(value.exterior.coords)
if nof_vertices <= max_nof_vertices:
break
return value

footprint = footprint.explode(index_parts=True)
footprint.geometry = footprint.geometry.apply(simplify_geom)

return footprint

[docs]
def fill_polygon_holes(
gpd_results: gpd.GeoDataFrame, threshold: float = None
) -> gpd.GeoDataFrame:
"""
Fill holes over a given threshold on the hole area (in meters) for all polygons of a GeoDataFrame.
If the threshold is set to None, every hole is filled.

Args:
gpd_results (gpd.GeoDataFrame): Geodataframe filled whith drilled polygons
threshold (float): Holes area threshold, in meters. If set to None, every hole is filled.

Returns:
gpd.GeoDataFrame: GeoDataFrame with filled holes

Example:
>>> # Open a polygon with holes
>>> # This polygons has interiors features, it has holes
>>> holes.interiors
3    [LINEARRING (491328.9981955575 5616655.8234532...
dtype: object
>>> no_holes = fill_polygon_holes(holes)
Processing objects: 100%|██████████| 1/1 [00:00<00:00, 897.18it/s]
>>> no_holes.interiors
0    []
dtype: object
"""

def _fill_polygon(polygon: Polygon, threshold: float = None):
"""
Function used in apply in fill_polygon_holes
"""
if threshold is not None:
if threshold > 0 and len(polygon.interiors) > 0:
new_interiors = list(polygon.interiors)
for interior in polygon.interiors:
interior_poly = Polygon(np.array(interior.coords))
if interior_poly.area < threshold:
new_interiors.remove(interior)
return Polygon(polygon.exterior, new_interiors)
else:
return polygon
else:
return Polygon(polygon.exterior)

# Ensure the geometries are valid
gpd_results = make_valid(gpd_results)

# Check if vector is projected, if not convert it
with vectors.utm_crs(gpd_results) as utm_results:
# Explode the multipolygons
utm_results = utm_results.explode(index_parts=False)

# Keep only the exterior
tqdm.pandas(desc="Processing objects")
utm_results.geometry = utm_results.geometry.progress_apply(
_fill_polygon, args=(threshold,)
)

# Dissolve and explode to remove polygons in polygons
utm_results = utm_results.dissolve()
gpd_results = utm_results.explode(index_parts=False)

# Write back to file
return gpd_results

[docs]
def line_merge(lines: gpd.GeoDataFrame, **kwargs) -> gpd.GeoDataFrame:
"""
:code:`shapely.line_merge` algorithm applied to a GeoDataFrame.

See the corresponding documentation for more insights about the details of this function.

Args:
lines (gpd.GeoDataFrame): MultiLineString as a GeoDataFrame.

Returns:
gpd.GeoDataFrame: GeoDataFrame composed of (Multi)LineStrings formed by combining the lines of the input GeoDataFrame

Example:
>>> import geopandas as gpd
>>> from sertit import geometry
geometry
0  POLYGON ((491460.248 5616687.073, 491460.248 5...
>>> geometry.split(poly, splitter=lines)
The lines are discontinuous and don't intersect totally the input polygon: nothing is splitted
geometry
0  POLYGON ((491460.248 5616687.073, 491460.248 5...

>>> lines = geometry.line_merge(lines)
>>> geometry.split(poly, splitter=lines)
The lines are now merged into one and now intersect totally the input polygon: split is done
geometry
0  POLYGON ((491460.248 5616687.073, 491460.248 5...
0  POLYGON ((491055.017 5616255.823, 491053.998 5...
"""
merge_lines = shapely.line_merge(lines.dissolve().geometry.values, **kwargs)
return gpd.GeoDataFrame(geometry=merge_lines, crs=lines.crs).explode(
ignore_index=True
)

[docs]
def split(polygons: gpd.GeoDataFrame, splitter: gpd.GeoDataFrame):
"""
Split polygons with polygons or lines.

:code:`shapely.ops.split` algorithm applied to GeoDataFrames.

Be careful: lines have to cut the whole polygon to work!
Use :code:`geometry.line_merge: to merge your lines if needed.

Args:
polygons (gpd.GeoDataFrame): Polygons to split
splitter (gpd.GeoDataFrame): Splitter to split the polygons

Returns:
gpd.GeoDataFrame: Split GeoDataFrame

Example:
>>> import geopandas as gpd
>>> from sertit import geometry
geometry
0  POLYGON ((491460.248 5616687.073, 491460.248 5...
>>> split_poly = geometry.split(poly, splitter=lines)
geometry
0  POLYGON ((491460.248 5616687.073, 491460.248 5...
0  POLYGON ((491055.017 5616255.823, 491053.998 5...
"""
out = polygons.dropna(axis=1).geometry
for _, split in splitter.iterrows():
# Compute the boundary of the splitter polygon (to get a LineString)
if split.geometry.area > 0:
boundary = split.geometry.boundary
else:
boundary = split.geometry

# Explode to prevent FeatureCollections
try:
# LineStrings
out = (
out.map(lambda geom: ops.split(geom, boundary).geoms).explode().dropna()
)
except GeometryTypeError:
# MultiLineStrings
for line in boundary.geoms:
out = (
out.map(lambda geom: ops.split(geom, line).geoms).explode().dropna()
)

return gpd.GeoDataFrame(geometry=out.explode(), crs=polygons.crs)

[docs]
def intersects(
input: gpd.GeoDataFrame, other: gpd.GeoDataFrame, buffer_on_input: float = None
) -> gpd.GeoDataFrame:
"""
Select the polygons of the input GeoDataFrame that intersects the other one and return them.

A buffer can be added on the input GeoDataFrame to be sure the intersection is ok.
For example, when dealing with polygons and points on their borders, sometimes the points fall a tiny bit outside the polygon boundary and is not considred as intersecting.

:code:`gpd.intersects` algorithm applied to whole GeoDataFrames.

Args:
input (gpd.GeoDataFrame): Input GeoDataFrame from which the polygons will be selected
other (gpd.GeoDataFrame): Other GeoDataFrame from that will intersect the first one
buffer_on_input (float): Buffer size on thhe input GeoDataFrame

Returns:
gpd.GeoDataFrame: Polygons of the input that intersects the other GeoDataFrame

Examples:
>>> intersects(water, lakes)
geometry
2  POLYGON ((490733.035 5616749.035, 490936.972 5...
3  POLYGON ((491254.800 5616242.894, 491175.035 5...

>>>
>>> # Some samples are on the polygon boundary but for some reason fall a tiny bit outside and don't intersect the water
>>> intersects(water, samples)
geometry
2  POLYGON ((490733.035 5616749.035, 490936.972 5...
>>>
>>> # Some samples are on the polygon boundary but for some reason fall a tiny bit outside and don't intersect the water
>>> # Use the buffer to handle this case. Use it carefully!
>>> intersects(water, samples, buffer_on_input=0.1)
geometry
2  POLYGON ((490733.035 5616749.035, 490936.972 5...
3  POLYGON ((491254.800 5616242.894, 491175.035 5...
"""
if buffer_on_input is not None:
input_to_intersect = buffer(input, buffer_on_input)
else:
input_to_intersect = input

return input[
input_to_intersect.geometry.map(lambda x: x.intersects(other.geometry).any())
]

[docs]
def buffer(vector: gpd.GeoDataFrame, buffer_m: float, **kwargs) -> gpd.GeoDataFrame:
"""
Add a buffer on a vector.

:code:`gpd.buffer` algorithm returning a GeoDataFrame instead of a GeoSeries.

Args:
vector (gpd.GeoDataFrame): Input vector
buffer_m (int): Buffer size in meters.
**kwargs: Other buffer arguments

Returns:
gpd.GeoDataFrame: Buffered vector

Example:
>>> lines.area
0    0.0
1    0.0
dtype: float64
>>> lines_buffer = buffer(lines, 10)
>>> lines_buffer.area
0    5695.213033
1    5898.723719
dtype: float64

"""
vector_bfd = vector.copy()
vector_bfd.geometry = vector.buffer(buffer_m, **kwargs)
return vector_bfd

[docs]
def nearest_neighbors(
src_gdf: gpd.GeoDataFrame,
candidates_gdf: gpd.GeoDataFrame,
method: str,
k_neighbors: int = None,
**kwargs,
) -> (np.ndarray, np.ndarray):
"""
For each point in src_gdf, find the closest point in candidates_gdf and return them with their distances (in the crs coordinates).

Closest points are:

- if method == :code:`k_neighbors`: the k closest neighbors
- if method == :code:`radius`: the neighbors inside this radius (in the crs coordinates, better done with projected geometries)

Args:
src_gdf (gpd.GeoDataFrame): Source geodataframe
candidates_gdf (gpd.GeoDataFrame): Candidates geodataframe
k_neighbors (int): Number of neighbors to be looked for
**kwargs: Other args for the query

Returns:
(np.ndarray, np.ndarray): closest samples, distances

Examples:
>>> from sertit import geometry, vectors
>>> # There is only one point in the neighborhood of each src, the others are further than 100m
>>>
[array([13]) array([12]) array([0])], [array([39.62574458]) array([50.37121574]) array([90.98648454])]
>>>
>>> # k_neighbors method
>>> nearest_neighbors(src, candidates, method="k_neighbors", k_neighbors=1)
[array([13]) array([12]) array([0])], [array([39.62574458]) array([50.37121574]) array([90.98648454])]
"""
# Parse coordinates from points and insert them into a numpy array as RADIANS
src = [(val.xy[0][0], val.xy[1][0]) for val in src_gdf.geometry.values]
candidates = [
(val.xy[0][0], val.xy[1][0]) for val in candidates_gdf.geometry.values
]

# Find the nearest points
# -----------------------
# closest ==> index in right_gdf that corresponds to the closest point
# dist ==> distance between the nearest neighbors (in the crs coordinates)
if method == "k_neighbors":
closest_samples, distances = _get_k_nearest(
src_points=src, candidates=candidates, k_neighbors=k_neighbors, **kwargs
)
else:
)

return closest_samples, distances

def _get_k_nearest(src_points: list, candidates: list, k_neighbors: int, **kwargs):
"""
For each point in src_gdf, find the nearest k points in candidates_gdf and return them with their distance.

Args:
src_points (list): Source points
candidates (list): Candidate points
k_neighbors (int): Number of neighbors to be looked for
**kwargs: Other args for the query

Returns:
(np.ndarray, np.ndarray): closest samples, distances
"""
try:
from sklearn.neighbors import BallTree
except ModuleNotFoundError as ex:
raise ModuleNotFoundError(
"Please install 'sklearn' the 'geometry.nearest_neighbors' function."
) from ex

# Create tree from the candidate points
tree = BallTree(candidates, leaf_size=15)

# Find the closest points and distances
closest_dist, closest = tree.query(
src_points, k=min(k_neighbors, len(candidates)), **kwargs
)

# Return indices and distances
return closest, closest_dist

src_points: list, candidates: list, radius: float, **kwargs
) -> (np.ndarray, np.ndarray):
"""
For each point in src_gdf, find the points in candidates_gdf inside the given radius and return them with their distance.

Args:
src_points (list): Source points
candidates (list): Candidate points
**kwargs: Other args for the query

Returns:
(np.ndarray, np.ndarray): closest samples, distances
"""
try:
from sklearn.neighbors import BallTree
except ModuleNotFoundError as ex:
raise ModuleNotFoundError(
"Please install 'sklearn' the 'geometry.nearest_neighbors' function."
) from ex

# Create tree from the candidate points
tree = BallTree(candidates, leaf_size=15)

# Find the closest points and distances