buckethead.storage.snapshot

Snapshot primitives: flush() and restore().

Pure functions — no threading, no lifecycle, no signal handling. The flush loop and signal/atexit wiring live in lifecycle.py (Phase 5); BucketSQLite.start() and .stop() orchestrate both (Phase 6).

See plan/build-plan.md — Phase 4.

SnapshotError

Bases: Exception

Base class for snapshot-related failures.

SnapshotCorruptError

Bases: SnapshotError

Raised when a downloaded snapshot fails SQLite integrity checks.

FlushResult dataclass

FlushResult(
    uploaded: bool, bytes_written: int, snapshot_hash: str
)

Outcome of a single flush() call.

restore

restore(
    conn: Connection, bucket_client: BucketClient, key: str
) -> bool

Download the snapshot at key and hydrate it into conn.

Returns True if a snapshot was found and restored, False if no snapshot object exists at that key. Raises SnapshotCorruptError if the downloaded file fails PRAGMA integrity_check.

Source code in src/buckethead/storage/snapshot.py
def restore(conn: sqlite3.Connection, bucket_client: BucketClient, key: str) -> bool:
    """Download the snapshot at `key` and hydrate it into `conn`.

    Returns True if a snapshot was found and restored, False if no
    snapshot object exists at that key. Raises SnapshotCorruptError
    if the downloaded file fails `PRAGMA integrity_check`.
    """
    if not bucket_client.exists(key):
        return False

    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        tmp_path = Path(tmp.name)

    try:
        bucket_client.download(key, tmp_path)
        try:
            source = sqlite3.connect(tmp_path)
        except sqlite3.DatabaseError as e:
            raise SnapshotCorruptError(
                f"snapshot at {key!r} is not a valid SQLite file: {e}"
            ) from e
        try:
            source.backup(conn)
        except sqlite3.DatabaseError as e:
            raise SnapshotCorruptError(
                f"snapshot at {key!r} failed to restore: {e}"
            ) from e
        finally:
            source.close()
    finally:
        tmp_path.unlink(missing_ok=True)

    check = conn.execute("PRAGMA integrity_check").fetchone()
    if check != ("ok",):
        raise SnapshotCorruptError(
            f"snapshot at {key!r} failed integrity_check: {check}"
        )
    return True
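The hydration step inside restore() rests on sqlite3's backup API, where direction matters: source.backup(target) copies the *source* database into *target*. A minimal, self-contained sketch of that mechanism and the PRAGMA integrity_check probe (toy table and values are illustrative only):

```python
import sqlite3

# Build a "snapshot" database in memory, standing in for the downloaded file.
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT)")
source.execute("INSERT INTO kv VALUES ('greeting', 'hello')")
source.commit()

# Hydrate a fresh connection: source.backup(target) overwrites target
# with the full contents of source.
target = sqlite3.connect(":memory:")
source.backup(target)
source.close()

row = target.execute("SELECT v FROM kv WHERE k = 'greeting'").fetchone()
# A healthy database returns exactly one row: ("ok",)
check = target.execute("PRAGMA integrity_check").fetchone()
```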

flush

flush(
    conn: Connection,
    bucket_client: BucketClient,
    key: str,
    *,
    keep_previous: bool,
    last_hash: str | None = None,
) -> FlushResult

Back up conn to a temp file and upload it to key.

Dirty-bit optimization: the temp file is hashed (SHA-256); if the hash matches last_hash, neither the upload nor the .prev copy runs. Callers pass back the snapshot_hash from the previous FlushResult so the next call can skip when nothing has changed. On read-heavy workloads this eliminates both the upload and the server-side copy, each an R2 class-A operation, while the DB is idle.

keep_previous applies only when an upload actually happens: if a prior snapshot exists at key, it is server-side copied to key + ".prev" before being overwritten. The first-ever flush skips the copy, since there is no source object yet.
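The dirty-bit skip works because backing up an unchanged database produces byte-identical files, so the SHA-256 digests match. A self-contained sketch of that invariant (the sha256_file helper here is a stand-in for the module's private _sha256_file, whose exact shape is an assumption):

```python
import hashlib
import sqlite3
import tempfile
from pathlib import Path

def sha256_file(path: Path) -> str:
    # Stand-in for the module's private _sha256_file helper (assumed shape).
    return hashlib.sha256(path.read_bytes()).hexdigest()

def snapshot_to_file(conn: sqlite3.Connection) -> Path:
    # Same backup-to-temp-file step flush() performs before hashing.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        path = Path(tmp.name)
    dest = sqlite3.connect(path)
    try:
        conn.backup(dest)
    finally:
        dest.close()
    return path

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT)")
conn.commit()

# Two snapshots of an idle DB hash the same -> the upload would be skipped.
a, b = snapshot_to_file(conn), snapshot_to_file(conn)
unchanged = sha256_file(a) == sha256_file(b)

# A write changes the pages, so the next snapshot hashes differently.
conn.execute("INSERT INTO kv VALUES ('k', 'v')")
conn.commit()
c = snapshot_to_file(conn)
changed = sha256_file(c) != sha256_file(a)

for p in (a, b, c):
    p.unlink()
```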

Source code in src/buckethead/storage/snapshot.py
def flush(
    conn: sqlite3.Connection,
    bucket_client: BucketClient,
    key: str,
    *,
    keep_previous: bool,
    last_hash: str | None = None,
) -> FlushResult:
    """Back up `conn` to a temp file and upload it to `key`.

    Dirty-bit optimization: the temp file is hashed (SHA-256) and if the
    hash matches `last_hash`, neither the upload nor the `.prev` copy
    runs. Callers pass back the `snapshot_hash` from the previous
    FlushResult so the next call can skip when nothing has changed.
    On read-heavy workloads this eliminates both the upload and the
    copy R2 class-A operations when the DB is idle.

    `keep_previous` has its normal semantics when we do upload: if a
    prior snapshot exists at `key`, server-side copy it to `key+".prev"`
    before overwriting. First-ever flush skips the copy (no source yet).
    """
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        tmp_path = Path(tmp.name)

    try:
        dest = sqlite3.connect(tmp_path)
        try:
            conn.backup(dest)
        finally:
            dest.close()

        current_hash = _sha256_file(tmp_path)

        if last_hash is not None and current_hash == last_hash:
            return FlushResult(
                uploaded=False, bytes_written=0, snapshot_hash=current_hash
            )

        if keep_previous and bucket_client.exists(key):
            bucket_client.copy(key, key + ".prev")
        bucket_client.upload(tmp_path, key)
        return FlushResult(
            uploaded=True,
            bytes_written=tmp_path.stat().st_size,
            snapshot_hash=current_hash,
        )
    finally:
        tmp_path.unlink(missing_ok=True)
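The keep_previous branch can be exercised in isolation with a toy bucket. The method names (exists/copy/upload) mirror how flush() drives BucketClient; the dict-backed store, the put_snapshot helper, and the keys used below are demo assumptions, not part of the module:

```python
import tempfile
from pathlib import Path

class DictBucket:
    """Hypothetical in-memory stand-in for BucketClient (demo only)."""

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}

    def exists(self, key: str) -> bool:
        return key in self.objects

    def copy(self, src: str, dst: str) -> None:
        # Models a server-side copy: no bytes re-uploaded from the client.
        self.objects[dst] = self.objects[src]

    def upload(self, path: Path, key: str) -> None:
        self.objects[key] = path.read_bytes()

def put_snapshot(bucket: DictBucket, path: Path, key: str, *, keep_previous: bool) -> None:
    # Same ordering as flush(): copy the old object aside, then overwrite.
    if keep_previous and bucket.exists(key):
        bucket.copy(key, key + ".prev")
    bucket.upload(path, key)

bucket = DictBucket()
with tempfile.TemporaryDirectory() as d:
    snap = Path(d) / "snap.db"
    snap.write_bytes(b"v1")
    # First-ever flush: no prior object, so no .prev copy happens.
    put_snapshot(bucket, snap, "db/app.db", keep_previous=True)
    snap.write_bytes(b"v2")
    # Second flush: v1 is copied aside before v2 overwrites it.
    put_snapshot(bucket, snap, "db/app.db", keep_previous=True)

latest = bucket.objects["db/app.db"]
previous = bucket.objects["db/app.db.prev"]
```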