Skip to content

buckethead.sharing

File-sharing surface. Construct a ShareConfig pointing at a public-read or presigned share bucket, pass it as BucketSQLite(..., share_config=...), then use bh.shares.share() / bh.shares.list_shares() / bh.shares.unshare() / bh.shares.sweep_expired().

Share buckets are a separate R2/S3 bucket from the primary snapshot bucket; they hold copies of FileStore blobs under stable URL-friendly keys. Provision one via buckethead provision share-bucket.

One-liner for embedders

Once a project has been through buckethead provision share-bucket --project <name>, library code can collapse sharing setup to:

from buckethead import BucketSQLite

bh = BucketSQLite(project="my-project")
bh.start()
result = bh.shares.share(bh_key)

BucketSQLite(project=...) reads the share bucket from ~/.config/buckethead/config.toml and pulls credentials from the configured secret store — the same conventions the buckethead shares CLI uses. When a project has no share bucket attached, share_config stays None and bh.shares raises as if no sharing were configured.

Building a ShareConfig explicitly

If you need a ShareConfig on its own (e.g. to pass around) use ShareConfig.from_project:

from buckethead import ShareConfig

share_cfg = ShareConfig.from_project("my-project")

It raises KeyError when the project has no share bucket and ValueError when the secret-store vault can't be resolved from args, env, or user config. Override the defaults with secret_store=, op_vault=, op_title_prefix=.

buckethead.sharing

File-sharing service (private/public share buckets).

The Typer app for buckethead shares lives at buckethead.sharing.cli; it is imported lazily there to avoid pulling buckethead.core into the import graph of every consumer of SharingService.

ShareBucketConfig

Bases: BaseModel

Subset of BucketConfig that BucketClient actually reads. Decoupled so we don't have to satisfy BucketConfig's other fields (key, branch, files_prefix) when building a share-bucket client.

ShareConfig

Bases: BaseModel

Where shares go and how their URLs are rendered.

public must match how the share bucket was provisioned: if True, the bucket has public-read access and URLs are rendered via public_base_url. If False, URLs are sig-v4 presigned GETs.

from_project classmethod

from_project(
    project: str,
    *,
    secret_store: str | None = None,
    op_vault: str | None = None,
    op_title_prefix: str | None = None,
) -> Self

Compose a ShareConfig from the BH user config + secret store.

Resolves the share bucket from ~/.config/buckethead/config.toml (via UserConfig.share_bucket_for_project) and pulls share-bucket credentials from the configured secret store under <op_title_prefix> <project> Share.

Defaults for secret_store / op_vault / op_title_prefix come from BucketHeadSettings().

Raises

KeyError — when the project has no share bucket attached (run buckethead provision share-bucket --project <project> first). ValueError — when op_vault can't be resolved from args, settings, or env.

Source code in src/buckethead/sharing/service.py
@classmethod
def from_project(
    cls,
    project: str,
    *,
    secret_store: str | None = None,
    op_vault: str | None = None,
    op_title_prefix: str | None = None,
) -> Self:
    """Build a `ShareConfig` from the BH user config plus the secret store.

    Looks up the project's share bucket in
    `~/.config/buckethead/config.toml` (via
    `UserConfig.share_bucket_for_project`) and fetches the share-bucket
    credentials from the configured secret store under the item titled
    ``<op_title_prefix> <project> Share``.

    When `secret_store` / `op_vault` / `op_title_prefix` are omitted,
    they default to the values from `BucketHeadSettings()`.

    Raises
    ------
    KeyError
        When `project` has no share bucket attached (run
        ``buckethead provision share-bucket --project <project>`` first).
    ValueError
        When `op_vault` can't be resolved from args, settings, or env.
    """
    # Imported lazily so the settings/provisioning machinery stays out
    # of this module's import graph for consumers that never call this.
    from ..config.settings import BucketHeadSettings
    from ..config.user import UserConfig
    from ..provisioning.factories import make_secret_store

    settings = BucketHeadSettings()
    store_kind = secret_store or settings.secret_store
    vault = op_vault or settings.onepassword.vault
    if not vault:
        raise ValueError(
            "op_vault is not set. Pass op_vault=, set "
            "BUCKETHEAD_ONEPASSWORD__VAULT, or configure it under [env] "
            "in ~/.config/buckethead/config.toml."
        )
    title_prefix = op_title_prefix or settings.onepassword.title_prefix

    user_cfg = UserConfig.load()
    bucket_name = user_cfg.share_bucket_for_project(project)
    if bucket_name is None:
        raise KeyError(
            f"project {project!r} has no share bucket. Run "
            f"`buckethead provision share-bucket --project {project}` first."
        )
    share_mode = user_cfg.share_mode_for_project(project) or "public"
    base_url = user_cfg.share_public_base_url_for_project(project)

    secrets = make_secret_store(store_kind, vault=vault)
    title = f"{title_prefix} {project} Share"

    def _field(name: str) -> str:
        # All share-bucket credentials live on one secret-store item.
        return secrets.read(f"op://{vault}/{title}/{name}")

    # Read in the same order as before: key id, secret, then hostname.
    access_key = _field("access-key-id")
    secret_key = _field("secret-access-key")
    endpoint = _field("hostname")

    return cls(
        bucket=bucket_name,
        endpoint_url=endpoint,
        region="auto",
        access_key_id=access_key,
        secret_access_key=SecretStr(secret_key),
        public=(share_mode == "public"),
        public_base_url=base_url,
    )

SharingService

SharingService(
    *,
    filestore: FileStore,
    share_bucket: BucketClient,
    share_config: ShareConfig,
    connection: Connection,
)

Runtime API for file sharing. Attached to BucketSQLite as bh.shares when a ShareConfig is passed in.

Source code in src/buckethead/sharing/service.py
def __init__(
    self,
    *,
    filestore: FileStore,
    share_bucket: BucketClient,
    share_config: ShareConfig,
    connection: sqlite3.Connection,
) -> None:
    """Wire the service to its collaborators and ensure the share table exists."""
    # Hold plain references; nothing here is copied or validated.
    self._conn = connection
    self._config = share_config
    self._filestore = filestore
    self._share_bucket = share_bucket
    # Create the shares table up front so every public method can assume it.
    self._init_schema()

share

share(
    bh_key: str,
    *,
    share_name: str | None = None,
    ttl_days: int | None = None,
) -> ShareResult

Copy bh_key into the share bucket and return the URL.

Idempotent per (bh_key, slug): calling again with the same share_name (or derived slug) refreshes expires_at and returns a fresh URL without re-uploading bytes if the object is present in the share bucket.

Pass ttl_days=0 to disable expiry (public mode only — protected URLs are still hard-capped at 7 days by sig-v4).

Source code in src/buckethead/sharing/service.py
def share(
    self,
    bh_key: str,
    *,
    share_name: str | None = None,
    ttl_days: int | None = None,
) -> ShareResult:
    """Copy `bh_key` into the share bucket and return the URL.

    Idempotent per `(bh_key, slug)`: calling again with the same
    `share_name` (or derived slug) refreshes `expires_at` and returns
    a fresh URL without re-uploading bytes if the object is present
    in the share bucket.

    Pass ``ttl_days=0`` to disable expiry (public mode only — protected
    URLs are still hard-capped at 7 days by sig-v4).
    """
    meta = self._filestore.metadata(bh_key)
    if meta is None:
        raise FileNotFoundError(f"no filestore entry for bh-key {bh_key}")

    # Caller's TTL wins; otherwise fall back to the configured default.
    ttl = self._config.default_ttl_days if ttl_days is None else ttl_days

    ext = _extract_ext(meta.filename)
    if share_name is None:
        # No explicit name: derive the slug from the stored filename,
        # falling back to the bh-key itself.
        slug = _slugify(meta.filename or bh_key)
    else:
        slug = _slugify(share_name)
    hash_prefix = bh_key[:8]
    share_key = _compose_share_key(slug=slug, hash_prefix=hash_prefix, ext=ext)

    # Only upload when the object isn't already in the share bucket —
    # this is what makes a reshare cheap.
    if not self._share_bucket.exists(share_key):
        blob = self._filestore.get(bh_key)
        if not isinstance(blob, bytes):
            raise TypeError(
                "FileStore.get returned non-bytes when dest was omitted"
            )
        self._share_bucket.put_bytes(blob, share_key)

    now = time.time()
    mode: Mode = "public" if self._config.public else "protected"

    expires_at: float | None
    if ttl == 0 and self._config.public:
        # Public + ttl 0 means "live forever until unshared".
        expires_at = None
    else:
        span_days = ttl if ttl > 0 else 7
        if mode == "protected":
            # Presigned GETs can't outlive the sig-v4 cap; clamp.
            span_days = min(span_days, _PRESIGN_MAX_SECONDS // 86400)
        expires_at = now + span_days * 86400

    # REPLACE keeps one row per share_key, refreshing expires_at on reshare.
    self._conn.execute(
        "INSERT OR REPLACE INTO filestore_shares "
        "(share_key, bh_key, slug, hash_prefix, mode, created_at, expires_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        (share_key, bh_key, slug, hash_prefix, mode, now, expires_at),
    )
    self._conn.commit()

    return ShareResult(
        bh_key=bh_key,
        share_key=share_key,
        url=self._render_url(share_key, expires_at, now, mode),
        mode=mode,
        expires_at=expires_at,
    )

unshare

unshare(key: str, *, all_for_bh_key: bool = False) -> int

Delete a share by share_key OR bh_key.

When key is a bh_key with multiple shares, raises ValueError unless all_for_bh_key=True. Returns the number of shares removed.

Source code in src/buckethead/sharing/service.py
def unshare(self, key: str, *, all_for_bh_key: bool = False) -> int:
    """Delete a share by share_key OR bh_key.

    When `key` is a bh_key with multiple shares, raises
    ValueError unless `all_for_bh_key=True`. Returns the number of
    shares removed.
    """
    # First interpretation: `key` names a single share directly.
    by_share_key = [
        row[0]
        for row in self._conn.execute(
            "SELECT share_key FROM filestore_shares WHERE share_key = ?",
            (key,),
        )
    ]
    if by_share_key:
        return self._delete_share_keys(by_share_key)

    # Second interpretation: `key` is a bh_key with zero or more shares.
    by_bh_key = [
        row[0]
        for row in self._conn.execute(
            "SELECT share_key FROM filestore_shares WHERE bh_key = ?",
            (key,),
        )
    ]
    if not by_bh_key:
        return 0
    if len(by_bh_key) > 1 and not all_for_bh_key:
        # Refuse ambiguous bulk deletes unless explicitly requested.
        raise ValueError(
            f"bh-key {key} matches {len(by_bh_key)} shares; pass "
            f"all_for_bh_key=True to delete them all"
        )
    return self._delete_share_keys(by_bh_key)

list_shares

list_shares() -> list[ShareResult]

All live shares. Expired public shares appear until swept.

Source code in src/buckethead/sharing/service.py
def list_shares(self) -> list[ShareResult]:
    """Return every recorded share, newest first.

    Expired public shares remain listed until swept.
    """
    now = time.time()
    cursor = self._conn.execute(
        "SELECT share_key, bh_key, slug, hash_prefix, mode, "
        "created_at, expires_at "
        "FROM filestore_shares "
        "ORDER BY created_at DESC"
    )
    # slug / hash_prefix / created_at are stored but not surfaced here.
    return [
        ShareResult(
            bh_key=bh_key,
            share_key=share_key,
            url=self._render_url(share_key, expires_at, now, mode),
            mode=mode,
            expires_at=expires_at,
        )
        for share_key, bh_key, _slug, _hp, mode, _created, expires_at in cursor
    ]

sweep_expired

sweep_expired() -> int

Delete every share whose expires_at is in the past. Returns the number of shares removed.

Source code in src/buckethead/sharing/service.py
def sweep_expired(self) -> int:
    """Remove every share whose `expires_at` lies in the past.

    Returns the number of shares removed. Rows with a NULL
    `expires_at` (no expiry) are never swept.
    """
    cutoff = time.time()
    expired = self._conn.execute(
        "SELECT share_key FROM filestore_shares "
        "WHERE expires_at IS NOT NULL AND expires_at < ?",
        (cutoff,),
    ).fetchall()
    return self._delete_share_keys([row[0] for row in expired])