buckethead.files.store

FileStore: content-addressable blob store backed by S3-compatible storage.

Metadata lives in the caller's sqlite3.Connection; bytes live as one object per blob under a configurable prefix in the BucketClient's bucket. The filestore table is created on FileStore construction, so it's always present once the store exists.

See plan/build-plan.md — Phase 10 and issue #3 for the design decisions.
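
A minimal construction sketch. The BucketClient setup here is an assumption (its factory and credentials are configured elsewhere); only the FileStore call matches the signature documented below:

import sqlite3

from buckethead.files.store import FileStore

conn = sqlite3.connect("app.db")
bucket = make_bucket_client()  # hypothetical factory; R2 credentials live here

# The filestore table is created on construction, so the store is usable
# immediately.
store = FileStore(conn, bucket, prefix="files/")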

FileStore

FileStore(
    conn: Connection,
    bucket_client: BucketClient,
    prefix: str = "files/",
)

Bases: Interface

Content-addressable blob store. Owns the filestore table and a prefix in R2. Note: the table name predates the bh_ prefix convention documented in Interface — grandfathered in.

Source code in src/buckethead/files/store.py
def __init__(
    self,
    conn: sqlite3.Connection,
    bucket_client: BucketClient,
    prefix: str = "files/",
) -> None:
    self._prefix = prefix if prefix.endswith("/") else prefix + "/"
    # Interface.__init__ stores conn/bucket_client on self and calls
    # _init_schema(), which uses _prefix set above.
    super().__init__(conn, bucket_client)
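
The prefix is normalized to always end in a slash, so the trailing-slash and bare forms are equivalent (a quick illustration):

FileStore(conn, bucket, prefix="files")   # stored internally as "files/"
FileStore(conn, bucket, prefix="files/")  # already normalized, unchanged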

put

put(
    data: bytes | Path,
    *,
    filename: str | None = None,
    mime: str | None = None,
    metadata: dict[str, str] | None = None,
) -> str

Upload; returns the bh-key (SHA-256 hex of contents).

If the bh-key already exists in both tiers, this is a pure no-op — any new metadata passed in is ignored. If the R2 object exists but the SQLite row is missing (e.g., crashed mid-put), the row is filled in with the arguments from this call.

Source code in src/buckethead/files/store.py
def put(
    self,
    data: bytes | Path,
    *,
    filename: str | None = None,
    mime: str | None = None,
    metadata: dict[str, str] | None = None,
) -> str:
    """Upload; returns the bh-key (SHA-256 hex of contents).

    If the bh-key already exists in both tiers, this is a pure no-op —
    any new metadata passed in is ignored. If the R2 object exists but
    the SQLite row is missing (e.g., crashed mid-put), the row is
    filled in with the arguments from this call.
    """
    if isinstance(data, Path):
        size = data.stat().st_size
        hasher = hashlib.sha256()
        with data.open("rb") as f:
            while chunk := f.read(1 << 20):
                hasher.update(chunk)
        bh_key = hasher.hexdigest()
    else:
        size = len(data)
        bh_key = hashlib.sha256(data).hexdigest()

    object_key = self._object_key(bh_key)
    existing_row = self._fetch_row(bh_key)
    object_present = self._bucket.exists(object_key)

    if existing_row is not None and object_present:
        return bh_key

    if not object_present:
        if isinstance(data, Path):
            self._bucket.upload(data, object_key)
        else:
            self._bucket.put_bytes(data, object_key)

    self._conn.execute(
        "INSERT OR IGNORE INTO filestore "
        "(bh_key, object_key, size, mime, filename, created_at, metadata) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        (
            bh_key,
            object_key,
            size,
            mime,
            filename,
            time.time(),
            json.dumps(metadata) if metadata else None,
        ),
    )
    self._conn.commit()
    return bh_key
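
A short usage sketch of the dedup behaviour described above, reusing the store from the construction sketch and assuming from pathlib import Path:

key = store.put(b"hello world", filename="hello.txt", mime="text/plain")
# key == SHA-256 hex of the contents:
# "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"

# Same bytes again: both tiers already hold the blob, so this is a pure
# no-op and the new filename is ignored.
assert store.put(b"hello world", filename="renamed.txt") == key

# A Path is hashed in 1 MiB chunks without loading the file into memory.
key2 = store.put(Path("report.pdf"), mime="application/pdf")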

metadata

metadata(bh_key: str) -> FileMetadata | None

Return the stored metadata for bh_key, or None if absent.

Source code in src/buckethead/files/store.py
def metadata(self, bh_key: str) -> FileMetadata | None:
    """Return the stored metadata for `bh_key`, or None if absent."""
    row = self._fetch_row(bh_key)
    if row is None:
        return None
    return self._row_to_metadata(row)
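
A lookup sketch; the FileMetadata field names shown are inferred from the filestore columns used in put above, so treat them as assumptions:

meta = store.metadata(key)
if meta is None:
    print("no such bh-key")
else:
    print(meta.filename, meta.mime, meta.size)  # field names assumed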

gc

gc(*, grace_seconds: float = 300.0) -> GCReport

Reconcile R2 objects under prefix against SQLite rows.

- Orphans (R2 object with no SQLite row) younger than grace_seconds are skipped (covers in-flight puts).
- Older orphans are deleted from R2.
- Dangling rows (SQLite row whose R2 object is gone) are counted but not auto-deleted — a human decides.

Source code in src/buckethead/files/store.py
def gc(self, *, grace_seconds: float = 300.0) -> GCReport:
    """Reconcile R2 objects under `prefix` against SQLite rows.

    - Orphans (R2 object with no SQLite row) younger than `grace_seconds`
      are skipped (covers in-flight puts).
    - Older orphans are deleted from R2.
    - Dangling rows (SQLite row whose R2 object is gone) are counted but
      not auto-deleted — a human decides.
    """
    report = GCReport()

    known_keys: set[str] = set()
    for (bh_key,) in self._conn.execute("SELECT bh_key FROM filestore").fetchall():
        known_keys.add(bh_key)

    known_object_keys: set[str] = set()
    for r2_key in self._bucket.list_keys(self._prefix):
        # Strip the prefix to recover the bh-key this object was stored under.
        bh_key = r2_key[len(self._prefix) :]
        known_object_keys.add(bh_key)
        if bh_key in known_keys:
            continue

        report.orphans_found += 1
        last_modified = self._bucket.last_modified(r2_key)
        if last_modified is None:
            # R2 object vanished between list and head — already gone.
            continue
        if time.time() - last_modified < grace_seconds:
            report.orphans_skipped_grace += 1
            continue
        self._bucket.delete(r2_key)
        report.orphans_deleted += 1

    for bh_key in known_keys - known_object_keys:
        report.dangling_rows_found += 1
        _ = bh_key  # visible for debugging if we later log

    return report
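
A reconciliation sketch; the report fields correspond one-to-one to the counters set in the source above:

report = store.gc(grace_seconds=600.0)
print(
    f"orphans: {report.orphans_found} found, "
    f"{report.orphans_skipped_grace} within grace, "
    f"{report.orphans_deleted} deleted; "
    f"dangling rows: {report.dangling_rows_found} (left for a human)"
)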