"""S3-compatible storage client using aiobotocore."""
from __future__ import annotations

from types import TracebackType
from typing import Any
from urllib.parse import quote

from botocore.config import Config
from botocore.exceptions import ClientError
from etils import epy

from derp.config import StorageConfig
from derp.storage.exceptions import (
    StorageAccessDeniedError,
    StorageBackendError,
    StorageBucketNotFoundError,
    StorageNotConnectedError,
    StorageObjectNotFoundError,
    StoragePartialDeleteError,
)

with epy.lazy_imports():
    import aiobotocore.client as aio_client
    import aiobotocore.session as aio_session
_NOT_FOUND_CODES = frozenset({"404", "NoSuchKey", "NotFound"})
_NO_SUCH_BUCKET_CODES = frozenset({"NoSuchBucket"})
_ACCESS_DENIED_CODES = frozenset({"403", "AccessDenied", "Forbidden"})
def _join_url(base_url: str, *parts: str) -> str:
return "/".join([base_url.rstrip("/"), *(part.strip("/") for part in parts)])
def _error_code(exc: ClientError) -> str:
return exc.response.get("Error", {}).get("Code", "") or ""
class StorageClient:
    """S3-compatible storage client for uploading and fetching files.

    All network methods require an open connection (see :meth:`connect`).
    The client can also be used as an async context manager, which connects
    on entry and disconnects on exit. Backend ``ClientError`` failures are
    translated into the ``derp.storage.exceptions`` hierarchy by
    :meth:`_call`.

    Example::

        config = StorageConfig(
            endpoint_url="https://s3.amazonaws.com",
            access_key_id="key",
            secret_access_key="secret",
        )
        storage = StorageClient(config)
        await storage.connect()
        await storage.upload_file(
            bucket="my-bucket", key="remote.txt", data=b"Hello, World!"
        )
        await storage.disconnect()

    Equivalent, with automatic cleanup::

        async with StorageClient(config) as storage:
            await storage.upload_file(
                bucket="my-bucket", key="remote.txt", data=b"Hello, World!"
            )
    """

    # Maximum number of keys accepted by a single S3 DeleteObjects request.
    _DELETE_BATCH_SIZE = 1000

    def __init__(self, config: StorageConfig):
        """Initialize the storage client without opening a connection.

        Args:
            config: Storage configuration (endpoint, credentials, region, ...).
        """
        self._config = config
        self._session: aio_session.AioSession | None = None
        self._client: aio_client.AioBaseClient | None = None

    async def connect(self) -> None:
        """Open the underlying aiobotocore client.

        Calling this on an already-connected client is a no-op. Session and
        client are stored only after the client has been entered
        successfully, so a failed connect leaves the object disconnected.
        """
        if self._client is not None:
            return
        session = aio_session.get_session()
        client_config = Config(
            region_name=self._config.region,
            signature_version="s3v4",
        )
        # ``create_client`` returns an async context manager; entering it
        # yields the client, which ``disconnect`` exits again.
        client = await session.create_client(
            self._config.service_name,
            endpoint_url=self._config.endpoint_url,
            aws_access_key_id=self._config.access_key_id,
            aws_secret_access_key=self._config.secret_access_key,
            aws_session_token=self._config.session_token,
            use_ssl=self._config.use_ssl,
            verify=self._config.verify,
            config=client_config,
        ).__aenter__()
        self._session = session
        self._client = client

    async def disconnect(self) -> None:
        """Close the underlying client, if one is open.

        Safe to call when not connected. Internal state is cleared before the
        client is closed, so the object ends up disconnected (and can be
        reconnected) even if closing the client raises.
        """
        client = self._client
        self._client = None
        self._session = None
        if client is not None:
            await client.__aexit__(None, None, None)

    async def __aenter__(self) -> StorageClient:
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.disconnect()

    async def _call(
        self,
        op_name: str,
        *,
        bucket: str | None = None,
        key: str | None = None,
        **kwargs: Any,
    ) -> Any:
        """Invoke an aiobotocore operation with normalised error handling.

        ``bucket`` / ``key`` are used only to build informative
        ``StorageObjectNotFoundError`` / ``StorageBucketNotFoundError``
        exceptions; they are not passed through to the operation. Pass S3
        arguments (``Bucket=...``, ``Key=...``) explicitly.

        Raises:
            StorageNotConnectedError: If :meth:`connect` has not been called.
            StorageBucketNotFoundError: On ``NoSuchBucket`` (when ``bucket``
                is given), or on a not-found code when only ``bucket`` is given.
            StorageObjectNotFoundError: On a not-found code when both
                ``bucket`` and ``key`` are given.
            StorageAccessDeniedError: On an access-denied code.
            StorageBackendError: For any other ``ClientError``.
        """
        if self._client is None:
            raise StorageNotConnectedError()
        op = getattr(self._client, op_name)
        try:
            return await op(**kwargs)
        except ClientError as exc:
            code = _error_code(exc)
            if code in _NO_SUCH_BUCKET_CODES and bucket is not None:
                # Prefer the bucket the backend says is missing: for
                # copy_object it may be the destination, not ``bucket``.
                reported = exc.response.get("Error", {}).get("BucketName")
                raise StorageBucketNotFoundError(reported or bucket) from exc
            if code in _NOT_FOUND_CODES:
                if key is not None and bucket is not None:
                    raise StorageObjectNotFoundError(bucket, key) from exc
                if bucket is not None:
                    raise StorageBucketNotFoundError(bucket) from exc
            if code in _ACCESS_DENIED_CODES:
                raise StorageAccessDeniedError() from exc
            raise StorageBackendError(str(exc), code=code or None) from exc

    def get_url(self, *, bucket: str, key: str) -> str:
        """Build the URL for an object in S3 (public or private).

        If ``public_urls`` has an entry for ``bucket``, the URL is
        ``<public_url>/<key>``; otherwise it is
        ``<endpoint_url>/<bucket>/<key>``. The key is percent-encoded, with
        ``/`` kept as the path separator, so keys containing spaces, ``#``,
        ``?`` or ``%`` still yield a URL that addresses the right object.

        Args:
            bucket: Name of the S3 bucket.
            key: Raw (unencoded) S3 object key (path in bucket).

        Returns:
            URL for the file.

        Raises:
            ValueError: If no public URL is configured for this bucket and
                ``endpoint_url`` is not configured.
        """
        encoded_key = quote(key, safe="/")
        public_url = self._config.public_urls.get(bucket)
        if public_url is not None:
            return _join_url(public_url, encoded_key)
        if self._config.endpoint_url is None:
            raise ValueError(
                "Cannot construct URL: no public URL is configured for bucket "
                f"{bucket!r} in `public_urls`, and `endpoint_url` is not "
                "configured in StorageConfig."
            )
        return _join_url(self._config.endpoint_url, bucket, encoded_key)

    async def upload_file(
        self,
        *,
        bucket: str,
        key: str,
        data: bytes,
        content_type: str | None = None,
        metadata: dict[str, str] | None = None,
        extra_args: dict[str, Any] | None = None,
    ) -> None:
        """Upload bytes to S3 as a single object (``put_object``).

        Args:
            bucket: Name of the S3 bucket.
            key: S3 object key (path in bucket).
            data: Bytes to upload.
            content_type: MIME type of the file.
            metadata: Metadata to attach to the object.
            extra_args: Additional ``put_object`` arguments. They are applied
                last, so they may override any of the fields above.

        Example::

            await storage.upload_file(
                bucket="my-bucket",
                key="remote/file.txt",
                data=b"Hello, World!",
                content_type="text/plain",
                metadata={"author": "user123"},
            )
        """
        put_kwargs: dict[str, Any] = {
            "Bucket": bucket,
            "Key": key,
            "Body": data,
        }
        if content_type:
            put_kwargs["ContentType"] = content_type
        if metadata:
            put_kwargs["Metadata"] = metadata
        if extra_args:
            put_kwargs.update(extra_args)
        await self._call("put_object", bucket=bucket, key=key, **put_kwargs)

    async def fetch_file(self, *, bucket: str, key: str) -> bytes:
        """Download an object from S3 and return its full content.

        Args:
            bucket: Name of the S3 bucket.
            key: S3 object key (path in bucket).

        Returns:
            File content as bytes.

        Raises:
            StorageObjectNotFoundError: If ``key`` does not exist.
            StorageBucketNotFoundError: If ``bucket`` does not exist.
            StorageAccessDeniedError: If the backend denies the request.
            StorageBackendError: For any other backend failure.

        Example::

            content = await storage.fetch_file(
                bucket="my-bucket", key="remote/file.txt"
            )
        """
        response = await self._call(
            "get_object", bucket=bucket, key=key, Bucket=bucket, Key=key
        )
        # The body stream is closed by the context manager once read.
        async with response["Body"] as stream:
            content = await stream.read()
        return content

    async def delete_file(self, *, bucket: str, key: str) -> None:
        """Delete a single object from S3.

        Deletes are idempotent: deleting a key that does not exist succeeds
        silently. AWS S3 reports success in that case itself; if a backend
        instead answers with a not-found error for the key, it is swallowed
        here so the contract holds on every backend. A bucket reported as
        ``NoSuchBucket`` still raises.

        Args:
            bucket: Name of the S3 bucket.
            key: S3 object key (path in bucket).

        Raises:
            StorageBucketNotFoundError: If the backend reports ``NoSuchBucket``.
            StorageAccessDeniedError: If the backend denies the request.
            StorageBackendError: For any other backend failure.
        """
        try:
            await self._call(
                "delete_object", bucket=bucket, key=key, Bucket=bucket, Key=key
            )
        except StorageObjectNotFoundError:
            # Missing key: nothing to delete, which is the desired end state.
            return

    async def delete_files(self, *, bucket: str, keys: list[str]) -> list[str]:
        """Delete multiple objects from S3.

        Keys are sent in ``DeleteObjects`` requests of at most 1000 keys each,
        the per-request limit S3 enforces, so arbitrarily long key lists are
        supported. Per-key failures from all batches are collected and
        reported together once every batch has been sent. A request-level
        failure (e.g. access denied) propagates immediately; batches already
        sent at that point have been applied.

        Args:
            bucket: Name of the S3 bucket.
            keys: List of S3 object keys to delete.

        Returns:
            List of keys that were successfully deleted.

        Raises:
            StoragePartialDeleteError: If S3 reports per-key failures. The
                exception carries both the failed entries and the keys that
                succeeded.
        """
        if not keys:
            return []
        deleted: list[str] = []
        errors: list[dict[str, str]] = []
        batch_size = self._DELETE_BATCH_SIZE
        for start in range(0, len(keys), batch_size):
            batch = keys[start : start + batch_size]
            response = await self._call(
                "delete_objects",
                bucket=bucket,
                Bucket=bucket,
                Delete={"Objects": [{"Key": k} for k in batch]},
            )
            deleted.extend(obj["Key"] for obj in response.get("Deleted", []))
            errors.extend(
                {
                    "key": str(e.get("Key", "")),
                    "code": str(e.get("Code", "")),
                    "message": str(e.get("Message", "")),
                }
                for e in response.get("Errors") or []
            )
        if errors:
            raise StoragePartialDeleteError(errors=errors, deleted=deleted)
        return deleted

    async def copy_file(
        self,
        *,
        src_bucket: str,
        src_key: str,
        dst_bucket: str | None = None,
        dst_key: str,
    ) -> None:
        """Copy an object between keys or buckets (server-side).

        Args:
            src_bucket: Source bucket name.
            src_key: Source object key.
            dst_bucket: Destination bucket name. Defaults to ``src_bucket``.
            dst_key: Destination object key.

        Raises:
            StorageObjectNotFoundError: If the source object does not exist.
            StorageBucketNotFoundError: If either bucket does not exist. The
                error names the bucket reported by the backend when it is
                available, falling back to ``src_bucket``.
        """
        await self._call(
            "copy_object",
            bucket=src_bucket,
            key=src_key,
            Bucket=dst_bucket or src_bucket,
            Key=dst_key,
            CopySource={"Bucket": src_bucket, "Key": src_key},
        )

    async def file_exists(self, *, bucket: str, key: str) -> bool:
        """Check whether an object exists in S3 (via ``head_object``).

        Returns ``False`` for a missing key. It also returns ``False`` when
        the backend reports a missing bucket with a generic not-found code
        rather than ``NoSuchBucket``. Other backend errors (denied, network,
        5xx) raise.

        Args:
            bucket: Name of the S3 bucket.
            key: S3 object key (path in bucket).

        Returns:
            True if the object exists, False otherwise.
        """
        try:
            await self._call(
                "head_object", bucket=bucket, key=key, Bucket=bucket, Key=key
            )
            return True
        except StorageObjectNotFoundError:
            return False

    async def head_object(self, *, bucket: str, key: str) -> dict[str, Any]:
        """Get object metadata without downloading the content.

        Args:
            bucket: Name of the S3 bucket.
            key: S3 object key (path in bucket).

        Returns:
            Dict with ``content_type`` (default
            ``"application/octet-stream"``), ``content_length`` (default 0),
            ``last_modified`` (ISO-8601 string), ``etag`` (surrounding quotes
            stripped) and ``metadata`` keys.

        Raises:
            StorageObjectNotFoundError: If ``key`` does not exist. This is
                also raised if the backend reports a missing bucket with a
                generic not-found code rather than ``NoSuchBucket``.
        """
        response = await self._call(
            "head_object", bucket=bucket, key=key, Bucket=bucket, Key=key
        )
        return {
            "content_type": response.get("ContentType", "application/octet-stream"),
            "content_length": response.get("ContentLength", 0),
            "last_modified": response["LastModified"].isoformat(),
            "etag": response.get("ETag", "").strip('"'),
            "metadata": response.get("Metadata", {}),
        }

    async def list_files(
        self, *, bucket: str, prefix: str = "", max_keys: int | None = None
    ) -> list[str]:
        """List object keys in an S3 bucket.

        Follows ``ListObjectsV2`` continuation tokens, so listings spanning
        more than one page (S3 returns at most 1000 keys per page) are
        returned in full instead of being silently truncated.

        Args:
            bucket: Name of the S3 bucket.
            prefix: Only keys starting with this prefix are returned.
            max_keys: Maximum number of keys to return. ``None`` (or any
                non-positive value) means no limit.

        Returns:
            List of object keys, in the order the backend returns them.

        Example::

            files = await storage.list_files(bucket="my-bucket", prefix="folder/")
        """
        limit = max_keys if max_keys is not None and max_keys > 0 else None
        keys: list[str] = []
        continuation_token: str | None = None
        while True:
            list_kwargs: dict[str, Any] = {"Bucket": bucket}
            if prefix:
                list_kwargs["Prefix"] = prefix
            if limit is not None:
                # Only ask for the keys still missing.
                list_kwargs["MaxKeys"] = limit - len(keys)
            if continuation_token is not None:
                list_kwargs["ContinuationToken"] = continuation_token
            response = await self._call(
                "list_objects_v2", bucket=bucket, **list_kwargs
            )
            keys.extend(obj["Key"] for obj in response.get("Contents", []))
            if limit is not None and len(keys) >= limit:
                return keys[:limit]
            continuation_token = response.get("NextContinuationToken")
            # Stop when the backend says the listing is complete, or when it
            # gives no token to continue from.
            if not response.get("IsTruncated") or not continuation_token:
                return keys

    async def signed_download_url(
        self,
        *,
        bucket: str,
        key: str,
        expires_in: int = 3600,
        response_content_disposition: str | None = None,
        response_content_type: str | None = None,
    ) -> str:
        """Generate a presigned URL for downloading (GET) an object.

        Args:
            bucket: Name of the S3 bucket.
            key: S3 object key (path in bucket).
            expires_in: URL expiry in seconds (default 3600).
            response_content_disposition: Value for the ``Content-Disposition``
                response header the storage backend returns for this URL, e.g.
                ``'attachment; filename="report.pdf"'`` to force a download. The
                value is signed into the URL, so it can't be tampered with.
            response_content_type: Value for the ``Content-Type`` response
                header the storage backend returns for this URL.

        Returns:
            Presigned URL string.
        """
        params: dict[str, str] = {"Bucket": bucket, "Key": key}
        if response_content_disposition is not None:
            params["ResponseContentDisposition"] = response_content_disposition
        if response_content_type is not None:
            params["ResponseContentType"] = response_content_type
        return await self._call(
            "generate_presigned_url",
            bucket=bucket,
            key=key,
            ClientMethod="get_object",
            Params=params,
            ExpiresIn=expires_in,
        )

    async def signed_upload_url(
        self,
        *,
        bucket: str,
        key: str,
        expires_in: int = 3600,
        content_type: str | None = None,
    ) -> str:
        """Generate a presigned URL for uploading (PUT) an object.

        Args:
            bucket: Name of the S3 bucket.
            key: S3 object key (path in bucket).
            expires_in: URL expiry in seconds (default 3600).
            content_type: MIME type the uploader must use (signed into the URL).

        Returns:
            Presigned URL string.
        """
        params: dict[str, str] = {"Bucket": bucket, "Key": key}
        if content_type:
            params["ContentType"] = content_type
        return await self._call(
            "generate_presigned_url",
            bucket=bucket,
            key=key,
            ClientMethod="put_object",
            Params=params,
            ExpiresIn=expires_in,
        )

    async def list_buckets(self) -> list[dict[str, Any]]:
        """List all buckets visible to the configured credentials.

        Returns:
            List of dicts with ``name`` and ``creation_date`` (ISO-8601
            string) keys.
        """
        response = await self._call("list_buckets")
        return [
            {
                "name": b["Name"],
                "creation_date": b["CreationDate"].isoformat(),
            }
            for b in response.get("Buckets", [])
        ]

    async def list_objects(
        self,
        *,
        bucket: str,
        prefix: str = "",
        max_keys: int = 1000,
    ) -> dict[str, Any]:
        """List objects and common prefixes in a bucket.

        Uses ``Delimiter='/'`` for folder-like browsing. Only a single
        ``ListObjectsV2`` page is requested. An object whose key equals
        ``prefix`` (a folder marker) is excluded from ``objects``.

        Args:
            bucket: Name of the S3 bucket.
            prefix: Prefix to filter by (use trailing ``/`` for folders).
            max_keys: Maximum number of keys requested from the backend.

        Returns:
            Dict with ``objects`` (list of ``key``/``size``/``last_modified``
            dicts) and ``prefixes`` (list of prefix strings representing
            folders).
        """
        list_kwargs: dict[str, Any] = {
            "Bucket": bucket,
            "Prefix": prefix,
            "Delimiter": "/",
            "MaxKeys": max_keys,
        }
        response = await self._call("list_objects_v2", bucket=bucket, **list_kwargs)
        objects = [
            {
                "key": obj["Key"],
                "size": obj["Size"],
                "last_modified": obj["LastModified"].isoformat(),
            }
            for obj in response.get("Contents", [])
            if obj["Key"] != prefix
        ]
        prefixes = [p["Prefix"] for p in response.get("CommonPrefixes", [])]
        return {"objects": objects, "prefixes": prefixes}