API Reference¶
- class streamstore.S2(access_token: str, endpoints: Endpoints | None = None, request_timeout: timedelta = datetime.timedelta(seconds=5), max_retries: int = 3, enable_append_retries: bool = True, enable_compression: bool = False)[source]¶
Async client for interacting with s2.dev.
- Parameters:
access_token – Access token generated from S2 dashboard.
endpoints – S2 endpoints. If not specified, public endpoints for S2 service running in AWS cloud will be used.
request_timeout – Timeout for requests made by the client. Default value is 5 seconds.
max_retries – Maximum number of retries for a request. Default value is 3.
enable_append_retries – Enable retries for appends, i.e. for both Stream.append() and Stream.append_session(). Default value is True.
enable_compression – Enable compression (Gzip) for Stream.append(), Stream.append_session(), Stream.read(), and Stream.read_session(). Default value is False.
- async close() None[source]¶
Close all open connections to S2 service endpoints.
Tip
S2 supports the async context manager protocol, so you can also do the following instead of explicitly closing:
async with S2(..) as s2:
    ..
- async create_basin(name: str, config: BasinConfig | None = None) BasinInfo[source]¶
Create a basin.
- Parameters:
name – Name of the basin.
config – Configuration for the basin.
Note
name must be globally unique and between 8 and 48 characters long, comprising lowercase letters, numbers, and hyphens. It cannot begin or end with a hyphen.
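The naming rules in the note above can be checked locally before calling create_basin(). The following is a minimal sketch, not part of streamstore: the helper name is_valid_basin_name is hypothetical, "numbers" is assumed to mean the ASCII digits 0-9, and global uniqueness can only be verified by the service itself.

```python
import re

# Local pre-check only; the S2 service remains the source of truth.
# Rules from the note: 8 to 48 characters, lowercase letters, digits and
# hyphens, and no hyphen at the start or at the end.
_BASIN_NAME_RE = re.compile(r"[a-z0-9](?:[a-z0-9-]{6,46})[a-z0-9]")


def is_valid_basin_name(name: str) -> bool:
    """Return True if `name` satisfies the documented naming rules."""
    return _BASIN_NAME_RE.fullmatch(name) is not None
```

Running such a check first gives a quick local error for obviously malformed names instead of a failed request.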
- basin(name: str) Basin[source]¶
Get a Basin object that can be used for performing basin operations.
- Parameters:
name – Name of the basin.
Note
The basin must have been created already, else the operations will fail.
Tip
async with S2(..) as s2:
    basin = s2.basin("your-basin-name")
S2 implements the __getitem__ magic method, so you can also do the following instead:
async with S2(..) as s2:
    basin = s2["your-basin-name"]
- async list_basins(prefix: str = '', start_after: str = '', limit: int = 1000) Page[BasinInfo][source]¶
List basins.
- Parameters:
prefix – Filter to basins whose name begins with this prefix.
start_after – Filter to basins whose name starts lexicographically after this value.
limit – Number of items to return per page, up to a maximum of 1000.
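Since a page holds at most 1000 items, listing everything means calling the method repeatedly and passing the last name seen as start_after. The sketch below models that loop against an in-memory stand-in so it runs without network access. FakePage and fake_list are hypothetical, and the items and has_more fields are assumptions about what Page exposes, since this reference does not list its attributes.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakePage:
    """Hypothetical stand-in for Page; the items/has_more fields are assumed."""
    items: list[str]
    has_more: bool


ALL_NAMES = sorted(f"demo-basin-{i:03d}" for i in range(25))


async def fake_list(prefix: str = "", start_after: str = "", limit: int = 1000) -> FakePage:
    """In-memory model of the documented prefix/start_after/limit filters."""
    matches = [n for n in ALL_NAMES if n.startswith(prefix) and n > start_after]
    return FakePage(items=matches[:limit], has_more=len(matches) > limit)


async def list_all(prefix: str = "", limit: int = 1000) -> list[str]:
    """Walk every page by passing the last seen name as start_after."""
    names: list[str] = []
    start_after = ""
    while True:
        page = await fake_list(prefix=prefix, start_after=start_after, limit=limit)
        names.extend(page.items)
        if not page.has_more or not page.items:
            return names
        start_after = page.items[-1]
```

Against the real client, fake_list would be replaced by S2.list_basins(), and the cursor would be taken from the name of the last BasinInfo rather than from a plain string.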
- async delete_basin(name: str) None[source]¶
Delete a basin.
- Parameters:
name – Name of the basin.
Note
Basin deletion is asynchronous, and may take a few minutes to complete.
- async get_basin_config(name: str) BasinConfig[source]¶
Get the current configuration of a basin.
- Parameters:
name – Name of the basin.
- async reconfigure_basin(name: str, config: BasinConfig) BasinConfig[source]¶
Modify the configuration of a basin.
- Parameters:
name – Name of the basin.
config – Configuration for the basin.
Note
Modifying BasinConfig.default_stream_config doesn’t affect existing streams; it only applies to streams created afterwards.
- async issue_access_token(id: str, scope: AccessTokenScope, expires_at: int | None = None, auto_prefix_streams: bool = False) str[source]¶
Issue a new access token.
- Parameters:
id – Access token ID.
scope – Access token scope.
expires_at – Expiration time in seconds since Unix epoch. If not specified, the expiration time of the access_token passed to S2 will be used.
auto_prefix_streams – Enable auto-prefixing: the prefix specified in AccessTokenScope.streams will be added to stream names in requests and stripped from stream names in responses.
Note
id must be unique to the account and between 1 and 96 bytes in length.
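The effect of auto_prefix_streams can be pictured as a pair of string transformations applied on behalf of the token holder. This is a rough sketch of the documented behavior only: the function names and the "tenant-a/" prefix are made up for illustration, and the real rewriting is not done by client code like this.

```python
def to_request_name(prefix: str, stream_name: str) -> str:
    """Name sent in a request: the scope prefix is added."""
    return prefix + stream_name


def to_response_name(prefix: str, stream_name: str) -> str:
    """Name seen in a response: the scope prefix is stripped."""
    return stream_name.removeprefix(prefix)
```

With the prefix "tenant-a/", a token holder who works with a stream called "orders" is in effect operating on "tenant-a/orders".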
- async list_access_tokens(prefix: str = '', start_after: str = '', limit: int = 1000) Page[AccessTokenInfo][source]¶
List access tokens.
- Parameters:
prefix – Filter to access tokens whose ID begins with this prefix.
start_after – Filter to access tokens whose ID starts lexicographically after this value.
limit – Number of items to return per page, up to a maximum of 1000.
- async revoke_access_token(id: str) AccessTokenInfo[source]¶
Revoke an access token.
- Parameters:
id – Access token ID.
- class streamstore.Basin[source]¶
Caution
Returned by S2.basin(). Do not instantiate directly.
- async create_stream(name: str, config: StreamConfig | None = None) StreamInfo[source]¶
Create a stream.
- Parameters:
name – Name of the stream.
config – Configuration for the stream.
Note
name must be unique within the basin. It can be an arbitrary string of up to 512 characters. Forward slash (/) is recommended as a delimiter for hierarchical naming.
- stream(name: str) Stream[source]¶
Get a Stream object that can be used for performing stream operations.
- Parameters:
name – Name of the stream.
Note
The stream must have been created already, else the operations will fail.
Tip
async with S2(..) as s2:
    stream = s2.basin("your-basin-name").stream("your-stream-name")
Basin implements the __getitem__ magic method, so you can also do the following instead:
async with S2(..) as s2:
    stream = s2["your-basin-name"]["your-stream-name"]
- async list_streams(prefix: str = '', start_after: str = '', limit: int = 1000) Page[StreamInfo][source]¶
List streams.
- Parameters:
prefix – Filter to streams whose name begins with this prefix.
start_after – Filter to streams whose name starts lexicographically after this value.
limit – Number of items to return per page, up to a maximum of 1000.
- async delete_stream(name: str) None[source]¶
Delete a stream.
- Parameters:
name – Name of the stream.
Note
Stream deletion is asynchronous, and may take a few minutes to complete.
- async get_stream_config(name: str) StreamConfig[source]¶
Get the current configuration of a stream.
- Parameters:
name – Name of the stream.
- async reconfigure_stream(name: str, config: StreamConfig) StreamConfig[source]¶
Modify the configuration of a stream.
- Parameters:
name – Name of the stream.
config – Configuration for the stream.
Note
Modifying StreamConfig.storage_class will take effect only when this stream has been inactive for 10 minutes. This will become a live migration in the future.
- class streamstore.Stream[source]¶
Caution
Returned by Basin.stream(). Do not instantiate directly.
- async append(input: AppendInput) AppendOutput[source]¶
Append a batch of records to a stream.
- async append_session(inputs: AsyncIterable[AppendInput]) AsyncIterable[AppendOutput][source]¶
Append batches of records to a stream continuously, while guaranteeing pipelined inputs are processed in order.
Tip
You can use append_inputs_gen() for automatic batching of records instead of explicitly preparing and providing batches of records.
- Yields:
AppendOutput for each corresponding AppendInput.
- Returns:
If enable_append_retries=False in S2, and if processing any of the AppendInput fails.
(or)
If enable_append_retries=True in S2, and if the retry budget gets exhausted after trying to recover from failures.
- async read(start: SeqNum | Timestamp | TailOffset, limit: ReadLimit | None = None, until: int | None = None, ignore_command_records: bool = False) list[SequencedRecord] | Tail[source]¶
Read a batch of records from a stream.
- Parameters:
start – Inclusive start position.
limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().
until – Exclusive timestamp to read until. It is applied as an additional constraint on top of the limit and guarantees that all returned records have timestamps less than this timestamp.
ignore_command_records – Filters out command records if present from the batch.
- Returns:
Batch of sequenced records. It can be empty only if limit and/or until were provided and no records satisfy those constraints.
(or)
Tail of the stream. It will be returned only if start equals or exceeds the tail of the stream.
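How start, limit and until combine can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the service's implementation: Rec and model_read are hypothetical, start is modeled as a plain sequence number, only a record count limit is modeled, timestamps are assumed to be non-decreasing, and the Tail return value is left out.

```python
from typing import NamedTuple, Optional


class Rec(NamedTuple):
    """Hypothetical stand-in for SequencedRecord with only the fields used here."""
    seq_num: int
    timestamp: int


def model_read(records: list[Rec], start_seq_num: int,
               count: Optional[int] = None, until: Optional[int] = None) -> list[Rec]:
    """Select records from an in-memory list the way the parameters describe."""
    batch: list[Rec] = []
    for rec in records:
        if rec.seq_num < start_seq_num:
            continue  # start is inclusive
        if until is not None and rec.timestamp >= until:
            break  # until is exclusive; assumes non-decreasing timestamps
        if count is not None and len(batch) >= count:
            break  # the count limit has been reached
        batch.append(rec)
    return batch
```

In this model, an empty batch can only come back when count or until excludes every remaining record, which mirrors the empty-batch case described under Returns.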
- async read_session(start: SeqNum | Timestamp | TailOffset, limit: ReadLimit | None = None, until: int | None = None, clamp: bool = False, ignore_command_records: bool = False) AsyncIterable[list[SequencedRecord] | Tail][source]¶
Read batches of records from a stream continuously.
- Parameters:
start – Inclusive start position.
limit – Number of records to return, up to a maximum of 1000 or 1MiB of metered_bytes().
until – Exclusive timestamp to read until. It is applied as an additional constraint on top of the limit and guarantees that all returned records have timestamps less than this timestamp.
clamp – Clamp the start position to the stream’s tail when it exceeds the tail.
ignore_command_records – Filters out command records if present from the batch.
Note
With a session, you are able to read in a streaming fashion. If limit and/or until were not provided and the tail of the stream is reached, the session goes into real-time tailing mode and will yield records as they are appended to the stream.
- Yields:
Batch of sequenced records.
(or)
Tail of the stream. It will be yielded only if start exceeds the tail and clamp was False.
- Returns:
If limit and/or until were provided, and if there are no further records that satisfy those constraints.
(or)
If the previous yield was the tail of the stream.
- class streamstore.schemas.Record(body: bytes, headers: list[tuple[bytes, bytes]] = [])[source]¶
Record to be appended to a stream.
- timestamp: int | None¶
Timestamp for this record.
Precise semantics depend on StreamConfig.timestamping.
- class streamstore.schemas.AppendInput(records: list[Record], match_seq_num: int | None = None, fencing_token: str | None = None)[source]¶
Used in the parameters to Stream.append() and Stream.append_session().
- records: list[Record]¶
Batch of records to append atomically, which must contain at least one record, and no more than 1000. The size of the batch must not exceed 1MiB of metered_bytes().
- class streamstore.schemas.AppendOutput(start_seq_num: int, start_timestamp: int, end_seq_num: int, end_timestamp: int, next_seq_num: int, last_timestamp: int)[source]¶
Returned from Stream.append().
(or)
Yielded from Stream.append_session().
- end_seq_num: int¶
Sequence number of the last appended record + 1. end_seq_num - start_seq_num will be the number of records in the batch.
- class streamstore.schemas.ReadLimit(count: int | None = None, bytes: int | None = None)[source]¶
Used in the parameters to Stream.read() and Stream.read_session().
If both count and bytes are specified, either limit may be hit.
- bytes: int | None¶
Cumulative size of records calculated using metered_bytes().
- class streamstore.schemas.SequencedRecord(seq_num: int, body: bytes, headers: list[tuple[bytes, bytes]], timestamp: int)[source]¶
Record read from a stream.
- enum streamstore.schemas.BasinScope(value)[source]¶
Scope of a basin.
Valid values are as follows:
- UNSPECIFIED = <BasinScope.UNSPECIFIED: 0>¶
UNSPECIFIED defaults to AWS_US_EAST_1.
- AWS_US_EAST_1 = <BasinScope.AWS_US_EAST_1: 1>¶
AWS us-east-1 region.
- enum streamstore.schemas.BasinState(value)[source]¶
Current state of a basin.
Valid values are as follows:
- UNSPECIFIED = <BasinState.UNSPECIFIED: 0>¶
- ACTIVE = <BasinState.ACTIVE: 1>¶
- CREATING = <BasinState.CREATING: 2>¶
- DELETING = <BasinState.DELETING: 3>¶
- class streamstore.schemas.BasinInfo(name: str, scope: BasinScope, state: BasinState)[source]¶
Basin information.
- scope: BasinScope¶
Basin scope.
- state: BasinState¶
Basin state.
- class streamstore.schemas.StreamInfo(name: str, created_at: datetime, deleted_at: datetime | None)[source]¶
Stream information.
- enum streamstore.schemas.StorageClass(value)[source]¶
Storage class for recent appends.
Valid values are as follows:
- STANDARD = <StorageClass.STANDARD: 1>¶
Offers end-to-end latencies under 500 ms.
- EXPRESS = <StorageClass.EXPRESS: 2>¶
Offers end-to-end latencies under 50 ms.
- enum streamstore.schemas.TimestampingMode(value)[source]¶
Timestamping mode.
Note
The arrival time is always in milliseconds since Unix epoch.
Valid values are as follows:
- UNSPECIFIED = <TimestampingMode.UNSPECIFIED: 0>¶
Defaults to CLIENT_PREFER.
- CLIENT_PREFER = <TimestampingMode.CLIENT_PREFER: 1>¶
Prefer client-specified timestamp if present, otherwise use arrival time.
- CLIENT_REQUIRE = <TimestampingMode.CLIENT_REQUIRE: 2>¶
Require a client-specified timestamp and reject the append if it is absent.
- ARRIVAL = <TimestampingMode.ARRIVAL: 3>¶
Use the arrival time and ignore any client-specified timestamp.
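The three modes differ only in how a record's final timestamp is chosen from the client-specified value and the arrival time. The sketch below restates that decision in code. Mode and resolve_timestamp are local illustrations rather than streamstore names, a rejected append is modeled as a ValueError, and the uncapped option of Timestamping is not modeled.

```python
from enum import Enum
from typing import Optional


class Mode(Enum):
    """Local mirror of the documented modes, for illustration only."""
    CLIENT_PREFER = 1
    CLIENT_REQUIRE = 2
    ARRIVAL = 3


def resolve_timestamp(mode: Mode, client_ts: Optional[int], arrival_ms: int) -> int:
    """Pick the record timestamp the way each mode is described."""
    if mode is Mode.ARRIVAL:
        return arrival_ms  # any client-specified timestamp is ignored
    if client_ts is not None:
        return client_ts  # both CLIENT_* modes use the client value when present
    if mode is Mode.CLIENT_REQUIRE:
        raise ValueError("client-specified timestamp is required")
    return arrival_ms  # CLIENT_PREFER falls back to the arrival time
```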
- class streamstore.schemas.Timestamping(mode: TimestampingMode | None = None, uncapped: bool | None = None)[source]¶
Timestamping behavior.
- mode: TimestampingMode | None¶
Timestamping mode.
If not specified, the default is TimestampingMode.CLIENT_PREFER.
- class streamstore.schemas.StreamConfig(storage_class: StorageClass | None = None, retention_age: int | None = None, timestamping: Timestamping | None = None, delete_on_empty_min_age: int | None = None)[source]¶
Stream configuration.
- storage_class: StorageClass | None¶
Storage class for this stream.
If not specified, the default is StorageClass.EXPRESS.
- retention_age: int | None¶
Age in seconds for automatic trimming of records older than this threshold.
If not specified, the default is to retain records for 7 days.
If set to 0, the stream will have infinite retention. (While S2 is in public preview, this is capped at 28 days. Let us know if you’d like the cap removed.)
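Because retention_age is expressed in seconds, it can be less error-prone to derive the value from a timedelta than to write the number by hand. A minimal sketch, with retention_seconds being a hypothetical helper rather than a streamstore function:

```python
from datetime import timedelta


def retention_seconds(age: timedelta) -> int:
    """Convert a duration to the whole-second value retention_age expects."""
    seconds = int(age.total_seconds())
    if seconds < 0:
        raise ValueError("retention age cannot be negative")
    return seconds


SEVEN_DAYS = retention_seconds(timedelta(days=7))    # 604800, the documented default
PREVIEW_CAP = retention_seconds(timedelta(days=28))  # 2419200, the public preview cap
```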
- timestamping: Timestamping | None¶
Timestamping behavior for appends to this stream, which influences how timestamps are handled.
- class streamstore.schemas.BasinConfig(default_stream_config: StreamConfig | None = None, create_stream_on_append: bool | None = None)[source]¶
Basin configuration.
- default_stream_config: StreamConfig | None¶
Default configuration for streams in this basin.
- enum streamstore.schemas.ResourceMatchOp(value)[source]¶
Resource match operator.
Valid values are as follows:
- EXACT = <ResourceMatchOp.EXACT: 1>¶
Match only the resource with the exact value. Use an empty string to match no resources.
- PREFIX = <ResourceMatchOp.PREFIX: 2>¶
Match all resources that start with the prefix value. Use an empty string to match all resources.
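The two operators, including their empty-string edge cases, can be summarized in a few lines. This sketch only restates the descriptions above: MatchOp and matches are local illustrations, and the actual matching is performed by the service.

```python
from enum import Enum


class MatchOp(Enum):
    """Local mirror of ResourceMatchOp, for illustration only."""
    EXACT = 1
    PREFIX = 2


def matches(op: MatchOp, value: str, resource: str) -> bool:
    """Return True if `resource` is allowed by the rule (op, value)."""
    if op is MatchOp.EXACT:
        # An empty value matches no resources.
        return value != "" and resource == value
    # PREFIX: the empty string is a prefix of every name, so it matches all.
    return resource.startswith(value)
```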
- class streamstore.schemas.ResourceMatchRule(match_op: ResourceMatchOp, value: str)[source]¶
Resource match rule.
- match_op: ResourceMatchOp¶
Match operator.
- enum streamstore.schemas.Permission(value)[source]¶
Permission.
Valid values are as follows:
- UNSPECIFIED = <Permission.UNSPECIFIED: 0>¶
- READ = <Permission.READ: 1>¶
- WRITE = <Permission.WRITE: 2>¶
- READ_WRITE = <Permission.READ_WRITE: 3>¶
- class streamstore.schemas.OperationGroupPermissions(account: Permission = Permission.UNSPECIFIED, basin: Permission = Permission.UNSPECIFIED, stream: Permission = Permission.UNSPECIFIED)[source]¶
Operation group permissions.
- account: Permission¶
Permission for account operations.
- basin: Permission¶
Permission for basin operations.
- stream: Permission¶
Permission for stream operations.
- enum streamstore.schemas.Operation(value)[source]¶
Operation.
Valid values are as follows:
- UNSPECIFIED = <Operation.UNSPECIFIED: 0>¶
- LIST_BASINS = <Operation.LIST_BASINS: 1>¶
- CREATE_BASIN = <Operation.CREATE_BASIN: 2>¶
- DELETE_BASIN = <Operation.DELETE_BASIN: 3>¶
- RECONFIGURE_BASIN = <Operation.RECONFIGURE_BASIN: 4>¶
- GET_BASIN_CONFIG = <Operation.GET_BASIN_CONFIG: 5>¶
- ISSUE_ACCESS_TOKEN = <Operation.ISSUE_ACCESS_TOKEN: 6>¶
- REVOKE_ACCESS_TOKEN = <Operation.REVOKE_ACCESS_TOKEN: 7>¶
- LIST_ACCESS_TOKENS = <Operation.LIST_ACCESS_TOKENS: 8>¶
- LIST_STREAMS = <Operation.LIST_STREAMS: 9>¶
- CREATE_STREAM = <Operation.CREATE_STREAM: 10>¶
- DELETE_STREAM = <Operation.DELETE_STREAM: 11>¶
- GET_STREAM_CONFIG = <Operation.GET_STREAM_CONFIG: 12>¶
- RECONFIGURE_STREAM = <Operation.RECONFIGURE_STREAM: 13>¶
- CHECK_TAIL = <Operation.CHECK_TAIL: 14>¶
- APPEND = <Operation.APPEND: 15>¶
- READ = <Operation.READ: 16>¶
- class streamstore.schemas.AccessTokenScope(basins: ~streamstore.schemas.ResourceMatchRule | None = None, streams: ~streamstore.schemas.ResourceMatchRule | None = None, access_tokens: ~streamstore.schemas.ResourceMatchRule | None = None, op_group_perms: ~streamstore.schemas.OperationGroupPermissions | None = None, ops: list[~streamstore.schemas.Operation] = <factory>)[source]¶
Access token scope.
- basins: ResourceMatchRule | None¶
Allowed basins.
- streams: ResourceMatchRule | None¶
Allowed streams.
- access_tokens: ResourceMatchRule | None¶
Allowed access token IDs.
- op_group_perms: OperationGroupPermissions | None¶
Permissions at operation group level.
- class streamstore.schemas.AccessTokenInfo(id: str, scope: AccessTokenScope, expires_at: int | None, auto_prefix_streams: bool)[source]¶
Access token information.
- scope: AccessTokenScope¶
Access token scope.
- enum streamstore.schemas.Cloud(value)[source]¶
Cloud in which the S2 service runs.
Valid values are as follows:
- AWS = <Cloud.AWS: 1>¶
- class streamstore.utils.CommandRecord[source]¶
Factory class for creating command records.
- static fence(token: str) Record[source]¶
Create a fence command record.
- Parameters:
token – Fencing token. Its UTF-8 byte count must not exceed 36 bytes. If empty, clears the previously set token.
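The 36-byte limit applies to the UTF-8 encoding of the token, not to its character count, so non-ASCII tokens reach the limit sooner. A minimal local check, where fencing_token_fits is a hypothetical helper rather than a streamstore function:

```python
MAX_FENCING_TOKEN_BYTES = 36


def fencing_token_fits(token: str) -> bool:
    """Return True if the UTF-8 encoding of `token` is at most 36 bytes."""
    return len(token.encode("utf-8")) <= MAX_FENCING_TOKEN_BYTES
```

For example, 18 copies of "é" already occupy 36 bytes because each one encodes to two bytes.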
- static trim(desired_first_seq_num: int) Record[source]¶
Create a trim command record.
- Parameters:
desired_first_seq_num – Sequence number for the first record to exist after trimming preceding records in the stream.
Note
If desired_first_seq_num is smaller than the sequence number of the first existing record in the stream, no trimming happens.
- streamstore.utils.metered_bytes(records: Iterable[Record | SequencedRecord]) int[source]¶
Each record is metered using the following formula:
8 + 2 * len(headers) + sum((len(name) + len(value)) for (name, value) in headers) + len(body)
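To make the formula concrete, the sketch below applies it to plain (body, headers) tuples. It is a stand-alone illustration, not the library function: metered_size is a hypothetical name, tuples stand in for Record objects, and summing the per-record sizes over the iterable is an assumption based on the int return type.

```python
from typing import Iterable

Header = tuple[bytes, bytes]
RecordLike = tuple[bytes, list[Header]]  # (body, headers), a stand-in for Record


def metered_size(records: Iterable[RecordLike]) -> int:
    """Sum the per-record formula over all records."""
    total = 0
    for body, headers in records:
        total += (
            8
            + 2 * len(headers)
            + sum(len(name) + len(value) for name, value in headers)
            + len(body)
        )
    return total
```

A record with body b"hello" and the single header (b"k", b"vv") comes to 8 + 2 + 3 + 5 = 18 bytes, and a record with no headers and an empty body still costs 8 bytes.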
- async streamstore.utils.append_inputs_gen(records: AsyncIterable[Record], match_seq_num: int | None = None, fencing_token: str | None = None, max_records_per_batch: int = 1000, max_bytes_per_batch: int = 1048576, max_linger_per_batch: timedelta | None = None) AsyncIterable[AppendInput][source]¶
Generator function for batching records and yielding AppendInput.
Returned generator object can be used as the parameter to Stream.append_session().
- Yields:
AppendInput
- Parameters:
records – Records that have to be appended to a stream.
match_seq_num – If it is not None, it is used in the first yield of AppendInput and is automatically advanced for subsequent yields.
fencing_token – Used in each yield of AppendInput.
max_records_per_batch – Maximum number of records in each batch.
max_bytes_per_batch – Maximum size of each batch calculated using metered_bytes().
max_linger_per_batch – Maximum duration for each batch to accumulate records before yielding.
Note
If max_linger_per_batch is None, AppendInput will be yielded only when max_records_per_batch or max_bytes_per_batch is reached.
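The count and size limits can be illustrated with a simplified batching generator. This is a sketch of the general technique, not the library's implementation: batch_bodies, size_of and collect are hypothetical names, records are reduced to header-less bodies, lingering is not modeled, and flushing the final partial batch when the input ends is an assumption.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator


def size_of(body: bytes) -> int:
    """Metered size of a header-less record: 8 bytes of overhead plus the body."""
    return 8 + len(body)


async def batch_bodies(bodies: AsyncIterable[bytes], max_records: int = 1000,
                       max_bytes: int = 1048576) -> AsyncIterator[list[bytes]]:
    """Group bodies into batches that respect both limits."""
    batch: list[bytes] = []
    batch_bytes = 0
    async for body in bodies:
        # Flush first if adding this record would overflow either limit.
        if batch and (len(batch) >= max_records or batch_bytes + size_of(body) > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(body)
        batch_bytes += size_of(body)
    if batch:
        yield batch  # final partial batch once the input is exhausted


async def _source(n: int) -> AsyncIterator[bytes]:
    """Produce n small demo bodies."""
    for i in range(n):
        yield f"record-{i}".encode()


async def collect(n: int, **limits: int) -> list[list[bytes]]:
    """Drain the batching generator into a list for inspection."""
    return [b async for b in batch_bodies(_source(n), **limits)]
```

For instance, asyncio.run(collect(5, max_records=2)) groups five records into batches of two, two and one.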