Skip to content

buckethead.storage.bucket

Thin wrapper over boto3.client("s3", ...).

Exposes only what snapshot.py and files/store.py need. No retry/backoff logic — boto3 defaults stand. Any error surfaces as botocore.ClientError to the caller.

Optionally records per-op byte totals and wall times into an IOCounters instance (Phase 9 profiling). Passing io_counters=None (the default) keeps the hot path free of any counting overhead.

BucketClient

BucketClient(
    config: BucketConfig,
    io_counters: IOCounters | None = None,
)
Source code in src/buckethead/storage/bucket.py
def __init__(
    self,
    config: BucketConfig,
    io_counters: IOCounters | None = None,
) -> None:
    """Build the underlying S3 client from `config`.

    `io_counters` wires in per-op byte/time accounting; leaving it as
    None (the default) disables counting entirely.
    """
    client = boto3.client(
        "s3",
        endpoint_url=config.endpoint_url,
        aws_access_key_id=config.access_key_id,
        aws_secret_access_key=config.secret_access_key,
        region_name=config.region,
        # sig-v4 is required for presigned URLs against R2/S3.
        config=Config(signature_version="s3v4"),
    )
    self._client = client
    self._bucket = config.bucket
    self._counters = io_counters

put_bytes

put_bytes(data: bytes, key: str) -> None

In-memory counterpart to upload(). For small payloads only — large bytes blow up memory. Counts as an 'upload' in IOCounters so the byte totals reflect all traffic we generated.

Source code in src/buckethead/storage/bucket.py
def put_bytes(self, data: bytes, key: str) -> None:
    """Upload a small in-memory payload to `key`.

    In-memory counterpart to upload(). For small payloads only —
    large bytes blow up memory. Counts as an 'upload' in IOCounters
    so the byte totals reflect all traffic we generated.
    """
    t0 = time.perf_counter()
    # put_object accepts bytes for Body directly; wrapping in
    # io.BytesIO added an allocation and bought nothing for a
    # single-shot upload.
    self._client.put_object(Bucket=self._bucket, Key=key, Body=data)
    self._record("upload", bytes_up=len(data), t0=t0)

generate_presigned_get

generate_presigned_get(key: str, *, expires_in: int) -> str

Return a sig-v4 presigned GET URL for key.

expires_in is seconds; S3/R2 hard-cap at 7 days (604800).

Source code in src/buckethead/storage/bucket.py
def generate_presigned_get(self, key: str, *, expires_in: int) -> str:
    """Return a sig-v4 presigned GET URL for `key`.

    `expires_in` is seconds; S3/R2 hard-cap at 7 days (604800).
    """
    started = time.perf_counter()
    params = {"Bucket": self._bucket, "Key": key}
    signed_url = self._client.generate_presigned_url(
        "get_object",
        Params=params,
        ExpiresIn=expires_in,
    )
    # Signing is purely local (no network I/O), but record the op
    # anyway so the profile shows every call we made.
    self._record("generate_presigned_url", t0=started)
    return signed_url

last_modified

last_modified(key: str) -> float | None

Epoch seconds of the object's LastModified, or None if missing.

Used by FileStore.gc() to enforce the grace window. Counts as a head_object in IOCounters.

Source code in src/buckethead/storage/bucket.py
def last_modified(self, key: str) -> float | None:
    """Epoch seconds of the object's LastModified, or None if missing.

    Used by FileStore.gc() to enforce the grace window. Counts as a
    head_object in IOCounters.
    """
    started = time.perf_counter()
    try:
        head = self._client.head_object(Bucket=self._bucket, Key=key)
    except ClientError as err:
        meta = err.response.get("ResponseMetadata", {})
        if meta.get("HTTPStatusCode") != 404:
            # Anything other than "not found" is a real error; note
            # that the op is deliberately NOT recorded on this path.
            raise
        self._record("head_object", t0=started)
        return None
    self._record("head_object", t0=started)
    return head["LastModified"].timestamp()