API Reference

class streamstore.S2(auth_token: str, endpoints: Endpoints | None = None, request_timeout: timedelta = datetime.timedelta(seconds=5), max_retries: int = 3, enable_append_retries: bool = True, enable_compression: bool = False)[source]

Async client for interacting with gRPC API of S2.

Parameters:
  • auth_token – Authentication token generated from S2 dashboard.

  • endpoints – S2 endpoints. If None, public endpoints for S2 service running in AWS cloud will be used.

  • request_timeout – Timeout for gRPC requests made by the client. Default value is 5 seconds.

  • max_retries – Maximum number of retries for a gRPC request. Default value is 3.

  • enable_append_retries – Enable retries for appends i.e for both Stream.append() and Stream.append_session(). Default value is True.

  • enable_compression – Enable compression (Gzip) for Stream.append(), Stream.append_session(), Stream.read(), and Stream.read_session(). Default value is False.

async close() None[source]

Close all open connections to S2 service endpoints.

Tip

S2 supports async context manager protocol, so you can also do the following instead of explicitly closing:

async with S2(..) as s2:
    ..
async create_basin(name: str, default_stream_storage_class: StorageClass | None = None, default_stream_retention_age: timedelta | None = None, create_stream_on_append: bool = False) BasinInfo[source]

Create a basin.

Parameters:
  • name – Name of the basin.

  • default_stream_storage_class – Default storage class for streams of this basin.

  • default_stream_retention_age – Default threshold for automatic trimming of records in the streams of this basin. If not specified, streams will have infinite retention.

  • create_stream_on_append – Create stream on append if it doesn’t exist.

Note

name must be globally unique and must be between 8 and 48 characters, comprising lowercase letters, numbers and hyphens. It cannot begin or end with a hyphen.

basin(name: str) Basin[source]

Get a Basin object that can be used for performing basin operations.

Parameters:

name – Name of the basin.

Note

The basin must have been created already, else the operations will fail.

Tip

async with S2(..) as s2:
    basin = s2.basin("your-basin-name")

S2 implements the getitem magic method, so you can also do the following instead:

async with S2(..) as s2:
    basin = s2["your-basin-name"]
async list_basins(prefix: str = '', start_after: str = '', limit: int | None = None) Page[BasinInfo][source]

List basins.

Parameters:
  • prefix – List only those that begin with this value.

  • start_after – List only those that lexicographically start after this value, which can be the name of the last item from previous page, to continue from there. It must be greater than or equal to the prefix if specified.

  • limit – Number of items to return in one page. Maximum number of items that can be returned in one page is 1000.

async delete_basin(name: str) None[source]

Delete a basin.

Parameters:

name – Name of the basin.

Note

Basin deletion is asynchronous, and may take a few minutes to complete.

async get_basin_config(name: str) BasinConfig[source]

Get the current configuration of a basin.

Parameters:

name – Name of the basin.

async reconfigure_basin(name: str, default_stream_storage_class: StorageClass | None = None, default_stream_retention_age: timedelta | None = None, create_stream_on_append: bool = False) BasinConfig[source]

Modify the configuration of a basin.

Parameters:
  • name – Name of the basin.

  • default_stream_storage_class – Default storage class for streams of this basin.

  • default_stream_retention_age – Default threshold for automatic trimming of records in the streams of this basin. If not specified, streams will have infinite retention.

  • create_stream_on_append – Create stream on append if it doesn’t exist.

Note

Modifiying the default stream-related configuration doesn’t affect already existing streams; it only applies to new streams created hereafter.

class streamstore.Basin[source]

Caution

Returned by S2.basin(). Do not instantiate directly.

property name: str

Basin name.

async create_stream(name: str, storage_class: StorageClass | None = None, retention_age: timedelta | None = None) StreamInfo[source]

Create a stream.

Parameters:
  • name – Name of the stream.

  • storage_class – Storage class for this stream.

  • retention_age – Thresold for automatic trimming of records in this stream. If not specified, the stream will have infinite retention.

Note

name must be unique within the basin. It can be an arbitrary string upto 512 characters. Backslash (/) is recommended as a delimiter for hierarchical naming.

stream(name: str) Stream[source]

Get a Stream object that can be used for performing stream operations.

Parameters:

name – Name of the stream.

Note

The stream must have been created already, else the operations will fail.

Tip

async with S2(..) as s2:
    stream = s2.basin("your-basin-name").stream("your-stream-name")

Basin implements the getitem magic method, so you can also do the following instead:

async with S2(..) as s2:
    stream = s2["your-basin-name"]["your-stream-name"]
async list_streams(prefix: str = '', start_after: str = '', limit: int | None = None) Page[StreamInfo][source]

List streams.

Parameters:
  • prefix – List only those that begin with this value.

  • start_after – List only those that lexicographically start after this value, which can be the name of the last item from previous page, to continue from there. It must be greater than or equal to the prefix if specified.

  • limit – Number of items to return in one page. Maximum number of items that can be returned in one page is 1000.

async delete_stream(name: str) None[source]

Delete a stream.

Parameters:

name – Name of the stream.

Note

Stream deletion is asynchronous, and may take a few minutes to complete.

async get_stream_config(name: str) StreamConfig[source]

Get the current configuration of a stream.

Parameters:

name – Name of the stream.

async reconfigure_stream(name: str, storage_class: StorageClass | None = None, retention_age: timedelta | None = None) StreamConfig[source]

Modify the configuration of a stream.

Parameters:
  • name – Name of the stream.

  • storage_class – Storage class for this stream.

  • retention_age – Thresold for automatic trimming of records in this stream. If not specified, the stream will have infinite retention.

Note

Modifying storage_class will take effect only when this stream has been inactive for 10 minutes. This will become a live migration in future.

class streamstore.Stream[source]

Caution

Returned by Basin.stream(). Do not instantiate directly.

property name: str

Stream name.

async check_tail() int[source]
Returns:

Sequence number that will be assigned to the next record on a stream.

async append(input: AppendInput) AppendOutput[source]

Append a batch of records to a stream.

async append_session(inputs: AsyncIterable[AppendInput]) AsyncIterable[AppendOutput][source]

Append batches of records to a stream continuously, while guaranteeing pipelined inputs are processed in order.

Tip

You can use append_inputs_gen() for automatic batching of records instead of explicitly preparing and providing batches of records.

Yields:

AppendOutput for each corresponding AppendInput.

Returns:

If enable_append_retries=False in S2, and if processing any of the AppendInput fails.

(or)

If enable_append_retries=True in S2, and if retry budget gets exhausted after trying to recover from failures.

async read(start_seq_num: int, limit: ReadLimit | None = None, ignore_command_records: bool = False) list[SequencedRecord] | FirstSeqNum | NextSeqNum[source]

Read a batch of records from a stream.

Parameters:
  • start_seq_num – Starting sequence number (inclusive).

  • limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().

  • ignore_command_records – Filters out command records if present from the batch.

Returns:

Batch of sequenced records. It can be empty only if limit was provided, and the first record that could have been returned violated the limit.

(or)

Sequence number for the first record on this stream, if the provided start_seq_num was smaller.

(or)

Sequence number for the next record on this stream, if the provided start_seq_num was larger.

async read_session(start_seq_num: int, limit: ReadLimit | None = None, ignore_command_records: bool = False) AsyncIterable[list[SequencedRecord] | FirstSeqNum | NextSeqNum][source]

Read batches of records from a stream continuously.

Parameters:
  • start_seq_num – Starting sequence number (inclusive).

  • limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().

  • ignore_command_records – Filters out command records if present from the batch.

Note

With a session, you are able to read in a streaming fashion. If a limit was not provided and the end of the stream is reached, the session goes into real-time tailing mode and will yield records as they are appended to the stream.

Yields:

Batch of sequenced records. It can be empty only if limit was provided, and the first record that could have been returned violated the limit.

(or)

Sequence number for the first record on this stream, if the provided start_seq_num was smaller.

(or)

Sequence number for the next record on this stream, if the provided start_seq_num was larger.

Returns:

If limit was provided, and if it was met or the end of the stream was reached before meeting it.

(or)

If previous yield was not a batch of sequenced records.

class streamstore.schemas.Record(body: bytes, headers: list[tuple[bytes, bytes]] = [])[source]

Record to be appended to a stream.

body: bytes

Body of this record.

headers: list[tuple[bytes, bytes]]

Series of name-value pairs for this record.

class streamstore.schemas.AppendInput(records: list[Record], match_seq_num: int | None = None, fencing_token: bytes | None = None)[source]

Used in the parameters to Stream.append() and Stream.append_session().

records: list[Record]

Batch of records to append atomically, which must contain at least one record, and no more than 1000. The size of the batch must not exceed 1MiB of metered_bytes().

match_seq_num: int | None

Enforce that the sequence number issued to the first record in the batch matches this value.

fencing_token: bytes | None

Enforce a fencing token, which must have been previously set by a fence command record.

class streamstore.schemas.AppendOutput(start_seq_num: int, end_seq_num: int, next_seq_num: int)[source]

Returned from Stream.append().

(or)

Yielded from Stream.append_session().

start_seq_num: int

Sequence number of first record appended.

end_seq_num: int

Sequence number of last record appended + 1. end_seq_num - start_seq_num will be the number of records in the batch.

next_seq_num: int

Sequence number of last durable record on the stream + 1. This can be greater than end_seq_num in case of concurrent appends.

class streamstore.schemas.ReadLimit(count: int | None = None, bytes: int | None = None)[source]

Used in the parameters to Stream.read() and Stream.read_session().

If both count and bytes are specified, either limit may be hit.

count: int | None

Number of records.

bytes: int | None

Cumulative size of records calculated using metered_bytes().

class streamstore.schemas.SequencedRecord(seq_num: int, body: bytes, headers: list[tuple[bytes, bytes]])[source]

Record read from a stream.

seq_num: int

Sequence number for this record.

body: bytes

Body of this record.

headers: list[tuple[bytes, bytes]]

Series of name-value pairs for this record.

class streamstore.schemas.FirstSeqNum(value: int)[source]
class streamstore.schemas.NextSeqNum(value: int)[source]
class streamstore.schemas.Page(items: list[T], has_more: bool)[source]

Page of items.

items: list[T]

List of items of any type T.

has_more: bool

If True, it means that there are more pages.

enum streamstore.schemas.BasinScope(value)[source]

Scope of a basin.

Valid values are as follows:

UNSPECIFIED = <BasinScope.UNSPECIFIED: 0>

UNSPECIFIED defaults to AWS_US_EAST_1.

AWS_US_EAST_1 = <BasinScope.AWS_US_EAST_1: 1>

AWS us-east-1 region.

enum streamstore.schemas.BasinState(value)[source]

Current state of a basin.

Valid values are as follows:

UNSPECIFIED = <BasinState.UNSPECIFIED: 0>
ACTIVE = <BasinState.ACTIVE: 1>
CREATING = <BasinState.CREATING: 2>
DELETING = <BasinState.DELETING: 3>
class streamstore.schemas.BasinInfo(name: str, scope: BasinScope, state: BasinState)[source]

Basin information.

name: str

Basin name.

scope: BasinScope

Basin scope.

state: BasinState

Basin state.

class streamstore.schemas.StreamInfo(name: str, created_at: datetime, deleted_at: datetime | None)[source]

Stream information.

name: str

Stream name.

created_at: datetime

Creation time.

deleted_at: datetime | None

Deletion time, if this stream is being deleted.

enum streamstore.schemas.StorageClass(value)[source]

Storage class for recent appends.

Valid values are as follows:

UNSPECIFIED = <StorageClass.UNSPECIFIED: 0>

UNSPECIFIED defaults to EXPRESS.

STANDARD = <StorageClass.STANDARD: 1>

Offers end-to-end latencies under 500 ms.

EXPRESS = <StorageClass.EXPRESS: 2>

Offers end-to-end latencies under 50 ms.

class streamstore.schemas.StreamConfig(storage_class: StorageClass, retention_age: timedelta)[source]

Current configuration of a stream.

storage_class: StorageClass

Storage class for this stream.

retention_age: timedelta

Thresold for automatic trimming of records in this stream.

class streamstore.schemas.BasinConfig(default_stream_config: StreamConfig, create_stream_on_append: bool)[source]

Current configuration of a basin.

default_stream_config: StreamConfig

Default configuration for streams in this basin.

create_stream_on_append: bool

Create stream on append if it doesn’t exist, using the default stream configuration.

enum streamstore.schemas.Cloud(value)[source]

Cloud in which the S2 service runs.

Valid values are as follows:

AWS = <Cloud.AWS: 1>
class streamstore.schemas.Endpoints[source]

S2 endpoints.

classmethod for_cloud(cloud: Cloud) Endpoints[source]

Construct S2 endpoints for the given cloud.

Parameters:

cloud – Cloud in which the S2 service runs.

class streamstore.utils.CommandRecord[source]

Factory class for creating command records.

static fence(token: bytes) Record[source]

Create a fence command record.

Parameters:

tokenFencing token. Cannot exceed 16 bytes. If empty, clears the previously set token.

static trim(desired_first_seq_num: int) Record[source]

Create a trim command record.

Parameters:

desired_first_seq_num – Sequence number for the first record to exist after trimming preceeding records in the stream.

Note

If desired_first_seq_num was smaller than the sequence number for the first existing record in the stream, trimming doesn’t happen.

streamstore.utils.metered_bytes(records: Iterable[Record | SequencedRecord]) int[source]

Each record is metered using the following formula:

8 + 2 * len(headers)
+ sum((len(name) + len(value)) for (name, value) in headers)
+ len(body)
async streamstore.utils.append_inputs_gen(records: AsyncIterable[Record], match_seq_num: int | None = None, fencing_token: bytes | None = None, max_records_per_batch: int = 1000, max_bytes_per_batch: int = 1048576, max_linger_per_batch: timedelta | None = None) AsyncIterable[AppendInput][source]

Generator function for batching records and yielding AppendInput.

Returned generator object can be used as the parameter to Stream.append_session().

Yields:

AppendInput

Parameters:
  • records – Records that have to be appended to a stream.

  • match_seq_num – If it is not None, it is used in the first yield of AppendInput and is automatically advanced for subsequent yields.

  • fencing_token – Used in each yield of AppendInput.

  • max_records_per_batch – Maximum number of records in each batch.

  • max_bytes_per_batch – Maximum size of each batch calculated using metered_bytes().

  • max_linger_per_batch – Maximum duration for each batch to accumulate records before yielding.

Note

If max_linger_per_batch is None, AppendInput will be yielded only when max_records_per_batch or max_bytes_per_batch is reached.