Skip to content

buckethead.observability.profiling

Optional profiling hooks — I/O counters, memray, pyinstrument.

buckethead.observability.profiling

Optional profiling hooks for BucketHead.

Three opt-in layers:

  • I/O counters — in-house, zero external deps. Tracks bytes and op counts through BucketClient. Cheap enough to always enable in production. Answers the 'how many R2 Class-A ops am I generating' question that drives R2 cost.
  • Memory — memray, lazy-imported when config.memory is True. Wraps each flush in a memray.Tracker; emits a .bin per cycle.
  • CPU — pyinstrument, lazy-imported when config.cpu is True. Wraps each flush; emits an HTML flame graph per cycle.

Only I/O counters are implemented fully in v1 — the memray and pyinstrument hooks are ready for wiring but ship as no-ops unless the optional 'profiling' extra is installed (pip install buckethead[profiling]).

ProfilingConfig

Bases: BaseModel

Which profiling layers to enable and where their output goes.

IOCounters

IOCounters()

Tracks bucket operation counts, byte totals, and wall-time totals.

Thread-safe for the patterns BucketClient uses (single writer per operation; no lock needed because Python's GIL + atomic dict operations cover us). If multiple threads share a client and collide on the same op name, the count is still correct.

Source code in src/buckethead/observability/profiling.py
def __init__(self) -> None:
    self.bytes_up: int = 0
    self.bytes_down: int = 0
    self.ops: dict[str, int] = {}
    self.total_time_s: dict[str, float] = {}

write_summary

write_summary(
    config: ProfilingConfig,
    counters: IOCounters,
    *,
    duration_s: float,
) -> Path

Write an io-summary JSON to the configured output directory.

Returns the path written. Creates the directory if it doesn't exist.

Source code in src/buckethead/observability/profiling.py
def write_summary(
    config: ProfilingConfig,
    counters: IOCounters,
    *,
    duration_s: float,
) -> Path:
    """Write an io-summary JSON to the configured output directory.

    Returns the path written. Creates the directory if it doesn't exist.
    """
    config.output_dir.mkdir(parents=True, exist_ok=True)
    path = config.output_dir / f"io-summary-{int(time.time())}-{id(counters)}.json"
    payload = {
        "duration_s": duration_s,
        "io_counters": counters.as_dict(),
    }
    path.write_text(json.dumps(payload, indent=2))
    return path

write_flush_record

write_flush_record(
    config: ProfilingConfig,
    *,
    flush_index: int,
    duration_s: float,
    bytes_written: int,
    counters: IOCounters,
) -> Path

Append a per-flush record to flushes.jsonl in the output directory.

Each line is a self-contained JSON object so the stress report can reconstruct a time series without re-instrumenting anything.

Source code in src/buckethead/observability/profiling.py
def write_flush_record(
    config: ProfilingConfig,
    *,
    flush_index: int,
    duration_s: float,
    bytes_written: int,
    counters: IOCounters,
) -> Path:
    """Append a per-flush record to flushes.jsonl in the output directory.

    Each line is a self-contained JSON object so the stress report can
    reconstruct a time series without re-instrumenting anything.
    """
    config.output_dir.mkdir(parents=True, exist_ok=True)
    path = config.output_dir / "flushes.jsonl"
    record = {
        "flush_index": flush_index,
        "timestamp": time.time(),
        "duration_s": duration_s,
        "bytes_written": bytes_written,
        "io_counters": counters.as_dict(),
    }
    with path.open("a") as f:
        f.write(json.dumps(record) + "\n")
    return path

memory_tracker

memory_tracker(config: ProfilingConfig, flush_index: int)

Context manager that writes a memray snapshot for one flush.

Source code in src/buckethead/observability/profiling.py
def memory_tracker(config: ProfilingConfig, flush_index: int):  # pragma: no cover
    """Context manager that writes a memray snapshot for one flush."""
    if not config.memory:
        return _NullContext()
    try:
        import memray  # type: ignore[import-not-found]
    except ImportError:
        return _NullContext()
    config.output_dir.mkdir(parents=True, exist_ok=True)
    path = config.output_dir / f"memray-flush-{flush_index}.bin"
    return memray.Tracker(path)

cpu_profiler

cpu_profiler(config: ProfilingConfig, flush_index: int)

Context manager that writes a pyinstrument HTML for one flush.

Source code in src/buckethead/observability/profiling.py
def cpu_profiler(config: ProfilingConfig, flush_index: int):  # pragma: no cover
    """Context manager that writes a pyinstrument HTML for one flush."""
    if not config.cpu:
        return _NullContext()
    try:
        import pyinstrument  # type: ignore[import-not-found]
    except ImportError:
        return _NullContext()
    config.output_dir.mkdir(parents=True, exist_ok=True)
    return _PyInstrumentSession(
        pyinstrument, config.output_dir, flush_index, config.sample_interval_ms
    )