# Source code for sertit.s3

# -*- coding: utf-8 -*-
# Copyright 2024, SERTIT-ICube - France, https://sertit.unistra.fr/
# This file is part of sertit-utils project
#     https://github.com/sertit/sertit-utils
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
S3 tools
"""
import logging
import os
from contextlib import contextmanager
from functools import wraps

from cloudpathlib import S3Client

from sertit.logs import SU_NAME

# Module logger, named after SU_NAME (imported from sertit.logs).
LOGGER = logging.getLogger(SU_NAME)

# Name of the environment variable holding the AWS access key ID.
# Read by define_s3_client when no "aws_access_key_id" keyword is given.
AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
"""
Environment variable linked to AWS Access Key ID.
"""

# Name of the environment variable holding the AWS secret access key.
# Read by define_s3_client when no "aws_secret_access_key" keyword is given.
AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"
"""
Environment variable linked to AWS Secret Access Key.
"""

# Name of the environment variable holding the S3 endpoint (host only, the
# "https://" scheme is prepended by the functions of this module).
# When set, it takes precedence over any endpoint given as an argument.
AWS_S3_ENDPOINT = "AWS_S3_ENDPOINT"
"""
Environment variable linked to AWS endpoint.
"""

# Name of the environment variable used by default by s3_env as the S3 on/off
# switch: its value is parsed as an integer, a missing variable counts as 1.
USE_S3_STORAGE = "USE_S3_STORAGE"
"""
Environment variable created to use Unistra's S3 bucket.
"""


def s3_env(*args, **kwargs):
    """
    Decorator factory running a function inside an S3 compatible storage environment.

    The endpoint url has to be given explicitly when using S3 compatible storage,
    since GDAL/Rasterio does not read it from the config file.

    S3 configuration is looked for in several places. The order of precedence,
    from least to greatest (the last one overrides the others), is:

    1. AWS profile
    2. Endpoint given as function argument
    3. AWS environment variable

    The endpoint is resolved once, when ``s3_env(...)`` itself is called.
    Every call of the decorated function then reads the switch environment variable
    (parsed as an integer, a missing variable counts as 1):

    - non-zero: the default S3 client is defined with ``define_s3_client``,
      the switch variable is set to ``"1"`` and the function runs inside a ``rasterio.Env``
    - zero: the switch variable is set to ``"0"`` and the function runs without any S3 setup

    Args:
        *args: Unused
        **kwargs: Recognized keys are ``use_s3_env_var`` (name of the switch environment variable,
            ``USE_S3_STORAGE`` by default), ``endpoint``, ``profile_name``, ``requester_pays`` and
            ``no_sign_request``. All keyword arguments are also forwarded to ``define_s3_client``.

    Returns:
        Callable: decorated function

    Example:
        >>> from sertit.s3 import s3_env
        >>> from sertit import AnyPath
        >>> @s3_env(endpoint="s3.unistra.fr")
        >>> def file_exists(path: str):
        >>>     pth = AnyPath(path)
        >>>     print(pth.exists())
        >>> file_exists("s3://sertit-geodatastore/GLOBAL/COPDEM_30m/COPDEM_30m.vrt")
        True
    """
    import rasterio

    # Everything below is read only once, at decoration time
    switch_var = kwargs.get("use_s3_env_var", USE_S3_STORAGE)
    pays = kwargs.get("requester_pays")
    unsigned = kwargs.get("no_sign_request")
    # The environment variable wins over the 'endpoint' keyword
    s3_endpoint = os.getenv(AWS_S3_ENDPOINT, kwargs.get("endpoint"))
    aws_profile = kwargs.get("profile_name", None)

    def decorator(function):
        @wraps(function)
        def s3_env_wrapper(*_args, **_kwargs):
            """Run the wrapped function with or without the S3 environment"""
            # S3 disabled: plain call on disk files
            if not int(os.getenv(switch_var, 1)):
                os.environ[switch_var] = "0"
                LOGGER.info("Using on disk files")
                return function(*_args, **_kwargs)

            if unsigned:
                sign_flag = "YES"
            else:
                sign_flag = "NO"

            if pays:
                payer = "requester"
            else:
                payer = None

            # Options handed over to rasterio/GDAL
            rasterio_opts = {
                "profile_name": aws_profile,
                "CPL_CURL_VERBOSE": False,
                "GDAL_DISABLE_READDIR_ON_OPEN": False,
                "AWS_NO_SIGN_REQUEST": sign_flag,
                "AWS_REQUEST_PAYER": payer,
            }

            # Options handed over to the S3 client (user keywords override the three defaults)
            client_opts = {
                "profile_name": aws_profile,
                "requester_pays": pays,
                "no_sign_request": unsigned,
                **kwargs,
            }

            if s3_endpoint is not None:
                rasterio_opts["AWS_S3_ENDPOINT"] = s3_endpoint
                # cloudpathlib can read endpoint from config file
                client_opts["endpoint_url"] = f"https://{s3_endpoint}"

            # Define S3 client for S3 paths
            define_s3_client(**client_opts)
            os.environ[switch_var] = "1"
            LOGGER.info("Using S3 files")

            with rasterio.Env(**rasterio_opts):
                return function(*_args, **_kwargs)

        return s3_env_wrapper

    return decorator
@contextmanager
def temp_s3(
    endpoint: str = None,
    profile_name: str = None,
    requester_pays: bool = False,
    no_sign_request: bool = False,
    **kwargs,
) -> None:
    """
    Context manager setting up a temporary S3 environment.

    The endpoint url has to be given explicitly when using S3 compatible storage,
    since GDAL/Rasterio does not read it from the config file.

    S3 configuration is looked for in several places. The order of precedence,
    from least to greatest (the last one overrides the others), is:

    1. AWS profile
    2. Endpoint given as function argument
    3. AWS environment variable

    Inside the ``with`` block, a ``rasterio.Env`` is active and the default S3 client
    is the one defined by ``define_s3_client``. On exit (even on error), a fresh
    ``S3Client()`` is set back as the default client.

    Args:
        endpoint: Endpoint to s3 path in the form s3.yourdomain.com
        profile_name: The name of your AWS profile
        requester_pays (bool): True if the endpoint says 'requester pays'
        no_sign_request (bool): True if the endpoint is open access
        **kwargs: Other keyword arguments, forwarded to ``define_s3_client``

    Example:
        >>> from sertit.s3 import temp_s3
        >>> from sertit import AnyPath
        >>> def file_exists(path: str):
        >>>     with temp_s3(endpoint="s3.unistra.fr"):
        >>>         pth = AnyPath(path)
        >>>         print(pth.exists())
        >>> file_exists("s3://sertit-geodatastore/GLOBAL/COPDEM_30m/COPDEM_30m.vrt")
        True
    """
    import rasterio

    try:
        if no_sign_request:
            sign_flag = "YES"
        else:
            sign_flag = "NO"

        if requester_pays:
            payer = "requester"
        else:
            payer = None

        # Options handed over to rasterio/GDAL
        rasterio_opts = {
            "profile_name": profile_name,
            "CPL_CURL_VERBOSE": False,
            "GDAL_DISABLE_READDIR_ON_OPEN": False,
            "AWS_NO_SIGN_REQUEST": sign_flag,
            "AWS_REQUEST_PAYER": payer,
        }

        # Options handed over to the S3 client (user keywords override the three defaults)
        client_opts = {
            "profile_name": profile_name,
            "requester_pays": requester_pays,
            "no_sign_request": no_sign_request,
            **kwargs,
        }

        # Give the precedence to AWS_S3_ENDPOINT
        resolved_endpoint = os.getenv(AWS_S3_ENDPOINT, endpoint)
        if resolved_endpoint is not None:
            rasterio_opts["AWS_S3_ENDPOINT"] = resolved_endpoint
            # cloudpathlib can read endpoint from config file
            client_opts["endpoint_url"] = f"https://{resolved_endpoint}"

        with rasterio.Env(**rasterio_opts):
            # Define S3 client for S3 paths, then hand control to the 'with' body
            defined = define_s3_client(**client_opts)
            yield defined
    finally:
        # Clean env
        S3Client().set_as_default_client()
def define_s3_client(
    endpoint_url=None,
    profile_name=None,
    requester_pays: bool = False,
    no_sign_request: bool = False,
    **kwargs,
):
    """
    Define S3 client

    Build a cloudpathlib ``S3Client`` and set it as the default client.

    This function searches for S3 configuration in many places.
    It does apply configuration variables precedence, and you might have a use for it.
    Here is the order of precedence from least to greatest
    (the last listed configuration variables override all other variables):

    1. AWS profile
    2. Given endpoint_url as function argument
    3. AWS environment variable

    Credentials are taken from the ``aws_access_key_id`` / ``aws_secret_access_key`` keyword
    arguments when given, otherwise from the ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY``
    environment variables.

    Args:
        endpoint_url: The endpoint url in the form https://s3.yourdomain.com
        profile_name: The name of the aws profile. Default to default profile in AWS configuration file.
        requester_pays (bool): True if the endpoint says 'requester pays'
        no_sign_request (bool): True if the endpoint is open access
        **kwargs: Other ``S3Client`` arguments. Only the following keys are forwarded, the others are ignored:
            ``aws_session_token``, ``botocore_session``, ``boto3_session``, ``file_cache_mode``,
            ``local_cache_dir``, ``boto3_transfer_config``, ``content_type_method``, ``extra_args``.
            The given ``extra_args`` dictionary is never modified.
    """
    # The environment variable has the greatest precedence and overrides the given endpoint_url
    env_endpoint = os.environ.get(AWS_S3_ENDPOINT)
    if env_endpoint is not None:
        endpoint_url = f"https://{env_endpoint}"

    aws_access_key_id = kwargs.pop("aws_access_key_id", os.getenv(AWS_ACCESS_KEY_ID))
    aws_secret_access_key = kwargs.pop(
        "aws_secret_access_key", os.getenv(AWS_SECRET_ACCESS_KEY)
    )

    # Normalize any falsy value (i.e. None) to a real False
    if not no_sign_request:
        no_sign_request = False

    # Only forward the keyword arguments known as S3Client options
    s3_client_args = [
        "aws_session_token",
        "botocore_session",
        "profile_name",
        "boto3_session",
        "file_cache_mode",
        "local_cache_dir",
        "boto3_transfer_config",
        "content_type_method",
        "extra_args",
    ]
    s3_client_kwargs = {key: kwargs[key] for key in s3_client_args if key in kwargs}

    if requester_pays:
        # Work on a copy: the caller's dictionary must not be modified in place
        # (it may be reused across calls), and an explicit extra_args=None must not crash
        extra_args = dict(s3_client_kwargs.get("extra_args") or {})
        extra_args["RequestPayer"] = "requester"
        s3_client_kwargs["extra_args"] = extra_args

    # ON S3
    args_s3_client = {
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
        "profile_name": profile_name,
        "no_sign_request": no_sign_request,
    }
    args_s3_client.update(s3_client_kwargs)

    # Leave the endpoint out when unknown, so the client falls back on its own configuration
    if endpoint_url is not None:
        args_s3_client["endpoint_url"] = endpoint_url

    client = S3Client(**args_s3_client)
    client.set_as_default_client()