buckethead.storage.lifecycle

Flush-loop daemon thread + signal / atexit handlers.

The pattern here is a direct port of spikes/signals_atexit.py (Phase 0 Validation #3 + #4). Three principles, in order of importance:

  1. Signal handlers never do I/O. They set a flag. The main thread does the real work via BucketSQLite.stop().
  2. The flush loop is a daemon thread. Its stop is event-driven (not a polling sleep), so stop() returns promptly even if the interval is long.
  3. Everything is idempotent: stop can be called twice, teardown can be called twice, a signal can arrive after atexit has already run.
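
The first principle can be illustrated with a minimal sketch. The names `ShutdownFlag` and `serve` are hypothetical and do not come from buckethead; the `stop` argument stands in for a call such as BucketSQLite.stop(). The handler performs a single attribute assignment, and the main thread notices the flag between units of work and does the real shutdown itself.

```python
import signal
from typing import Callable, Optional


class ShutdownFlag:
    """Hypothetical flag object: the handler only records the signal number."""

    def __init__(self) -> None:
        self.signum: Optional[int] = None

    def handler(self, signum: int, frame: object) -> None:
        # No I/O and no locks here: one attribute assignment, nothing else.
        self.signum = signum

    @property
    def requested(self) -> bool:
        return self.signum is not None


def serve(
    flag: ShutdownFlag,
    do_work: Callable[[], None],
    stop: Callable[[], None],
    max_iterations: int = 1000,
) -> int:
    """Main-thread loop: work until the flag is set, then stop once."""
    done = 0
    while not flag.requested and done < max_iterations:
        do_work()
        done += 1
    stop()  # the real work (flush, close) runs here, on the main thread
    return done
```

In a real program the handler would be registered with `signal.signal(signal.SIGTERM, flag.handler)` from the main thread; the sketch leaves registration out so the flag logic can be read on its own.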

FlushLoop

FlushLoop(
    flush_callable: Callable[[], None],
    interval_seconds: float,
    *,
    on_error: Callable[[BaseException], None] | None = None,
)

Daemon thread that calls flush_callable every interval_seconds.

Exceptions inside flush_callable are reported via on_error (if provided) and otherwise logged at WARNING; they never terminate the loop. Exceptions inside on_error itself are also suppressed — no user code can kill the thread.

Source code in src/buckethead/storage/lifecycle.py
def __init__(
    self,
    flush_callable: Callable[[], None],
    interval_seconds: float,
    *,
    on_error: Callable[[BaseException], None] | None = None,
) -> None:
    self._flush = flush_callable
    self._interval = interval_seconds
    self._on_error = on_error
    self._stop = threading.Event()
    self._thread = threading.Thread(
        target=self._run, daemon=True, name="buckethead-flush"
    )
    self._started = False
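
The listing above shows only the constructor. As a rough sketch of how the documented behaviour could be realised, the class below fills in `start`, `stop`, and `_run`. It is named `SketchFlushLoop` to make clear that these method bodies are assumptions, not the buckethead implementation. The point it illustrates is that `threading.Event.wait(timeout)` serves as both the sleep and the stop check, which is what lets stop() return promptly.

```python
import logging
import threading
from typing import Callable, Optional

logger = logging.getLogger("flush-sketch")  # hypothetical logger name


class SketchFlushLoop:
    """Illustrative stand-in for FlushLoop; method bodies are assumptions."""

    def __init__(
        self,
        flush_callable: Callable[[], None],
        interval_seconds: float,
        *,
        on_error: Optional[Callable[[BaseException], None]] = None,
    ) -> None:
        self._flush = flush_callable
        self._interval = interval_seconds
        self._on_error = on_error
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=self._run, daemon=True, name="sketch-flush"
        )
        self._started = False

    def start(self) -> None:
        if not self._started:  # a second start is a no-op
            self._started = True
            self._thread.start()

    def stop(self, timeout: Optional[float] = None) -> None:
        self._stop.set()  # wakes the loop at once, whatever the interval
        if self._started and self._thread is not threading.current_thread():
            self._thread.join(timeout)  # joining a finished thread is harmless

    def _run(self) -> None:
        # Event.wait returns True once stop is set and False on timeout.
        while not self._stop.wait(self._interval):
            try:
                self._flush()
            except Exception as exc:
                self._report(exc)

    def _report(self, exc: BaseException) -> None:
        if self._on_error is None:
            logger.warning("flush failed: %r", exc)
            return
        try:
            self._on_error(exc)
        except Exception:
            pass  # errors raised by on_error are suppressed as well
```

With this shape, a loop created with a one-hour interval still stops as soon as `stop()` is called, because the waiting thread is woken by the event rather than by the timeout expiring.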

install_signal_handlers

install_signal_handlers(
    on_signal: Callable[[int], None],
) -> Callable[[], None]

Install SIGTERM + SIGINT handlers that call on_signal(signum).

The caller is responsible for everything else (flushing, restoring default disposition, re-raising). Returns a teardown closure that restores the previous handlers exactly.

Must be called from the main thread: Python's signal.signal raises ValueError when called from any other thread.

Source code in src/buckethead/storage/lifecycle.py
def install_signal_handlers(
    on_signal: Callable[[int], None],
) -> Callable[[], None]:
    """Install SIGTERM + SIGINT handlers that call `on_signal(signum)`.

    The caller is responsible for everything else (flushing, restoring
    default disposition, re-raising). Returns a teardown closure that
    restores the previous handlers exactly.

    Must be called from the main thread — Python's `signal.signal`
    rejects calls from other threads.
    """
    # signal.getsignal returns a union that includes int/callable/None — typing
    # each case precisely adds noise for no payoff, so keep it Any.
    previous: dict[signal.Signals, Any] = {}
    for sig in (signal.SIGTERM, signal.SIGINT):
        previous[sig] = signal.getsignal(sig)
        signal.signal(sig, lambda signum, frame: on_signal(signum))

    def teardown() -> None:
        for sig, prev in previous.items():
            signal.signal(sig, prev)

    return teardown
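
A short usage sketch may help here. The function is copied from the listing above so that the block runs on its own; the `demo` helper is hypothetical. It installs the handlers, raises SIGINT in-process, and checks that teardown puts the earlier handlers back. It assumes it is run from the main thread.

```python
import signal
from typing import Any, Callable, List, Tuple


def install_signal_handlers(
    on_signal: Callable[[int], None],
) -> Callable[[], None]:
    # Copied from the source listing so the example is self-contained.
    previous: dict[signal.Signals, Any] = {}
    for sig in (signal.SIGTERM, signal.SIGINT):
        previous[sig] = signal.getsignal(sig)
        signal.signal(sig, lambda signum, frame: on_signal(signum))

    def teardown() -> None:
        for sig, prev in previous.items():
            signal.signal(sig, prev)

    return teardown


def demo() -> Tuple[List[int], bool]:
    """Hypothetical walkthrough: install, receive one signal, restore."""
    watched = (signal.SIGTERM, signal.SIGINT)
    before = {sig: signal.getsignal(sig) for sig in watched}

    seen: List[int] = []
    teardown = install_signal_handlers(seen.append)  # handler only records

    signal.raise_signal(signal.SIGINT)  # delivered to this process

    teardown()
    teardown()  # calling it twice simply restores the same handlers again
    restored = all(signal.getsignal(sig) == before[sig] for sig in watched)
    return seen, restored
```

One caveat follows from the signal module itself: `signal.getsignal` can return None when the earlier handler was not installed from Python, and `signal.signal` does not accept None, so a teardown in that situation would need to skip such entries.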

install_atexit

install_atexit(
    on_exit: Callable[[], None],
) -> Callable[[], None]

Register on_exit as an atexit hook. Returns a teardown closure.

atexit is the last-resort safety net for sys.exit() paths that don't involve a signal. Signal paths should not rely on atexit running: when a signal terminates the process through its default disposition (SIG_DFL), the interpreter is killed before any atexit hook can run.

Source code in src/buckethead/storage/lifecycle.py
def install_atexit(on_exit: Callable[[], None]) -> Callable[[], None]:
    """Register `on_exit` as an atexit hook. Returns a teardown closure.

    atexit is the last-resort safety net for sys.exit() paths that don't
    involve a signal. Signal paths should not rely on atexit running —
    on some signals (SIG_DFL termination) atexit is bypassed.
    """
    atexit.register(on_exit)

    def teardown() -> None:
        atexit.unregister(on_exit)

    return teardown
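
Because a shutdown callback may be reached both from the signal path and from atexit, it helps if the callback tolerates repeated calls. The sketch below pairs the function from the listing (copied so the block is self-contained) with a hypothetical `once` wrapper. The wrapper is an illustration, not part of buckethead, and it is not thread-safe, so it assumes all calls happen on the main thread.

```python
import atexit
from typing import Callable


def install_atexit(on_exit: Callable[[], None]) -> Callable[[], None]:
    # Copied from the source listing so the example is self-contained.
    atexit.register(on_exit)

    def teardown() -> None:
        atexit.unregister(on_exit)

    return teardown


def once(fn: Callable[[], None]) -> Callable[[], None]:
    """Hypothetical helper: only the first call to the wrapper runs fn."""
    done = False

    def wrapper() -> None:
        nonlocal done
        if done:
            return
        done = True  # set before calling so a failing fn is not retried
        fn()

    return wrapper
```

A caller would register the wrapped callback, for example `teardown = install_atexit(once(stop))`. `atexit.unregister` does nothing when the function is no longer registered, so calling `teardown()` a second time is safe.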