Skip to content

buckethead.core

The BucketSQLite orchestrator. One class, the public entry point.

buckethead.core

BucketSQLite: the public orchestrator class.

Wires together everything from earlier phases
  • BucketClient (Phase 3)
  • snapshot.flush / snapshot.restore (Phase 4)
  • FlushLoop, install_signal_handlers, install_atexit (Phase 5)
  • FileStore (Phase 10)

Owns the keep-alive connection (shared-cache URI per the Phase 0 decision) and coordinates lifecycle so callers only see start() / stop() / flush() / force_flush() / connect() / connection / files.

BucketSQLite

BucketSQLite(
    bucket_config: BucketConfig | None = None,
    snapshot_config: SnapshotConfig | None = None,
    profiling_config: ProfilingConfig | None = None,
    *,
    on_flush_start: Callable[[], None] | None = None,
    on_flush_complete: Callable[[float, int], None]
    | None = None,
    on_flush_error: Callable[[BaseException], None]
    | None = None,
    status_reporter: StatusReporter | None = None,
    share_config: ShareConfig | None = None,
    project: str | None = None,
)
Source code in src/buckethead/core.py
def __init__(
    self,
    bucket_config: BucketConfig | None = None,
    snapshot_config: SnapshotConfig | None = None,
    profiling_config: ProfilingConfig | None = None,
    *,
    on_flush_start: Callable[[], None] | None = None,
    on_flush_complete: Callable[[float, int], None] | None = None,
    on_flush_error: Callable[[BaseException], None] | None = None,
    status_reporter: StatusReporter | None = None,
    share_config: ShareConfig | None = None,
    project: str | None = None,
) -> None:
    if project is not None and share_config is not None:
        raise ValueError(
            "pass either project= or share_config=, not both — "
            "project= derives share_config from user config"
        )
    if bucket_config is None:
        bucket_config = _load_bucket_config_from_settings()
    if project is not None:
        share_config = _share_config_for_project(project)

    self.bucket_config = bucket_config
    self.snapshot_config = snapshot_config or SnapshotConfig()
    self.profiling_config = profiling_config or ProfilingConfig()
    self.share_config = share_config

    self._on_flush_start = on_flush_start
    self._on_flush_complete = on_flush_complete
    self._on_flush_error = on_flush_error
    self._status_reporter: StatusReporter = (
        status_reporter if status_reporter is not None else RichStatusReporter()
    )
    self._shares: SharingService | None = None

    self._uri: str | None = None
    self._connection: sqlite3.Connection | None = None
    self._bucket_client: BucketClient | None = None
    self._filestore: FileStore | None = None
    self._kv: KVStore | None = None
    self._docs: DocStore | None = None
    self._branches: BranchManager | None = None
    self._flush_loop: FlushLoop | None = None
    self._signal_teardown: Callable[[], None] | None = None
    self._atexit_teardown: Callable[[], None] | None = None

    self._io_counters: IOCounters | None = (
        IOCounters() if self.profiling_config.io_counters else None
    )
    self._started_at: float | None = None
    self._flush_index = 0
    self._last_snapshot_hash: str | None = None
    self._current_branch: str = self.bucket_config.initial_branch

    self._flush_lock = threading.RLock()
    self._last_flush_time = 0.0
    self._started = False
    self._stopped = False

shares property

File-sharing API. Requires a share_config at construction.

Raises RuntimeError when sharing isn't configured — keeps the "sharing off" case from looking like a typo in the caller.

connect

connect() -> Connection

Open a new connection to the shared in-memory DB.

Intended use: one per thread. Closing any vended connection does not tear down the DB — the keep-alive connection owned by BucketSQLite keeps it alive until stop().

Source code in src/buckethead/core.py
def connect(self) -> sqlite3.Connection:
    """Open a new connection to the shared in-memory DB.

    Intended use: one per thread. Closing any vended connection
    does not tear down the DB — the keep-alive connection owned by
    BucketSQLite keeps it alive until stop().
    """
    if self._uri is None:
        raise RuntimeError("BucketSQLite not started")
    return sqlite3.connect(self._uri, uri=True, check_same_thread=False)

flush

flush() -> None

Debounced manual flush.

No-op if less than snapshot_config.min_interval_seconds has passed since the last successful flush of any kind.

Source code in src/buckethead/core.py
def flush(self) -> None:
    """Debounced manual flush.

    No-op if less than `snapshot_config.min_interval_seconds` has passed
    since the last successful flush of any kind.
    """
    now = time.time()
    if now - self._last_flush_time < self.snapshot_config.min_interval_seconds:
        return
    self._flush_now()

force_flush

force_flush() -> None

Manual flush with no debounce.

Source code in src/buckethead/core.py
def force_flush(self) -> None:
    """Manual flush with no debounce."""
    self._flush_now()