API Reference

class streamstore.S2(auth_token: str, request_timeout: timedelta = datetime.timedelta(seconds=5), max_retries: int | None = 3, retries_timeout: timedelta = datetime.timedelta(seconds=10), enable_append_retries: bool = True, cloud: Cloud = Cloud.AWS)

Async client for interacting with the gRPC API of S2.

Parameters:
  • auth_token – Authentication token generated from S2 dashboard.

  • request_timeout – Timeout for gRPC requests made by the client. Default value is 5 seconds.

  • max_retries – Maximum number of retries for a gRPC request. Default value is 3.

  • retries_timeout – Maximum total time for all retries of a gRPC request. Default value is 10 seconds.

  • enable_append_retries – Enable retries for appends, i.e., for both Stream.append() and Stream.append_session(). Default value is True.

  • cloud – Cloud in which the S2 service runs. Currently, only AWS is supported.

async close() None

Close all open connections to S2 service endpoints.

Tip

S2 supports the async context manager protocol, so you can also do the following instead of closing explicitly:

async with S2(...) as s2:
    ...

async create_basin(name: str, default_stream_storage_class: StorageClass | None = None, default_stream_retention_age: timedelta | None = None) BasinInfo

Create a basin.

Parameters:
  • name – Name of the basin.

  • default_stream_storage_class – Default storage class for streams of this basin.

  • default_stream_retention_age – Default threshold for automatic trimming of records in the streams of this basin. If not specified, streams will have infinite retention.

Note

name must be globally unique and must be between 8 and 48 characters, comprising lowercase letters, numbers and hyphens. It cannot begin or end with a hyphen.
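The format rule above can be checked locally before calling create_basin(). A minimal sketch; is_valid_basin_name is a hypothetical helper, not part of streamstore, and it cannot check global uniqueness, which only the S2 service can enforce.

```python
import re

# Hypothetical helper (not part of streamstore). Pattern: 8-48 characters of
# lowercase letters, digits and hyphens, not starting or ending with a hyphen
# (1 first char + 6..46 middle chars + 1 last char).
_BASIN_NAME = re.compile(r"[a-z0-9][a-z0-9-]{6,46}[a-z0-9]")


def is_valid_basin_name(name: str) -> bool:
    # fullmatch anchors the pattern at both ends of the string.
    # Global uniqueness can only be checked by the S2 service itself.
    return _BASIN_NAME.fullmatch(name) is not None


print(is_valid_basin_name("my-basin-01"))  # True
print(is_valid_basin_name("-my-basin"))    # False: leading hyphen
print(is_valid_basin_name("short"))        # False: fewer than 8 characters
```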

basin(name: str) Basin

Get a Basin object that can be used for performing basin operations.

Parameters:

name – Name of the basin.

Note

The basin must have been created already, else the operations will fail.

Tip

async with S2(...) as s2:
    basin = s2.basin("your-basin-name")

S2 implements the __getitem__ magic method, so you can also do the following instead:

async with S2(...) as s2:
    basin = s2["your-basin-name"]

async list_basins(prefix: str = '', start_after: str = '', limit: int | None = None) Page[BasinInfo]

List basins.

Parameters:
  • prefix – List only those that begin with this value.

  • start_after – List only those that come lexicographically after this value. It can be the name of the last item from the previous page, to continue listing from there. If prefix is specified, start_after must be greater than or equal to it.

  • limit – Number of items to return in one page, up to a maximum of 1000.
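To walk through every page, pass the name of the last item of one page as start_after for the next, until has_more is False. A minimal sketch that relies only on the parameters above and the documented Page fields; iter_all is a hypothetical helper. Since Basin.list_streams() takes the same parameters and returns Page[StreamInfo], either bound method can be passed in.

```python
from typing import Any, AsyncIterator, Awaitable, Callable


async def iter_all(
    list_page: Callable[..., Awaitable[Any]],
    prefix: str = "",
    page_size: int = 1000,
) -> AsyncIterator[Any]:
    """Hypothetical helper: yield every item across pages, e.g. iter_all(s2.list_basins)."""
    start_after = ""
    while True:
        page = await list_page(prefix=prefix, start_after=start_after, limit=page_size)
        for item in page.items:
            yield item
        # Stop when the service reports no more pages (or returns an empty page).
        if not page.has_more or not page.items:
            break
        # Continue listing after the last name seen on this page.
        start_after = page.items[-1].name
```

For example, inside `async with S2(...) as s2:`, `async for info in iter_all(s2.list_basins, prefix="prod-"): print(info.name)` lists every basin whose name begins with "prod-".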

async delete_basin(name: str) None

Delete a basin.

Parameters:

name – Name of the basin.

Note

Basin deletion is asynchronous and may take a few minutes to complete.

async get_basin_config(name: str) BasinConfig

Get the current configuration of a basin.

Parameters:

name – Name of the basin.

async reconfigure_basin(name: str, default_stream_storage_class: StorageClass | None = None, default_stream_retention_age: timedelta | None = None) BasinConfig

Modify the configuration of a basin.

Parameters:
  • name – Name of the basin.

  • default_stream_storage_class – Default storage class for streams of this basin.

  • default_stream_retention_age – Default threshold for automatic trimming of records in the streams of this basin. If not specified, streams will have infinite retention.

Note

Modifying the default stream-related configuration doesn’t affect existing streams; it only applies to streams created hereafter.

exception streamstore.S2Error

Base class for all S2-related exceptions.

class streamstore._client.Basin

Caution

Returned by S2.basin(). Do not instantiate directly.

property name: str

Basin name.

async create_stream(name: str, storage_class: StorageClass | None = None, retention_age: timedelta | None = None) StreamInfo

Create a stream.

Parameters:
  • name – Name of the stream.

  • storage_class – Storage class for this stream.

  • retention_age – Threshold for automatic trimming of records in this stream. If not specified, the stream will have infinite retention.

Note

name must be unique within the basin. It can be an arbitrary string of up to 512 characters. A forward slash (/) is recommended as a delimiter for hierarchical naming.
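A minimal sketch of building hierarchical stream names with the recommended / delimiter while staying within the 512-character limit. stream_name is a hypothetical helper; rejecting empty segments or segments that already contain / is an assumption of this sketch, not a documented rule.

```python
MAX_STREAM_NAME_LEN = 512


def stream_name(*segments: str) -> str:
    """Hypothetical helper: join segments into a name such as "orders/eu/2024"."""
    if not segments:
        raise ValueError("at least one segment is required")
    for segment in segments:
        # Assumption of this sketch: segments are non-empty and slash-free,
        # so the hierarchy stays unambiguous.
        if not segment or "/" in segment:
            raise ValueError(f"invalid segment: {segment!r}")
    name = "/".join(segments)
    if len(name) > MAX_STREAM_NAME_LEN:
        raise ValueError(f"stream name has {len(name)} characters; the limit is 512")
    return name
```

Names built this way share prefixes, so for instance every stream under "orders/eu/" can be listed with Basin.list_streams(prefix="orders/eu/").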

stream(name: str) Stream

Get a Stream object that can be used for performing stream operations.

Parameters:

name – Name of the stream.

Note

The stream must have been created already, else the operations will fail.

Tip

async with S2(...) as s2:
    stream = s2.basin("your-basin-name").stream("your-stream-name")

Basin implements the __getitem__ magic method, so you can also do the following instead:

async with S2(...) as s2:
    stream = s2["your-basin-name"]["your-stream-name"]

async list_streams(prefix: str = '', start_after: str = '', limit: int | None = None) Page[StreamInfo]

List streams.

Parameters:
  • prefix – List only those that begin with this value.

  • start_after – List only those that come lexicographically after this value. It can be the name of the last item from the previous page, to continue listing from there. If prefix is specified, start_after must be greater than or equal to it.

  • limit – Number of items to return in one page, up to a maximum of 1000.

async delete_stream(name: str) None

Delete a stream.

Parameters:

name – Name of the stream.

Note

Stream deletion is asynchronous and may take a few minutes to complete.

async get_stream_config(name: str) StreamConfig

Get the current configuration of a stream.

Parameters:

name – Name of the stream.

async reconfigure_stream(name: str, storage_class: StorageClass | None = None, retention_age: timedelta | None = None) StreamConfig

Modify the configuration of a stream.

Parameters:
  • name – Name of the stream.

  • storage_class – Storage class for this stream.

  • retention_age – Threshold for automatic trimming of records in this stream. If not specified, the stream will have infinite retention.

Note

Modifying storage_class will take effect only once this stream has been inactive for 10 minutes. This will become a live migration in the future.

class streamstore._client.Stream

Caution

Returned by Basin.stream(). Do not instantiate directly.

property name: str

Stream name.

async check_tail() int
Returns:

Sequence number that will be assigned to the next record on a stream.

async append(input: AppendInput) AppendOutput

Append a batch of records to a stream.

async append_session(inputs: AsyncIterable[AppendInput]) AsyncIterable[AppendOutput]

Append batches of records to a stream continuously, while guaranteeing that pipelined inputs are processed in order.

Yields:

AppendOutput for each corresponding AppendInput.

Returns:

If enable_append_retries=False in S2, and processing any AppendInput fails.

(or)

If enable_append_retries=True in S2, and the retry budget is exhausted while trying to recover from failures.
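A minimal sketch of consuming an append session. Because outputs are yielded in the same order as the inputs, each acknowledgement can be paired with the batch that produced it. append_all is a hypothetical helper written against any object exposing append_session() as documented above; producing the AppendInput batches (for example from an asyncio.Queue) is left to the caller.

```python
from typing import Any, AsyncIterable


async def append_all(stream: Any, inputs: AsyncIterable[Any]) -> list[tuple[int, int]]:
    """Hypothetical helper: drive one append session, return (start, end) per batch."""
    acked: list[tuple[int, int]] = []
    async for output in stream.append_session(inputs):
        # The i-th AppendOutput acknowledges the i-th AppendInput.
        acked.append((output.start_seq_num, output.end_seq_num))
    # len(acked) equals the number of inputs sent only if every batch was acknowledged.
    return acked
```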

async fence(token: bytes) None

Fence off appends to a stream by setting a fencing token.

Parameters:

token – Fencing token. Cannot exceed 16 bytes. If empty, clears the previously set token.

Note

This is a helper method, which creates a fence command record via CommandRecord.fence() and passes it as the input to Stream.append().
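A minimal sketch of a writer taking ownership of a stream with a fencing token and later releasing it. acquire_stream and release_stream are hypothetical helpers; using a random 16-byte token from secrets.token_bytes() is a design choice of this sketch, since any token of at most 16 bytes is accepted. The owner would then pass the same value as AppendInput.fencing_token on its appends.

```python
import secrets
from typing import Any

MAX_FENCING_TOKEN_BYTES = 16


async def acquire_stream(stream: Any) -> bytes:
    """Hypothetical helper: set a fresh fencing token on the stream and return it."""
    # A random token makes collisions between different writers unlikely
    # (a design choice of this sketch, not an API requirement).
    token = secrets.token_bytes(MAX_FENCING_TOKEN_BYTES)
    await stream.fence(token)
    return token


async def release_stream(stream: Any) -> None:
    """Hypothetical helper: an empty token clears the previously set one."""
    await stream.fence(b"")
```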

async trim(desired_first_seq_num: int) None

Explicitly trim records in a stream.

Parameters:

desired_first_seq_num – Sequence number of the first record that should remain in the stream after the records preceding it are trimmed.

Note

This is a helper method, which creates a trim command record via CommandRecord.trim() and passes it as the input to Stream.append().

Trimming is eventually consistent, and trimmed records may be visible for a brief period.

If desired_first_seq_num is smaller than the sequence number of the first existing record in the stream, no trimming happens.
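check_tail() and trim() can be combined to retain only the most recent records. A minimal sketch; keep_last is a hypothetical helper. The result is approximate: appends can land between the two calls, trimming is eventually consistent, and trim() itself appends a trim command record, which remains after the n retained records.

```python
from typing import Any


async def keep_last(stream: Any, n: int) -> int:
    """Hypothetical helper: request trimming of all but the last n records."""
    if n < 0:
        raise ValueError("n must be non-negative")
    tail = await stream.check_tail()  # sequence number of the next record
    desired_first = max(tail - n, 0)
    # Nothing precedes sequence number 0, so skip the extra command record.
    if desired_first > 0:
        await stream.trim(desired_first)
    return desired_first
```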

async read(start_seq_num: int, limit: ReadLimit | None = None, ignore_command_records: bool = False) list[SequencedRecord] | FirstSeqNum | NextSeqNum

Read a batch of records from a stream.

Parameters:
  • start_seq_num – Starting sequence number (inclusive).

  • limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().

  • ignore_command_records – Filter out command records, if present, from the batch.

Returns:

Batch of sequenced records. It can be empty only if limit was provided and the first record that could have been returned would have violated the limit.

(or)

Sequence number for the first record on this stream, in case the provided start_seq_num was smaller.

(or)

Sequence number for the next record on this stream, in case the provided start_seq_num was larger.
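A minimal sketch of handling the three possible results of read(). It tells FirstSeqNum and NextSeqNum apart by comparing .value with the requested start (per the descriptions above, the first is larger and the second smaller); it assumes both expose the value passed to their constructors as .value. Importing both classes from streamstore.schemas and using isinstance() is the more explicit alternative. read_batch is a hypothetical helper.

```python
from typing import Any


async def read_batch(stream: Any, start_seq_num: int, limit: Any = None) -> tuple[int, list]:
    """Hypothetical helper: read one batch, skipping past trimmed records.

    Returns (seq_num, records): seq_num is where the records begin, or the
    stream's next sequence number if start_seq_num was past the tail.
    """
    start = start_seq_num
    while True:
        result = await stream.read(start, limit=limit)
        if isinstance(result, list):
            return start, result
        if result.value > start:
            # FirstSeqNum: start was before the first existing record; retry from it.
            start = result.value
            continue
        # NextSeqNum: start was past the tail, so there is nothing to read yet.
        return result.value, []
```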

async read_session(start_seq_num: int, limit: ReadLimit | None = None, ignore_command_records: bool = False) AsyncIterable[list[SequencedRecord] | FirstSeqNum | NextSeqNum]

Read batches of records from a stream continuously.

Parameters:
  • start_seq_num – Starting sequence number (inclusive).

  • limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().

  • ignore_command_records – Filter out command records, if present, from the batch.

Note

With a session, you are able to read in a streaming fashion. If a limit was not provided and the end of the stream is reached, the session goes into real-time tailing mode and will yield records as they are appended to the stream.

Yields:

Batch of sequenced records. It can be empty only if limit was provided and the first record that could have been returned would have violated the limit.

(or)

Sequence number for the first record on this stream, in case the provided start_seq_num was smaller.

(or)

Sequence number for the next record on this stream, in case the provided start_seq_num was larger.

Returns:

If limit was provided, and if it was met or the end of the stream was reached before meeting it.

(or)

If previous yield was not a batch of sequenced records.
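A minimal sketch of reading with a session until enough records have arrived. Without a limit the session keeps tailing, so the loop breaks out on its own; passing a ReadLimit instead would let the session end by itself. take is a hypothetical helper.

```python
from typing import Any


async def take(stream: Any, start_seq_num: int, count: int) -> list:
    """Hypothetical helper: collect up to `count` records from a read session."""
    records: list = []
    async for item in stream.read_session(start_seq_num):
        if not isinstance(item, list):
            # FirstSeqNum or NextSeqNum: the session ends after this yield.
            break
        records.extend(item)
        if len(records) >= count:
            break  # stop tailing; otherwise the session waits for new appends
    return records[:count]
```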

class streamstore.schemas.Record(body: bytes, headers: list[tuple[bytes, bytes]] = [])

Record to be appended to a stream.

body: bytes

Body of this record.

headers: list[tuple[bytes, bytes]]

Series of name-value pairs for this record.

class streamstore.schemas.CommandRecord

Helper class for creating command records.

static fence(token: bytes) Record

Create a fence command record.

static trim(desired_first_seq_num: int) Record

Create a trim command record.

class streamstore.schemas.AppendInput(records: list[Record], match_seq_num: int | None = None, fencing_token: bytes | None = None)

Used in the parameters to Stream.append() and Stream.append_session().

records: list[Record]

Batch of records to append atomically, which must contain at least one record and no more than 1000. The total size of the batch must not exceed 1MiB of metered_bytes().

match_seq_num: int | None

Enforce that the sequence number issued to the first record in the batch matches this value.

fencing_token: bytes | None

Enforce a fencing token, which must have been previously set by a fence command record.
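Large inputs have to be split into batches that respect both limits on records. A minimal sketch of greedy batching; split_into_batches and record_size are hypothetical helpers that accept anything with body and headers attributes, such as Record. record_size re-implements the formula documented for metered_bytes(); in real code, metered_bytes([record]) from streamstore.schemas can be used instead.

```python
from typing import Any, Iterable

MAX_BATCH_RECORDS = 1000
MAX_BATCH_BYTES = 1024 * 1024  # 1 MiB


def record_size(record: Any) -> int:
    """Hypothetical helper mirroring the metered_bytes() formula for one record."""
    return (
        8
        + 2 * len(record.headers)
        + sum(len(name) + len(value) for name, value in record.headers)
        + len(record.body)
    )


def split_into_batches(records: Iterable[Any]) -> list[list[Any]]:
    """Hypothetical helper: greedily group records into valid AppendInput batches."""
    batches: list[list[Any]] = []
    current: list[Any] = []
    current_bytes = 0
    for record in records:
        size = record_size(record)
        if size > MAX_BATCH_BYTES:
            raise ValueError("a single record exceeds 1 MiB of metered bytes")
        # Start a new batch if adding this record would break either limit.
        if current and (
            len(current) == MAX_BATCH_RECORDS or current_bytes + size > MAX_BATCH_BYTES
        ):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(record)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```

Each resulting batch can then become the records of one AppendInput.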

class streamstore.schemas.AppendOutput(start_seq_num: int, end_seq_num: int, next_seq_num: int)

Returned from Stream.append().

(or)

Yielded from Stream.append_session().

start_seq_num: int

Sequence number of the first record appended.

end_seq_num: int

Sequence number of the last record appended + 1. end_seq_num - start_seq_num is the number of records in the batch.

next_seq_num: int

Sequence number of the last durable record on the stream + 1. This can be greater than end_seq_num in case of concurrent appends.
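The three fields relate by simple arithmetic. A minimal sketch; describe_append is a hypothetical helper, and reading next_seq_num - end_seq_num as the number of records that became durable after this batch (for example from concurrent appends) follows from the description of next_seq_num above.

```python
from typing import Any


def describe_append(output: Any) -> dict[str, int]:
    """Hypothetical helper: derive counts from an AppendOutput."""
    return {
        # end_seq_num is exclusive, so the difference is the batch size.
        "appended": output.end_seq_num - output.start_seq_num,
        # Records durable after this batch at the time of the response.
        "appended_after": output.next_seq_num - output.end_seq_num,
    }
```

For example, start_seq_num=10, end_seq_num=13 and next_seq_num=20 mean this batch held 3 records and 7 more were appended after it.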

class streamstore.schemas.ReadLimit(count: int | None = None, bytes: int | None = None)

Used in the parameters to Stream.read() and Stream.read_session().

If both count and bytes are specified, either limit may be hit.

count: int | None

Number of records.

bytes: int | None

Cumulative size of records calculated using metered_bytes().

class streamstore.schemas.SequencedRecord(seq_num: int, body: bytes, headers: list[tuple[bytes, bytes]])

Record read from a stream.

seq_num: int

Sequence number for this record.

body: bytes

Body of this record.

headers: list[tuple[bytes, bytes]]

Series of name-value pairs for this record.

class streamstore.schemas.FirstSeqNum(value: int)

class streamstore.schemas.NextSeqNum(value: int)

class streamstore.schemas.Page(items: list[T], has_more: bool)

Page of items.

items: list[T]

List of items of any type T.

has_more: bool

If True, there are more pages.

enum streamstore.schemas.BasinState(value)

Current state of a basin.

Valid values are as follows:

UNSPECIFIED = <BasinState.UNSPECIFIED: 0>
ACTIVE = <BasinState.ACTIVE: 1>
CREATING = <BasinState.CREATING: 2>
DELETING = <BasinState.DELETING: 3>

class streamstore.schemas.BasinInfo(name: str, scope: str, cell: str, state: BasinState)

Basin information.

name: str

Basin name.

scope: str

Basin scope.

cell: str

Cell assignment.

state: BasinState

Basin state.

class streamstore.schemas.StreamInfo(name: str, created_at: datetime, deleted_at: datetime | None)

Stream information.

name: str

Stream name.

created_at: datetime

Creation time.

deleted_at: datetime | None

Deletion time, if this stream is being deleted.

enum streamstore.schemas.StorageClass(value)

Storage class for recent appends.

Valid values are as follows:

UNSPECIFIED = <StorageClass.UNSPECIFIED: 0>

UNSPECIFIED gets overridden to EXPRESS.

STANDARD = <StorageClass.STANDARD: 1>

Offers end-to-end latencies under 500 ms.

EXPRESS = <StorageClass.EXPRESS: 2>

Offers end-to-end latencies under 50 ms.

class streamstore.schemas.StreamConfig(storage_class: StorageClass, retention_age: timedelta)

Current configuration of a stream.

storage_class: StorageClass

Storage class for this stream.

retention_age: timedelta

Threshold for automatic trimming of records in this stream.

class streamstore.schemas.BasinConfig(default_stream_config: StreamConfig)

Current configuration of a basin.

default_stream_config: StreamConfig

Default configuration for streams in this basin.

enum streamstore.schemas.Cloud(value)

Cloud in which the S2 service runs.

Valid values are as follows:

AWS = <Cloud.AWS: 1>

streamstore.schemas.metered_bytes(records: Iterable[Record | SequencedRecord]) int

Each record is metered using the following formula:

8 + 2 * len(headers)
+ sum((len(name) + len(value)) for (name, value) in headers)
+ len(body)
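A worked instance of the formula. metered_size is a hypothetical re-implementation for illustration only; in practice, streamstore.schemas.metered_bytes() applies this formula to every record in the iterable it is given.

```python
def metered_size(body: bytes, headers: list[tuple[bytes, bytes]]) -> int:
    """Hypothetical helper: metered size of one record, per the formula above."""
    return (
        8
        + 2 * len(headers)
        + sum(len(name) + len(value) for name, value in headers)
        + len(body)
    )


# body b"hello" (5 bytes) with one header (b"k", b"v"):
# 8 + 2 * 1 + (1 + 1) + 5 = 17
print(metered_size(b"hello", [(b"k", b"v")]))  # 17
print(metered_size(b"", []))  # 8: the fixed overhead of every record
```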