API Reference¶
- class streamstore.S2(access_token: str, endpoints: Endpoints | None = None, request_timeout: timedelta = datetime.timedelta(seconds=5), max_retries: int = 3, enable_append_retries: bool = True, enable_compression: bool = False)[source]¶
Async client for interacting with s2.dev.
- Parameters:
access_token – Access token generated from S2 dashboard.
endpoints – S2 endpoints. If not specified, public endpoints for S2 service running in AWS cloud will be used.
request_timeout – Timeout for requests made by the client. Default value is 5 seconds.
max_retries – Maximum number of retries for a request. Default value is 3.
enable_append_retries – Enable retries for appends, i.e. for both Stream.append() and Stream.append_session(). Default value is True.
enable_compression – Enable compression (Gzip) for Stream.append(), Stream.append_session(), Stream.read(), and Stream.read_session(). Default value is False.
- async close() None[source]¶
Close all open connections to S2 service endpoints.
Tip
S2 supports the async context manager protocol, so you can also do the following instead of explicitly closing:
async with S2(..) as s2:
    ..
- async create_basin(name: str, config: BasinConfig | None = None) BasinInfo[source]¶
Create a basin.
- Parameters:
name – Name of the basin.
config – Configuration for the basin.
Note
name must be globally unique and between 8 and 48 characters long, comprising lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
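The naming rules in the note above can be checked locally before calling create_basin(). The following is a minimal sketch: is_valid_basin_name is a hypothetical helper, not part of streamstore, and it assumes "lowercase letters" and "numbers" mean ASCII a-z and 0-9.

```python
import re

# Hypothetical helper, not part of streamstore. It mirrors the documented
# rules: 8 to 48 characters; lowercase letters, digits, and hyphens only;
# no leading or trailing hyphen. ASCII-only matching is an assumption.
_BASIN_NAME = re.compile(r"[a-z0-9][a-z0-9-]{6,46}[a-z0-9]")


def is_valid_basin_name(name: str) -> bool:
    """Return True if `name` satisfies the documented basin naming rules."""
    return _BASIN_NAME.fullmatch(name) is not None
```

The service remains the authority on validity; a local check like this only catches obvious mistakes early.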
- basin(name: str) Basin[source]¶
Get a Basin object that can be used for performing basin operations.
- Parameters:
name – Name of the basin.
Note
The basin must have been created already, else the operations will fail.
Tip
async with S2(..) as s2:
    basin = s2.basin("your-basin-name")
S2 implements the __getitem__ magic method, so you can also do the following instead:
async with S2(..) as s2:
    basin = s2["your-basin-name"]
- async list_basins(prefix: str = '', start_after: str = '', limit: int = 1000) Page[BasinInfo][source]¶
List basins.
- Parameters:
prefix – Filter to basins whose name begins with this prefix.
start_after – Filter to basins whose name starts lexicographically after this value.
limit – Number of items to return per page, up to a maximum of 1000.
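Because each call returns at most one page, listing everything means repeating the call with start_after set to the last name seen. The sketch below shows that loop. It assumes the returned Page exposes items and has_more attributes, which this reference does not spell out; list_all_basin_names is a hypothetical helper, and the client is passed in so the function does not depend on how it was constructed.

```python
from typing import Any


async def list_all_basin_names(
    s2: Any, prefix: str = "", page_size: int = 1000
) -> list[str]:
    """Collect every basin name under `prefix` by following `start_after`."""
    names: list[str] = []
    start_after = ""
    while True:
        page = await s2.list_basins(
            prefix=prefix, start_after=start_after, limit=page_size
        )
        # Assumption: Page has `items` (list of BasinInfo) and `has_more`.
        names.extend(info.name for info in page.items)
        if not page.has_more or not page.items:
            break
        # Resume strictly after the last name returned on this page.
        start_after = page.items[-1].name
    return names
```

The same pattern should carry over to list_streams() and list_access_tokens(), which take the same three parameters.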
- async delete_basin(name: str) None[source]¶
Delete a basin.
- Parameters:
name – Name of the basin.
Note
Basin deletion is asynchronous, and may take a few minutes to complete.
- async get_basin_config(name: str) BasinConfig[source]¶
Get the current configuration of a basin.
- Parameters:
name – Name of the basin.
- async reconfigure_basin(name: str, config: BasinConfig) BasinConfig[source]¶
Modify the configuration of a basin.
- Parameters:
name – Name of the basin.
config – Configuration for the basin.
Note
Modifying BasinConfig.default_stream_config doesn’t affect existing streams; it applies only to streams created afterwards.
- async issue_access_token(id: str, scope: AccessTokenScope, expires_at: int | None = None, auto_prefix_streams: bool = False) str[source]¶
Issue a new access token.
- Parameters:
id – Access token ID.
scope – Access token scope.
expires_at – Expiration time in seconds since Unix epoch. If not specified, the expiration time of the access_token passed to S2 will be used.
auto_prefix_streams – Enable auto-prefixing: the specified prefix in AccessTokenScope.streams will be added to stream names in requests and stripped from stream names in responses.
Note
id must be unique to the account and between 1 and 96 bytes in length.
- async list_access_tokens(prefix: str = '', start_after: str = '', limit: int = 1000) Page[AccessTokenInfo][source]¶
List access tokens.
- Parameters:
prefix – Filter to access tokens whose ID begins with this prefix.
start_after – Filter to access tokens whose ID starts lexicographically after this value.
limit – Number of items to return per page, up to a maximum of 1000.
- async revoke_access_token(id: str) AccessTokenInfo[source]¶
Revoke an access token.
- Parameters:
id – Access token ID.
- class streamstore.Basin[source]¶
Caution
Returned by S2.basin(). Do not instantiate directly.
- async create_stream(name: str, config: StreamConfig | None = None) StreamInfo[source]¶
Create a stream.
- Parameters:
name – Name of the stream.
config – Configuration for the stream.
Note
name must be unique within the basin. It can be an arbitrary string of up to 512 characters. A forward slash (/) is recommended as a delimiter for hierarchical naming.
- stream(name: str) Stream[source]¶
Get a Stream object that can be used for performing stream operations.
- Parameters:
name – Name of the stream.
Note
The stream must have been created already, else the operations will fail.
Tip
async with S2(..) as s2:
    stream = s2.basin("your-basin-name").stream("your-stream-name")
Basin implements the __getitem__ magic method, so you can also do the following instead:
async with S2(..) as s2:
    stream = s2["your-basin-name"]["your-stream-name"]
- async list_streams(prefix: str = '', start_after: str = '', limit: int = 1000) Page[StreamInfo][source]¶
List streams.
- Parameters:
prefix – Filter to streams whose name begins with this prefix.
start_after – Filter to streams whose name starts lexicographically after this value.
limit – Number of items to return per page, up to a maximum of 1000.
- async delete_stream(name: str) None[source]¶
Delete a stream.
- Parameters:
name – Name of the stream.
Note
Stream deletion is asynchronous, and may take a few minutes to complete.
- async get_stream_config(name: str) StreamConfig[source]¶
Get the current configuration of a stream.
- Parameters:
name – Name of the stream.
- async reconfigure_stream(name: str, config: StreamConfig) StreamConfig[source]¶
Modify the configuration of a stream.
- Parameters:
name – Name of the stream.
config – Configuration for the stream.
Note
Modifying StreamConfig.storage_class takes effect only once the stream has been inactive for 10 minutes. This will become a live migration in the future.
- class streamstore.Stream[source]¶
Caution
Returned by Basin.stream(). Do not instantiate directly.
- async append(input: AppendInput) AppendOutput[source]¶
Append a batch of records to a stream.
- async append_session(inputs: AsyncIterable[AppendInput]) AsyncIterable[AppendOutput][source]¶
Append batches of records to a stream continuously, while guaranteeing pipelined inputs are processed in order.
Tip
You can use append_inputs_gen() for automatic batching of records instead of explicitly preparing and providing batches of records.
- Yields:
AppendOutput for each corresponding AppendInput.
- Returns:
If enable_append_retries=False in S2, and if processing any of the AppendInput fails.
(or)
If enable_append_retries=True in S2, and if the retry budget gets exhausted after trying to recover from failures.
- async read(start: SeqNum | Timestamp | TailOffset, limit: ReadLimit | None = None, until: int | None = None, ignore_command_records: bool = False) list[SequencedRecord] | Tail[source]¶
Read a batch of records from a stream.
- Parameters:
start – Inclusive start position.
limit – Number of records to return, up to a maximum of 1000 or 1 MiB of metered_bytes().
until – Exclusive timestamp to read until. It is applied as an additional constraint on top of the limit and guarantees that all returned records have timestamps less than this timestamp.
ignore_command_records – Filters out command records if present from the batch.
- Returns:
Batch of sequenced records. It can be empty only if limit and/or until were provided and no records satisfy those constraints.
(or)
Tail of the stream. It will be returned only if start equals or exceeds the tail of the stream.
- async read_session(start: SeqNum | Timestamp | TailOffset, limit: ReadLimit | None = None, until: int | None = None, clamp: bool = False, ignore_command_records: bool = False) AsyncIterable[list[SequencedRecord] | Tail][source]¶
Read batches of records from a stream continuously.
- Parameters:
start – Inclusive start position.
limit – Number of records to return, up to a maximum of 1000 or 1 MiB of metered_bytes().
until – Exclusive timestamp to read until. It is applied as an additional constraint on top of the limit and guarantees that all returned records have timestamps less than this timestamp.
clamp – Clamp the start position to the stream’s tail when it exceeds the tail.
ignore_command_records – Filters out command records if present from the batch.
Note
With a session, you can read in a streaming fashion. If limit and/or until were not provided and the tail of the stream is reached, the session goes into real-time tailing mode and yields records as they are appended to the stream.
- Yields:
Batch of sequenced records.
(or)
Tail of the stream. It will be yielded only if start exceeds the tail and clamp was False.
- Returns:
If limit and/or until were provided, and if there are no further records that satisfy those constraints.
(or)
If the previous yield was the tail of the stream.
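Since a read session yields either a batch (a list) or the tail, a consumer has to branch on the type of each item. The sketch below shows one way to do that. drain is a hypothetical helper; it accepts whatever read_session() returns and uses an isinstance check on list so that it needs no streamstore imports.

```python
from typing import Any, AsyncIterable


async def drain(session: AsyncIterable[Any]) -> tuple[list[Any], Any]:
    """Consume a read session; return (records, tail_or_None).

    Intended for sessions opened with `limit` and/or `until`. Without
    them the session tails the stream in real time and this never returns.
    """
    records: list[Any] = []
    tail = None
    async for item in session:
        if isinstance(item, list):
            # A batch of sequenced records.
            records.extend(item)
        else:
            # Anything that is not a list is taken to be the tail of the
            # stream; per the notes above, the session returns after it.
            tail = item
    return records, tail
```

A call would look like `records, tail = await drain(stream.read_session(start, limit=ReadLimit(count=100)))`, where start is one of the start-position types from the signature.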
- class streamstore.schemas.Record(body: bytes, headers: list[tuple[bytes, bytes]] = [])[source]¶
Record to be appended to a stream.
- timestamp: int | None¶
Timestamp for this record.
Precise semantics depend on StreamConfig.timestamping.
- class streamstore.schemas.AppendInput(records: list[Record], match_seq_num: int | None = None, fencing_token: str | None = None)[source]¶
Used in the parameters to Stream.append() and Stream.append_session().
- records: list[Record]¶
Batch of records to append atomically, which must contain at least one record, and no more than 1000. The size of the batch must not exceed 1 MiB of metered_bytes().
- class streamstore.schemas.AppendOutput(start_seq_num: int, start_timestamp: int, end_seq_num: int, end_timestamp: int, next_seq_num: int, last_timestamp: int)[source]¶
Returned from Stream.append().
(or)
Yielded from Stream.append_session().
- end_seq_num: int¶
Sequence number of the last appended record + 1. end_seq_num - start_seq_num will be the number of records in the batch.
- class streamstore.schemas.ReadLimit(count: int | None = None, bytes: int | None = None)[source]¶
Used in the parameters to Stream.read() and Stream.read_session().
If both count and bytes are specified, either limit may be hit.
- bytes: int | None¶
Cumulative size of records calculated using metered_bytes().
- class streamstore.schemas.SequencedRecord(seq_num: int, body: bytes, headers: list[tuple[bytes, bytes]], timestamp: int)[source]¶
Record read from a stream.
- enum streamstore.schemas.BasinScope(value)[source]¶
Scope of a basin.
Valid values are as follows:
- UNSPECIFIED = <BasinScope.UNSPECIFIED: 0>¶
UNSPECIFIED defaults to AWS_US_EAST_1.
- AWS_US_EAST_1 = <BasinScope.AWS_US_EAST_1: 1>¶
AWS us-east-1 region.
- enum streamstore.schemas.BasinState(value)[source]¶
Current state of a basin.
Valid values are as follows:
- UNSPECIFIED = <BasinState.UNSPECIFIED: 0>¶
- ACTIVE = <BasinState.ACTIVE: 1>¶
- CREATING = <BasinState.CREATING: 2>¶
- DELETING = <BasinState.DELETING: 3>¶
- class streamstore.schemas.BasinInfo(name: str, scope: BasinScope, state: BasinState)[source]¶
Basin information.
- scope: BasinScope¶
Basin scope.
- state: BasinState¶
Basin state.
- class streamstore.schemas.StreamInfo(name: str, created_at: datetime, deleted_at: datetime | None)[source]¶
Stream information.
- enum streamstore.schemas.StorageClass(value)[source]¶
Storage class for recent appends.
Valid values are as follows:
- STANDARD = <StorageClass.STANDARD: 1>¶
Offers end-to-end latencies under 500 ms.
- EXPRESS = <StorageClass.EXPRESS: 2>¶
Offers end-to-end latencies under 50 ms.
- enum streamstore.schemas.TimestampingMode(value)[source]¶
Timestamping mode.
Note
The arrival time is always in milliseconds since Unix epoch.
Valid values are as follows:
- UNSPECIFIED = <TimestampingMode.UNSPECIFIED: 0>¶
Defaults to CLIENT_PREFER.
- CLIENT_PREFER = <TimestampingMode.CLIENT_PREFER: 1>¶
Prefer client-specified timestamp if present, otherwise use arrival time.
- CLIENT_REQUIRE = <TimestampingMode.CLIENT_REQUIRE: 2>¶
Require a client-specified timestamp and reject the append if it is absent.
- ARRIVAL = <TimestampingMode.ARRIVAL: 3>¶
Use the arrival time and ignore any client-specified timestamp.
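The three modes differ only in how a client-specified timestamp and the arrival time are combined. The sketch below models that decision as described above. Mode and effective_timestamp are local stand-ins for illustration, not library code, and the effect of Timestamping.uncapped is left out because this reference does not describe it.

```python
from enum import Enum
from typing import Optional


class Mode(Enum):
    """Local stand-in for TimestampingMode (illustration only)."""
    CLIENT_PREFER = 1
    CLIENT_REQUIRE = 2
    ARRIVAL = 3


def effective_timestamp(mode: Mode, client_ts: Optional[int], arrival_ms: int) -> int:
    """Return the timestamp a record would carry under `mode`."""
    if mode is Mode.ARRIVAL:
        # Any client-specified timestamp is ignored.
        return arrival_ms
    if client_ts is not None:
        return client_ts
    if mode is Mode.CLIENT_REQUIRE:
        # The append is rejected when the timestamp is absent.
        raise ValueError("append rejected: client timestamp required")
    # CLIENT_PREFER falls back to the arrival time.
    return arrival_ms
```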
- class streamstore.schemas.Timestamping(mode: TimestampingMode | None = None, uncapped: bool | None = None)[source]¶
Timestamping behavior.
- mode: TimestampingMode | None¶
Timestamping mode.
If not specified, the default is TimestampingMode.CLIENT_PREFER.
- class streamstore.schemas.StreamConfig(storage_class: StorageClass | None = None, retention_policy: int | Literal['infinite'] | None = None, timestamping: Timestamping | None = None, delete_on_empty_min_age: int | None = None)[source]¶
Stream configuration.
- storage_class: StorageClass | None¶
Storage class for this stream.
If not specified, the default is StorageClass.EXPRESS.
- retention_policy: int | Literal['infinite'] | None¶
Retention policy for records in this stream.
Retention duration in seconds to automatically trim records older than this duration.
'infinite' to retain records indefinitely. (While S2 is in public preview, this is capped at 28 days. Let us know if you’d like the cap removed.)
If not specified, the default is to retain records for 7 days.
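Because the integer form of retention_policy is expressed in seconds, it can be less error-prone to derive it from a timedelta than to write the number by hand. A small sketch follows; retention_seconds is a hypothetical helper, not part of streamstore.

```python
from datetime import timedelta


def retention_seconds(duration: timedelta) -> int:
    """Convert a duration to whole seconds for use as retention_policy."""
    return int(duration.total_seconds())


# The documented default (7 days) and preview cap (28 days), in seconds.
DEFAULT_RETENTION = retention_seconds(timedelta(days=7))
PREVIEW_CAP = retention_seconds(timedelta(days=28))
```

The result can then be passed as StreamConfig(retention_policy=...).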
- timestamping: Timestamping | None¶
Timestamping behavior for appends to this stream, which influences how timestamps are handled.
- class streamstore.schemas.BasinConfig(default_stream_config: StreamConfig | None = None, create_stream_on_append: bool | None = None)[source]¶
Basin configuration.
- default_stream_config: StreamConfig | None¶
Default configuration for streams in this basin.
- enum streamstore.schemas.ResourceMatchOp(value)[source]¶
Resource match operator.
Valid values are as follows:
- EXACT = <ResourceMatchOp.EXACT: 1>¶
Match only the resource with the exact value. Use an empty string to match no resources.
- PREFIX = <ResourceMatchOp.PREFIX: 2>¶
Match all resources that start with the prefix value. Use an empty string to match all resources.
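The empty-string cases are easy to mix up: an empty EXACT value matches nothing, while an empty PREFIX value matches everything. The sketch below restates those semantics as a predicate. MatchOp and rule_matches are local stand-ins for illustration; the actual matching is performed by the service.

```python
from enum import Enum


class MatchOp(Enum):
    """Local stand-in for ResourceMatchOp (illustration only)."""
    EXACT = 1
    PREFIX = 2


def rule_matches(op: MatchOp, value: str, resource: str) -> bool:
    """Return True if `resource` is allowed by the rule (op, value)."""
    if op is MatchOp.EXACT:
        # An empty value matches no resources.
        return value != "" and resource == value
    # PREFIX: an empty value is a prefix of every name, so it matches all.
    return resource.startswith(value)
```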
- class streamstore.schemas.ResourceMatchRule(match_op: ResourceMatchOp, value: str)[source]¶
Resource match rule.
- match_op: ResourceMatchOp¶
Match operator.
- enum streamstore.schemas.Permission(value)[source]¶
Permission.
Valid values are as follows:
- UNSPECIFIED = <Permission.UNSPECIFIED: 0>¶
- READ = <Permission.READ: 1>¶
- WRITE = <Permission.WRITE: 2>¶
- READ_WRITE = <Permission.READ_WRITE: 3>¶
- class streamstore.schemas.OperationGroupPermissions(account: Permission = Permission.UNSPECIFIED, basin: Permission = Permission.UNSPECIFIED, stream: Permission = Permission.UNSPECIFIED)[source]¶
Operation group permissions.
- account: Permission¶
Permission for account operations.
- basin: Permission¶
Permission for basin operations.
- stream: Permission¶
Permission for stream operations.
- enum streamstore.schemas.Operation(value)[source]¶
Operation.
Valid values are as follows:
- UNSPECIFIED = <Operation.UNSPECIFIED: 0>¶
- LIST_BASINS = <Operation.LIST_BASINS: 1>¶
- CREATE_BASIN = <Operation.CREATE_BASIN: 2>¶
- DELETE_BASIN = <Operation.DELETE_BASIN: 3>¶
- RECONFIGURE_BASIN = <Operation.RECONFIGURE_BASIN: 4>¶
- GET_BASIN_CONFIG = <Operation.GET_BASIN_CONFIG: 5>¶
- ISSUE_ACCESS_TOKEN = <Operation.ISSUE_ACCESS_TOKEN: 6>¶
- REVOKE_ACCESS_TOKEN = <Operation.REVOKE_ACCESS_TOKEN: 7>¶
- LIST_ACCESS_TOKENS = <Operation.LIST_ACCESS_TOKENS: 8>¶
- LIST_STREAMS = <Operation.LIST_STREAMS: 9>¶
- CREATE_STREAM = <Operation.CREATE_STREAM: 10>¶
- DELETE_STREAM = <Operation.DELETE_STREAM: 11>¶
- GET_STREAM_CONFIG = <Operation.GET_STREAM_CONFIG: 12>¶
- RECONFIGURE_STREAM = <Operation.RECONFIGURE_STREAM: 13>¶
- CHECK_TAIL = <Operation.CHECK_TAIL: 14>¶
- APPEND = <Operation.APPEND: 15>¶
- READ = <Operation.READ: 16>¶
- class streamstore.schemas.AccessTokenScope(basins: ~streamstore.schemas.ResourceMatchRule | None = None, streams: ~streamstore.schemas.ResourceMatchRule | None = None, access_tokens: ~streamstore.schemas.ResourceMatchRule | None = None, op_group_perms: ~streamstore.schemas.OperationGroupPermissions | None = None, ops: list[~streamstore.schemas.Operation] = <factory>)[source]¶
Access token scope.
- basins: ResourceMatchRule | None¶
Allowed basins.
- streams: ResourceMatchRule | None¶
Allowed streams.
- access_tokens: ResourceMatchRule | None¶
Allowed access token IDs.
- op_group_perms: OperationGroupPermissions | None¶
Permissions at operation group level.
- class streamstore.schemas.AccessTokenInfo(id: str, scope: AccessTokenScope, expires_at: int | None, auto_prefix_streams: bool)[source]¶
Access token information.
- scope: AccessTokenScope¶
Access token scope.
- enum streamstore.schemas.Cloud(value)[source]¶
Cloud in which the S2 service runs.
Valid values are as follows:
- AWS = <Cloud.AWS: 1>¶
- class streamstore.utils.CommandRecord[source]¶
Factory class for creating command records.
- static fence(token: str) Record[source]¶
Create a fence command record.
- Parameters:
token – Fencing token. Its UTF-8 byte count must not exceed 36 bytes. If empty, clears the previously set token.
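The limit is on UTF-8 bytes, not characters, so a token with non-ASCII characters can exceed it while looking short. The sketch below checks the size before building the command record. check_fencing_token is a hypothetical helper, not part of streamstore.

```python
MAX_FENCING_TOKEN_BYTES = 36  # documented limit on the UTF-8 encoding


def check_fencing_token(token: str) -> str:
    """Return `token` unchanged, or raise if its UTF-8 form is too long."""
    size = len(token.encode("utf-8"))
    if size > MAX_FENCING_TOKEN_BYTES:
        raise ValueError(
            f"fencing token is {size} bytes; limit is {MAX_FENCING_TOKEN_BYTES}"
        )
    # An empty token is allowed: it clears the previously set token.
    return token
```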
- static trim(desired_first_seq_num: int) Record[source]¶
Create a trim command record.
- Parameters:
desired_first_seq_num – Sequence number for the first record to exist after trimming preceding records in the stream.
Note
If desired_first_seq_num is smaller than the sequence number of the first existing record in the stream, no trimming happens.
- streamstore.utils.metered_bytes(records: Iterable[Record | SequencedRecord]) int[source]¶
Each record is metered using the following formula:
8 + 2 * len(headers) + sum((len(name) + len(value)) for (name, value) in headers) + len(body)
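As a worked instance of the formula, the sketch below applies it to plain (body, headers) pairs rather than to Record objects. Both functions are local illustrations; summing per-record sizes to get a batch total is an assumption based on the function accepting an iterable and returning a single int.

```python
Headers = list[tuple[bytes, bytes]]


def record_metered_bytes(body: bytes, headers: Headers) -> int:
    """Metered size of a single record, per the formula above."""
    return (
        8
        + 2 * len(headers)
        + sum(len(name) + len(value) for name, value in headers)
        + len(body)
    )


def batch_metered_bytes(records: list[tuple[bytes, Headers]]) -> int:
    """Assumed batch total: the sum of the per-record sizes."""
    return sum(record_metered_bytes(body, headers) for body, headers in records)


# body b"hello" (5 bytes) with one header (b"k", b"v"):
# 8 + 2*1 + (1 + 1) + 5 = 17
example = record_metered_bytes(b"hello", [(b"k", b"v")])
```

Note that even an empty record with no headers is metered at 8 bytes.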
- async streamstore.utils.append_inputs_gen(records: AsyncIterable[Record], match_seq_num: int | None = None, fencing_token: str | None = None, max_records_per_batch: int = 1000, max_bytes_per_batch: int = 1048576, max_linger_per_batch: timedelta | None = None) AsyncIterable[AppendInput][source]¶
Generator function for batching records and yielding AppendInput.
The returned generator object can be used as the parameter to Stream.append_session().
- Yields:
AppendInput
- Parameters:
records – Records that have to be appended to a stream.
match_seq_num – If it is not None, it is used in the first yield of AppendInput and is automatically advanced for subsequent yields.
fencing_token – Used in each yield of AppendInput.
max_records_per_batch – Maximum number of records in each batch.
max_bytes_per_batch – Maximum size of each batch calculated using metered_bytes().
max_linger_per_batch – Maximum duration for each batch to accumulate records before yielding.
Note
If max_linger_per_batch is None, AppendInput will be yielded only when max_records_per_batch or max_bytes_per_batch is reached.
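To make the count and byte limits concrete, here is a simplified, synchronous sketch of that kind of batching. It is not the library's implementation: it works on plain (body, headers) pairs, ignores max_linger_per_batch, match_seq_num, and fencing_token, and also flushes a final partial batch when the input ends, which is an assumption about end-of-input behavior.

```python
from typing import Iterable, Iterator

Record = tuple[bytes, list[tuple[bytes, bytes]]]  # (body, headers) stand-in


def metered(record: Record) -> int:
    """Per-record size using the metered_bytes() formula."""
    body, headers = record
    return 8 + 2 * len(headers) + sum(len(n) + len(v) for n, v in headers) + len(body)


def batch_records(
    records: Iterable[Record],
    max_records: int = 1000,
    max_bytes: int = 1048576,
) -> Iterator[list[Record]]:
    """Group records so that no batch exceeds either limit."""
    batch: list[Record] = []
    batch_bytes = 0
    for record in records:
        size = metered(record)
        # Flush first if adding this record would break a limit.
        if batch and (len(batch) >= max_records or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        # Assumed: the remaining partial batch is emitted at end of input.
        yield batch
```

A single record larger than max_bytes is still emitted on its own in this sketch; rejecting it is left to the caller or the service.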