API Reference

class streamstore.S2(access_token: str, endpoints: Endpoints | None = None, request_timeout: timedelta = datetime.timedelta(seconds=5), max_retries: int = 3, enable_append_retries: bool = True, enable_compression: bool = False)[source]

Async client for interacting with s2.dev.

Parameters:
  • access_token – Access token generated from S2 dashboard.

  • endpoints – S2 endpoints. If not specified, the public endpoints for the S2 service running in the AWS cloud will be used.

  • request_timeout – Timeout for requests made by the client. Default value is 5 seconds.

  • max_retries – Maximum number of retries for a request. Default value is 3.

  • enable_append_retries – Enable retries for appends, i.e., for both Stream.append() and Stream.append_session(). Default value is True.

  • enable_compression – Enable compression (Gzip) for Stream.append(), Stream.append_session(), Stream.read(), and Stream.read_session(). Default value is False.

async close() None[source]

Close all open connections to S2 service endpoints.

Tip

S2 supports the async context manager protocol, so you can also do the following instead of explicitly closing:

async with S2(..) as s2:
    ..

async create_basin(name: str, config: BasinConfig | None = None) BasinInfo[source]

Create a basin.

Parameters:
  • name – Name of the basin.

  • config – Configuration for the basin.

Note

name must be globally unique and must be between 8 and 48 characters, comprising lowercase letters, numbers and hyphens. It cannot begin or end with a hyphen.
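
The naming rule in the note above can be checked locally before calling create_basin(). The following is a minimal sketch; is_valid_basin_name is a hypothetical helper and not part of streamstore, and it cannot check global uniqueness, which only the service knows.

```python
import re

# Hypothetical helper (not part of streamstore): mirrors the documented rule.
# First and last characters must be a lowercase letter or digit; the 6 to 46
# characters in between may also be hyphens, giving a total length of 8 to 48.
_BASIN_NAME_RE = re.compile(r"[a-z0-9][a-z0-9-]{6,46}[a-z0-9]")


def is_valid_basin_name(name: str) -> bool:
    """Return True if name satisfies the documented basin naming rule."""
    return _BASIN_NAME_RE.fullmatch(name) is not None
```

A name that passes this check can still be rejected by create_basin() if it is already taken.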

basin(name: str) Basin[source]

Get a Basin object that can be used for performing basin operations.

Parameters:

name – Name of the basin.

Note

The basin must have been created already, else the operations will fail.

Tip

async with S2(..) as s2:
    basin = s2.basin("your-basin-name")

S2 implements the __getitem__ magic method, so you can also do the following instead:

async with S2(..) as s2:
    basin = s2["your-basin-name"]

async list_basins(prefix: str = '', start_after: str = '', limit: int = 1000) Page[BasinInfo][source]

List basins.

Parameters:
  • prefix – Filter to basins whose name begins with this prefix.

  • start_after – Filter to basins whose name starts lexicographically after this value.

  • limit – Number of items to return per page, up to a maximum of 1000.
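
Because each call returns at most one page, collecting every basin means calling list_basins() repeatedly. The sketch below shows one way to do that, assuming the name of the last item on a page can be passed as start_after for the next call. list_all is a hypothetical helper, not part of streamstore; it accepts any coroutine function with the (prefix, start_after, limit) parameters documented above.

```python
from typing import Any, Awaitable, Callable, List


async def list_all(
    list_page: Callable[..., Awaitable[Any]],
    prefix: str = "",
    limit: int = 1000,
) -> List[Any]:
    """Collect items from every page returned by a paginated list call."""
    items: List[Any] = []
    start_after = ""
    while True:
        page = await list_page(prefix=prefix, start_after=start_after, limit=limit)
        items.extend(page.items)
        # Stop when the service reports no more pages (or returned nothing).
        if not page.has_more or not page.items:
            break
        # Assumption: resuming after the last name seen yields the next page.
        start_after = page.items[-1].name
    return items
```

With a client in hand, this would be called as `await list_all(s2.list_basins, prefix="prod-")`. For list_access_tokens() the resume key would be the token ID rather than name.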

async delete_basin(name: str) None[source]

Delete a basin.

Parameters:

name – Name of the basin.

Note

Basin deletion is asynchronous, and may take a few minutes to complete.

async get_basin_config(name: str) BasinConfig[source]

Get the current configuration of a basin.

Parameters:

name – Name of the basin.

async reconfigure_basin(name: str, config: BasinConfig) BasinConfig[source]

Modify the configuration of a basin.

Parameters:
  • name – Name of the basin.

  • config – Configuration for the basin.

Note

Modifying BasinConfig.default_stream_config doesn’t affect existing streams; it only applies to streams created afterwards.

async issue_access_token(id: str, scope: AccessTokenScope, expires_at: int | None = None, auto_prefix_streams: bool = False) str[source]

Issue a new access token.

Parameters:
  • id – Access token ID.

  • scope – Access token scope.

  • expires_at – Expiration time in seconds since Unix epoch. If not specified, the expiration time of the access_token passed to S2 will be used.

  • auto_prefix_streams – Enable auto-prefixing: the specified prefix in AccessTokenScope.streams will be added to stream names in requests and stripped from stream names in responses.

Note

id must be unique to the account and between 1 and 96 bytes in length.
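
The auto_prefix_streams behavior described above amounts to a pair of string transformations. The service performs them; the sketch below only illustrates the effect from the caller's point of view. Both function names and the "tenant-a/" prefix are hypothetical.

```python
def add_prefix(prefix: str, stream_name: str) -> str:
    """Stream name as the service would see it for a request."""
    return prefix + stream_name


def strip_prefix(prefix: str, stream_name: str) -> str:
    """Stream name as the token holder would see it in a response."""
    return stream_name.removeprefix(prefix)
```

With the prefix "tenant-a/", a request for "orders" would be applied to "tenant-a/orders", and a listing would show that stream as "orders" again.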

async list_access_tokens(prefix: str = '', start_after: str = '', limit: int = 1000) Page[AccessTokenInfo][source]

List access tokens.

Parameters:
  • prefix – Filter to access tokens whose ID begins with this prefix.

  • start_after – Filter to access tokens whose ID starts lexicographically after this value.

  • limit – Number of items to return per page, up to a maximum of 1000.

async revoke_access_token(id: str) AccessTokenInfo[source]

Revoke an access token.

Parameters:

id – Access token ID.

class streamstore.Basin[source]

Caution

Returned by S2.basin(). Do not instantiate directly.

property name: str

Basin name.

async create_stream(name: str, config: StreamConfig | None = None) StreamInfo[source]

Create a stream.

Parameters:
  • name – Name of the stream.

  • config – Configuration for the stream.

Note

name must be unique within the basin. It can be an arbitrary string up to 512 characters. Forward slash (/) is recommended as a delimiter for hierarchical naming.

stream(name: str) Stream[source]

Get a Stream object that can be used for performing stream operations.

Parameters:

name – Name of the stream.

Note

The stream must have been created already, else the operations will fail.

Tip

async with S2(..) as s2:
    stream = s2.basin("your-basin-name").stream("your-stream-name")

Basin implements the __getitem__ magic method, so you can also do the following instead:

async with S2(..) as s2:
    stream = s2["your-basin-name"]["your-stream-name"]

async list_streams(prefix: str = '', start_after: str = '', limit: int = 1000) Page[StreamInfo][source]

List streams.

Parameters:
  • prefix – Filter to streams whose name begins with this prefix.

  • start_after – Filter to streams whose name starts lexicographically after this value.

  • limit – Number of items to return per page, up to a maximum of 1000.

async delete_stream(name: str) None[source]

Delete a stream.

Parameters:

name – Name of the stream.

Note

Stream deletion is asynchronous, and may take a few minutes to complete.

async get_stream_config(name: str) StreamConfig[source]

Get the current configuration of a stream.

Parameters:

name – Name of the stream.

async reconfigure_stream(name: str, config: StreamConfig) StreamConfig[source]

Modify the configuration of a stream.

Parameters:
  • name – Name of the stream.

  • config – Configuration for the stream.

Note

Modifying StreamConfig.storage_class will take effect only when this stream has been inactive for 10 minutes. This will become a live migration in the future.

class streamstore.Stream[source]

Caution

Returned by Basin.stream(). Do not instantiate directly.

property name: str

Stream name.

async check_tail() Tail[source]

Check the tail of a stream.

async append(input: AppendInput) AppendOutput[source]

Append a batch of records to a stream.

async append_session(inputs: AsyncIterable[AppendInput]) AsyncIterable[AppendOutput][source]

Append batches of records to a stream continuously, while guaranteeing pipelined inputs are processed in order.

Tip

You can use append_inputs_gen() for automatic batching of records instead of explicitly preparing and providing batches of records.

Yields:

AppendOutput for each corresponding AppendInput.

Returns:

If enable_append_retries=False in S2, and if processing any of the AppendInput fails.

(or)

If enable_append_retries=True in S2, and if retry budget gets exhausted after trying to recover from failures.

async read(start: SeqNum | Timestamp | TailOffset, limit: ReadLimit | None = None, until: int | None = None, ignore_command_records: bool = False) list[SequencedRecord] | Tail[source]

Read a batch of records from a stream.

Parameters:
  • start – Inclusive start position.

  • limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().

  • until – Exclusive timestamp to read until. It is applied as an additional constraint on top of the limit and guarantees that all returned records have timestamps less than this timestamp.

  • ignore_command_records – Filters out command records if present from the batch.

Returns:

Batch of sequenced records. It can be empty only if limit and/or until were provided and no records satisfy those constraints.

(or)

Tail of the stream. It will be returned only if start equals or exceeds the tail of the stream.

async read_session(start: SeqNum | Timestamp | TailOffset, limit: ReadLimit | None = None, until: int | None = None, clamp: bool = False, ignore_command_records: bool = False) AsyncIterable[list[SequencedRecord] | Tail][source]

Read batches of records from a stream continuously.

Parameters:
  • start – Inclusive start position.

  • limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().

  • until – Exclusive timestamp to read until. It is applied as an additional constraint on top of the limit and guarantees that all returned records have timestamps less than this timestamp.

  • clamp – Clamp the start position to the stream’s tail when it exceeds the tail.

  • ignore_command_records – Filters out command records if present from the batch.

Note

With a session, you are able to read in a streaming fashion. If limit and/or until were not provided and the tail of the stream is reached, the session goes into real-time tailing mode and will yield records as they are appended to the stream.

Yields:

Batch of sequenced records.

(or)

Tail of the stream. It will be yielded only if start exceeds the tail and clamp was False.

Returns:

If limit and/or until were provided, and if there are no further records that satisfy those constraints.

(or)

If the previous yield was the tail of the stream.
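
A read session yields either a batch (a list of records) or the tail, so a consumer has to handle both shapes. The following is a minimal sketch of such a loop; consume is a hypothetical helper that works on any async iterable with that shape, so it can be exercised without a live stream.

```python
from typing import Any, AsyncIterable, Optional, Tuple


async def consume(session: AsyncIterable[Any]) -> Tuple[int, Optional[Any]]:
    """Count records from a read session and capture the tail, if yielded."""
    record_count = 0
    tail = None
    async for output in session:
        if isinstance(output, list):
            # A batch of sequenced records.
            record_count += len(output)
        else:
            # Anything that is not a list is treated as the Tail; per the
            # Returns section above, the session ends after yielding it.
            tail = output
    return record_count, tail
```

In real use the argument would be something like `stream.read_session(start=SeqNum(0), limit=ReadLimit(count=100))`. Without limit or until, the session tails the stream in real time and this loop would not finish on its own.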

class streamstore.schemas.Record(body: bytes, headers: list[tuple[bytes, bytes]] = [])[source]

Record to be appended to a stream.

body: bytes

Body of this record.

headers: list[tuple[bytes, bytes]]

Series of name-value pairs for this record.

timestamp: int | None

Timestamp for this record.

Precise semantics depend on StreamConfig.timestamping.

class streamstore.schemas.AppendInput(records: list[Record], match_seq_num: int | None = None, fencing_token: str | None = None)[source]

Used in the parameters to Stream.append() and Stream.append_session().

records: list[Record]

Batch of records to append atomically, which must contain at least one record, and no more than 1000. The size of the batch must not exceed 1MiB of metered_bytes().

match_seq_num: int | None

Enforce that the sequence number issued to the first record in the batch matches this value.

fencing_token: str | None

Enforce a fencing token, which must have been previously set by a fence command record.

class streamstore.schemas.AppendOutput(start_seq_num: int, start_timestamp: int, end_seq_num: int, end_timestamp: int, next_seq_num: int, last_timestamp: int)[source]

Returned from Stream.append().

(or)

Yielded from Stream.append_session().

start_seq_num: int

Sequence number of the first appended record.

start_timestamp: int

Timestamp of the first appended record.

end_seq_num: int

Sequence number of the last appended record + 1. end_seq_num - start_seq_num will be the number of records in the batch.

end_timestamp: int

Timestamp of the last appended record.

next_seq_num: int

Sequence number of the last durable record on the stream + 1. This can be greater than end_seq_num in case of concurrent appends.

last_timestamp: int

Timestamp of the last durable record on the stream.
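
The sequence-number fields above support some simple arithmetic. The sketch below uses a local stand-in class (AppendOutputSketch, hypothetical) with only the three sequence-number fields so that it runs without the library.

```python
from dataclasses import dataclass


@dataclass
class AppendOutputSketch:
    """Local stand-in carrying only the sequence-number fields of AppendOutput."""
    start_seq_num: int
    end_seq_num: int
    next_seq_num: int


def records_in_batch(out: AppendOutputSketch) -> int:
    # end_seq_num is exclusive, so the difference is the batch size.
    return out.end_seq_num - out.start_seq_num


def records_after_batch(out: AppendOutputSketch) -> int:
    # Non-zero only when other appends became durable after this batch.
    return out.next_seq_num - out.end_seq_num
```

For a batch that was assigned sequence numbers 10, 11 and 12 while a concurrent writer added two more records, the fields would be 10, 13 and 15.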

class streamstore.schemas.Tail(next_seq_num: int, last_timestamp: int)[source]

Tail of a stream.

next_seq_num: int

Sequence number of the last durable record on the stream + 1.

last_timestamp: int

Timestamp of the last durable record on the stream.

class streamstore.schemas.SeqNum(value: int)[source]

class streamstore.schemas.Timestamp(value: int)[source]

class streamstore.schemas.TailOffset(value: int)[source]

Number of records before the tail.

class streamstore.schemas.ReadLimit(count: int | None = None, bytes: int | None = None)[source]

Used in the parameters to Stream.read() and Stream.read_session().

If both count and bytes are specified, either limit may be hit.

count: int | None

Number of records.

bytes: int | None

Cumulative size of records calculated using metered_bytes().

class streamstore.schemas.SequencedRecord(seq_num: int, body: bytes, headers: list[tuple[bytes, bytes]], timestamp: int)[source]

Record read from a stream.

seq_num: int

Sequence number assigned to this record.

body: bytes

Body of this record.

headers: list[tuple[bytes, bytes]]

Series of name-value pairs for this record.

timestamp: int

Timestamp for this record.

class streamstore.schemas.FirstSeqNum(value: int)[source]

class streamstore.schemas.NextSeqNum(value: int)[source]

class streamstore.schemas.Page(items: list[T], has_more: bool)[source]

Page of items.

items: list[T]

List of items of any type T.

has_more: bool

If True, it means that there are more pages.

enum streamstore.schemas.BasinScope(value)[source]

Scope of a basin.

Valid values are as follows:

UNSPECIFIED = <BasinScope.UNSPECIFIED: 0>

UNSPECIFIED defaults to AWS_US_EAST_1.

AWS_US_EAST_1 = <BasinScope.AWS_US_EAST_1: 1>

AWS us-east-1 region.

enum streamstore.schemas.BasinState(value)[source]

Current state of a basin.

Valid values are as follows:

UNSPECIFIED = <BasinState.UNSPECIFIED: 0>
ACTIVE = <BasinState.ACTIVE: 1>
CREATING = <BasinState.CREATING: 2>
DELETING = <BasinState.DELETING: 3>

class streamstore.schemas.BasinInfo(name: str, scope: BasinScope, state: BasinState)[source]

Basin information.

name: str

Basin name.

scope: BasinScope

Basin scope.

state: BasinState

Basin state.

class streamstore.schemas.StreamInfo(name: str, created_at: datetime, deleted_at: datetime | None)[source]

Stream information.

name: str

Stream name.

created_at: datetime

Creation time.

deleted_at: datetime | None

Deletion time, if this stream is being deleted.

enum streamstore.schemas.StorageClass(value)[source]

Storage class for recent appends.

Valid values are as follows:

STANDARD = <StorageClass.STANDARD: 1>

Offers end-to-end latencies under 500 ms.

EXPRESS = <StorageClass.EXPRESS: 2>

Offers end-to-end latencies under 50 ms.

enum streamstore.schemas.TimestampingMode(value)[source]

Timestamping mode.

Note

The arrival time is always in milliseconds since Unix epoch.

Valid values are as follows:

UNSPECIFIED = <TimestampingMode.UNSPECIFIED: 0>

Defaults to CLIENT_PREFER.

CLIENT_PREFER = <TimestampingMode.CLIENT_PREFER: 1>

Prefer client-specified timestamp if present, otherwise use arrival time.

CLIENT_REQUIRE = <TimestampingMode.CLIENT_REQUIRE: 2>

Require a client-specified timestamp and reject the append if it is absent.

ARRIVAL = <TimestampingMode.ARRIVAL: 3>

Use the arrival time and ignore any client-specified timestamp.
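
The three modes can be summarized as a small decision function. This is a sketch of the documented semantics, not the service's implementation. Mode is a local stand-in for TimestampingMode, and the capping step is an assumption: the source only says that uncapped allows client timestamps to exceed the arrival time, not what happens otherwise.

```python
from enum import Enum
from typing import Optional


class Mode(Enum):
    """Local stand-in mirroring TimestampingMode."""
    UNSPECIFIED = 0
    CLIENT_PREFER = 1
    CLIENT_REQUIRE = 2
    ARRIVAL = 3


def resolve_timestamp(
    mode: Mode,
    client_ts: Optional[int],
    arrival_ms: int,
    uncapped: bool = False,
) -> int:
    """Pick the timestamp a record would get under the given mode."""
    if mode is Mode.UNSPECIFIED:
        mode = Mode.CLIENT_PREFER  # documented default
    if mode is Mode.ARRIVAL:
        return arrival_ms  # client timestamp is ignored
    if client_ts is None:
        if mode is Mode.CLIENT_REQUIRE:
            raise ValueError("append rejected: client timestamp required")
        return arrival_ms
    # Assumption: without uncapped, a future timestamp is clamped to arrival.
    return client_ts if uncapped else min(client_ts, arrival_ms)
```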

class streamstore.schemas.Timestamping(mode: TimestampingMode | None = None, uncapped: bool | None = None)[source]

Timestamping behavior.

mode: TimestampingMode | None

Timestamping mode.

If not specified, the default is TimestampingMode.CLIENT_PREFER.

uncapped: bool | None

Allow client-specified timestamps to exceed the arrival time.

class streamstore.schemas.StreamConfig(storage_class: StorageClass | None = None, retention_policy: int | Literal['infinite'] | None = None, timestamping: Timestamping | None = None, delete_on_empty_min_age: int | None = None)[source]

Stream configuration.

storage_class: StorageClass | None

Storage class for this stream.

If not specified, the default is StorageClass.EXPRESS.

retention_policy: int | Literal['infinite'] | None

Retention policy for records in this stream.

Retention duration in seconds to automatically trim records older than this duration.

'infinite' to retain records indefinitely. (While S2 is in public preview, this is capped at 28 days. Let us know if you’d like the cap removed.)

If not specified, the default is to retain records for 7 days.

timestamping: Timestamping | None

Timestamping behavior for appends to this stream, which influences how timestamps are handled.

delete_on_empty_min_age: int | None

Minimum age in seconds before this stream can be automatically deleted if empty.

If not specified or set to 0, this stream will not be automatically deleted.

class streamstore.schemas.BasinConfig(default_stream_config: StreamConfig | None = None, create_stream_on_append: bool | None = None)[source]

Basin configuration.

default_stream_config: StreamConfig | None

Default configuration for streams in this basin.

create_stream_on_append: bool | None

Create stream on append if it doesn’t exist, using the default stream configuration.

enum streamstore.schemas.ResourceMatchOp(value)[source]

Resource match operator.

Valid values are as follows:

EXACT = <ResourceMatchOp.EXACT: 1>

Match only the resource with the exact value. Use an empty string to match no resources.

PREFIX = <ResourceMatchOp.PREFIX: 2>

Match all resources that start with the prefix value. Use an empty string to match all resources.
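
The two operators, including their empty-string cases, can be illustrated with a short predicate. MatchOp is a local stand-in for ResourceMatchOp and matches is a hypothetical helper; the actual matching is done by the service.

```python
from enum import Enum


class MatchOp(Enum):
    """Local stand-in mirroring ResourceMatchOp."""
    EXACT = 1
    PREFIX = 2


def matches(op: MatchOp, value: str, resource_name: str) -> bool:
    """Return True if resource_name is covered by the (op, value) rule."""
    if op is MatchOp.EXACT:
        # An empty value matches no resources.
        return value != "" and resource_name == value
    # PREFIX: every string starts with "", so an empty value matches all.
    return resource_name.startswith(value)
```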

class streamstore.schemas.ResourceMatchRule(match_op: ResourceMatchOp, value: str)[source]

Resource match rule.

match_op: ResourceMatchOp

Match operator.

value: str

Value to match.

enum streamstore.schemas.Permission(value)[source]

Permission.

Valid values are as follows:

UNSPECIFIED = <Permission.UNSPECIFIED: 0>
READ = <Permission.READ: 1>
WRITE = <Permission.WRITE: 2>
READ_WRITE = <Permission.READ_WRITE: 3>

class streamstore.schemas.OperationGroupPermissions(account: Permission = Permission.UNSPECIFIED, basin: Permission = Permission.UNSPECIFIED, stream: Permission = Permission.UNSPECIFIED)[source]

Operation group permissions.

account: Permission

Permission for account operations.

basin: Permission

Permission for basin operations.

stream: Permission

Permission for stream operations.

enum streamstore.schemas.Operation(value)[source]

Operation.

Valid values are as follows:

UNSPECIFIED = <Operation.UNSPECIFIED: 0>
LIST_BASINS = <Operation.LIST_BASINS: 1>
CREATE_BASIN = <Operation.CREATE_BASIN: 2>
DELETE_BASIN = <Operation.DELETE_BASIN: 3>
RECONFIGURE_BASIN = <Operation.RECONFIGURE_BASIN: 4>
GET_BASIN_CONFIG = <Operation.GET_BASIN_CONFIG: 5>
ISSUE_ACCESS_TOKEN = <Operation.ISSUE_ACCESS_TOKEN: 6>
REVOKE_ACCESS_TOKEN = <Operation.REVOKE_ACCESS_TOKEN: 7>
LIST_ACCESS_TOKENS = <Operation.LIST_ACCESS_TOKENS: 8>
LIST_STREAMS = <Operation.LIST_STREAMS: 9>
CREATE_STREAM = <Operation.CREATE_STREAM: 10>
DELETE_STREAM = <Operation.DELETE_STREAM: 11>
GET_STREAM_CONFIG = <Operation.GET_STREAM_CONFIG: 12>
RECONFIGURE_STREAM = <Operation.RECONFIGURE_STREAM: 13>
CHECK_TAIL = <Operation.CHECK_TAIL: 14>
APPEND = <Operation.APPEND: 15>
READ = <Operation.READ: 16>

class streamstore.schemas.AccessTokenScope(basins: ResourceMatchRule | None = None, streams: ResourceMatchRule | None = None, access_tokens: ResourceMatchRule | None = None, op_group_perms: OperationGroupPermissions | None = None, ops: list[Operation] = <factory>)[source]

Access token scope.

basins: ResourceMatchRule | None

Allowed basins.

streams: ResourceMatchRule | None

Allowed streams.

access_tokens: ResourceMatchRule | None

Allowed access token IDs.

op_group_perms: OperationGroupPermissions | None

Permissions at operation group level.

ops: list[Operation]

Allowed operations.

Note

A union of allowed operations and groups is used as the effective set of allowed operations.

class streamstore.schemas.AccessTokenInfo(id: str, scope: AccessTokenScope, expires_at: int | None, auto_prefix_streams: bool)[source]

Access token information.

id: str

Access token ID.

scope: AccessTokenScope

Access token scope.

expires_at: int | None

Expiration time in seconds since Unix epoch.

auto_prefix_streams: bool

Whether auto-prefixing is enabled for streams in scope.

enum streamstore.schemas.Cloud(value)[source]

Cloud in which the S2 service runs.

Valid values are as follows:

AWS = <Cloud.AWS: 1>

class streamstore.schemas.Endpoints[source]

S2 endpoints.

classmethod for_cloud(cloud: Cloud) Endpoints[source]

Construct S2 endpoints for the given cloud.

Parameters:

cloud – Cloud in which the S2 service runs.

class streamstore.utils.CommandRecord[source]

Factory class for creating command records.

static fence(token: str) Record[source]

Create a fence command record.

Parameters:

token – Fencing token. Its UTF-8 byte count must not exceed 36 bytes. If empty, clears the previously set token.
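
The limit is on UTF-8 bytes, not characters, so a token with non-ASCII characters can exceed it at fewer than 36 characters. A minimal local check, assuming nothing beyond the stated limit (is_valid_fencing_token is a hypothetical helper, not part of streamstore):

```python
def is_valid_fencing_token(token: str) -> bool:
    """Return True if the token's UTF-8 encoding is at most 36 bytes."""
    return len(token.encode("utf-8")) <= 36
```

The empty string passes this check; per the description above, it clears the previously set token.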

static trim(desired_first_seq_num: int) Record[source]

Create a trim command record.

Parameters:

desired_first_seq_num – Sequence number for the first record to exist after trimming preceding records in the stream.

Note

If desired_first_seq_num is smaller than the sequence number of the first existing record in the stream, no trimming occurs.

streamstore.utils.metered_bytes(records: Iterable[Record | SequencedRecord]) int[source]

Each record is metered using the following formula:

8 + 2 * len(headers)
+ sum((len(name) + len(value)) for (name, value) in headers)
+ len(body)
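
As a worked example, the formula can be written out in plain Python. This sketch represents each record as a (body, headers) tuple rather than a Record object so that it runs without the library; metered_size and metered_total are hypothetical names, and summing per-record sizes for a batch is an assumption about how the total is formed.

```python
from typing import Iterable, List, Tuple

Headers = List[Tuple[bytes, bytes]]


def metered_size(body: bytes, headers: Headers) -> int:
    """Apply the per-record formula: fixed overhead, headers, then body."""
    return (
        8
        + 2 * len(headers)
        + sum(len(name) + len(value) for name, value in headers)
        + len(body)
    )


def metered_total(records: Iterable[Tuple[bytes, Headers]]) -> int:
    """Assumed batch total: the sum of the per-record sizes."""
    return sum(metered_size(body, headers) for body, headers in records)
```

A record with body b"hello" and one header (b"k", b"v") meters as 8 + 2 + 2 + 5 = 17 bytes, and an empty record with no headers meters as 8 bytes.
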
async streamstore.utils.append_inputs_gen(records: AsyncIterable[Record], match_seq_num: int | None = None, fencing_token: str | None = None, max_records_per_batch: int = 1000, max_bytes_per_batch: int = 1048576, max_linger_per_batch: timedelta | None = None) AsyncIterable[AppendInput][source]

Generator function for batching records and yielding AppendInput.

Returned generator object can be used as the parameter to Stream.append_session().

Yields:

AppendInput

Parameters:
  • records – Records that have to be appended to a stream.

  • match_seq_num – If it is not None, it is used in the first yield of AppendInput and is automatically advanced for subsequent yields.

  • fencing_token – Used in each yield of AppendInput.

  • max_records_per_batch – Maximum number of records in each batch.

  • max_bytes_per_batch – Maximum size of each batch calculated using metered_bytes().

  • max_linger_per_batch – Maximum duration for each batch to accumulate records before yielding.

Note

If max_linger_per_batch is None, AppendInput will be yielded only when max_records_per_batch or max_bytes_per_batch is reached.
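
The batching rules above can be sketched as a small async generator. This is not the library's implementation: batch_records is a hypothetical helper that ignores max_linger_per_batch and fencing_token, takes a size_of callable in place of metered_bytes(), and yields (match_seq_num, batch) tuples instead of AppendInput. Flushing the final partial batch when the input ends is an assumption.

```python
from typing import Any, AsyncIterable, AsyncIterator, Callable, List, Optional, Tuple


async def batch_records(
    records: AsyncIterable[Any],
    size_of: Callable[[Any], int],
    match_seq_num: Optional[int] = None,
    max_records: int = 1000,
    max_bytes: int = 1048576,
) -> AsyncIterator[Tuple[Optional[int], List[Any]]]:
    """Group records into batches bounded by record count and total size."""
    batch: List[Any] = []
    batch_bytes = 0
    async for record in records:
        size = size_of(record)
        # Flush before adding a record that would break either limit.
        if batch and (len(batch) >= max_records or batch_bytes + size > max_bytes):
            yield match_seq_num, batch
            if match_seq_num is not None:
                # One sequence number per record, so advance by batch length.
                match_seq_num += len(batch)
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        # Assumption: the remainder is emitted once the input is exhausted.
        yield match_seq_num, batch
```

Starting with match_seq_num=10 and batches of two records, the yielded values carry 10, 12, 14 and so on, which is the advancing behavior described for match_seq_num above.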