API Reference¶
- class streamstore.S2(auth_token: str, endpoints: Endpoints | None = None, request_timeout: timedelta = datetime.timedelta(seconds=5), max_retries: int = 3, enable_append_retries: bool = True, enable_compression: bool = False)[source]¶
Async client for interacting with the gRPC API of S2.
- Parameters:
auth_token – Authentication token generated from S2 dashboard.
endpoints – S2 endpoints. If None, public endpoints for the S2 service running in the AWS cloud will be used.
request_timeout – Timeout for gRPC requests made by the client. Default value is 5 seconds.
max_retries – Maximum number of retries for a gRPC request. Default value is 3.
enable_append_retries – Enable retries for appends, i.e., for both Stream.append() and Stream.append_session(). Default value is True.
enable_compression – Enable compression (Gzip) for Stream.append(), Stream.append_session(), Stream.read(), and Stream.read_session(). Default value is False.
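For orientation, a minimal sketch of constructing and using the client; the token value and timeout are illustrative placeholders:

```python
import asyncio
from datetime import timedelta

from streamstore import S2


async def main() -> None:
    # Placeholder token; generate a real one from the S2 dashboard.
    async with S2(auth_token="my-auth-token", request_timeout=timedelta(seconds=10)) as s2:
        ...  # perform basin and stream operations with s2


asyncio.run(main())
```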
- async close() None [source]¶
Close all open connections to S2 service endpoints.
Tip
S2 supports the async context manager protocol, so you can also do the following instead of explicitly closing:

```python
async with S2(..) as s2:
    ...
```
- async create_basin(name: str, default_stream_storage_class: StorageClass | None = None, default_stream_retention_age: timedelta | None = None, create_stream_on_append: bool = False) BasinInfo [source]¶
Create a basin.
- Parameters:
name – Name of the basin.
default_stream_storage_class – Default storage class for streams of this basin.
default_stream_retention_age – Default threshold for automatic trimming of records in the streams of this basin. If not specified, streams will have infinite retention.
create_stream_on_append – Create stream on append if it doesn’t exist.
Note
name must be globally unique and must be between 8 and 48 characters, comprising lowercase letters, numbers and hyphens. It cannot begin or end with a hyphen.
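A short sketch of creating a basin; the basin name here is hypothetical and must be globally unique:

```python
from datetime import timedelta

from streamstore import S2
from streamstore.schemas import StorageClass


async def create_example_basin(s2: S2) -> None:
    # "my-example-basin" is a hypothetical name.
    info = await s2.create_basin(
        "my-example-basin",
        default_stream_storage_class=StorageClass.STANDARD,
        default_stream_retention_age=timedelta(days=7),
    )
    print(info.name, info.state)
```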
- basin(name: str) Basin [source]¶
Get a Basin object that can be used for performing basin operations.
- Parameters:
name – Name of the basin.
Note
The basin must have been created already, else the operations will fail.
Tip

```python
async with S2(..) as s2:
    basin = s2.basin("your-basin-name")
```

S2 implements the __getitem__ magic method, so you can also do the following instead:

```python
async with S2(..) as s2:
    basin = s2["your-basin-name"]
```
- async list_basins(prefix: str = '', start_after: str = '', limit: int | None = None) Page[BasinInfo] [source]¶
List basins.
- Parameters:
prefix – List only those that begin with this value.
start_after – List only those that lexicographically start after this value, which can be the name of the last item from the previous page, to continue from there. If specified, it must be greater than or equal to prefix.
limit – Number of items to return in one page. Maximum number of items that can be returned in one page is 1000.
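A sketch of paging through all basins. It assumes Page exposes items and has_more attributes; those names are not shown in this reference, so verify them against the Page schema:

```python
from streamstore import S2


async def list_all_basin_names(s2: S2) -> list[str]:
    names: list[str] = []
    start_after = ""
    while True:
        # `items` and `has_more` are assumed attribute names on Page.
        page = await s2.list_basins(start_after=start_after, limit=100)
        names.extend(info.name for info in page.items)
        if not page.has_more or not page.items:
            return names
        start_after = page.items[-1].name
```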
- async delete_basin(name: str) None [source]¶
Delete a basin.
- Parameters:
name – Name of the basin.
Note
Basin deletion is asynchronous, and may take a few minutes to complete.
- async get_basin_config(name: str) BasinConfig [source]¶
Get the current configuration of a basin.
- Parameters:
name – Name of the basin.
- async reconfigure_basin(name: str, default_stream_storage_class: StorageClass | None = None, default_stream_retention_age: timedelta | None = None, create_stream_on_append: bool = False) BasinConfig [source]¶
Modify the configuration of a basin.
- Parameters:
name – Name of the basin.
default_stream_storage_class – Default storage class for streams of this basin.
default_stream_retention_age – Default threshold for automatic trimming of records in the streams of this basin. If not specified, streams will have infinite retention.
create_stream_on_append – Create stream on append if it doesn’t exist.
Note
Modifying the default stream-related configuration doesn’t affect already existing streams; it only applies to new streams created hereafter.
- class streamstore.Basin[source]¶
Caution
Returned by S2.basin(). Do not instantiate directly.
- async create_stream(name: str, storage_class: StorageClass | None = None, retention_age: timedelta | None = None) StreamInfo [source]¶
Create a stream.
- Parameters:
name – Name of the stream.
storage_class – Storage class for this stream.
retention_age – Threshold for automatic trimming of records in this stream. If not specified, the stream will have infinite retention.
Note
name must be unique within the basin. It can be an arbitrary string up to 512 characters. Slash (/) is recommended as a delimiter for hierarchical naming.
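A sketch of creating a stream, using a hypothetical hierarchical name:

```python
from datetime import timedelta

from streamstore import S2


async def create_example_stream(s2: S2) -> None:
    basin = s2.basin("your-basin-name")
    # Hypothetical name using "/" as a hierarchical delimiter.
    info = await basin.create_stream("orders/2025/01", retention_age=timedelta(days=30))
    print(info.name, info.created_at)
```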
- stream(name: str) Stream [source]¶
Get a Stream object that can be used for performing stream operations.
- Parameters:
name – Name of the stream.
Note
The stream must have been created already, else the operations will fail.
Tip
```python
async with S2(..) as s2:
    stream = s2.basin("your-basin-name").stream("your-stream-name")
```

Basin implements the __getitem__ magic method, so you can also do the following instead:

```python
async with S2(..) as s2:
    stream = s2["your-basin-name"]["your-stream-name"]
```
- async list_streams(prefix: str = '', start_after: str = '', limit: int | None = None) Page[StreamInfo] [source]¶
List streams.
- Parameters:
prefix – List only those that begin with this value.
start_after – List only those that lexicographically start after this value, which can be the name of the last item from the previous page, to continue from there. If specified, it must be greater than or equal to prefix.
limit – Number of items to return in one page. Maximum number of items that can be returned in one page is 1000.
- async delete_stream(name: str) None [source]¶
Delete a stream.
- Parameters:
name – Name of the stream.
Note
Stream deletion is asynchronous, and may take a few minutes to complete.
- async get_stream_config(name: str) StreamConfig [source]¶
Get the current configuration of a stream.
- Parameters:
name – Name of the stream.
- async reconfigure_stream(name: str, storage_class: StorageClass | None = None, retention_age: timedelta | None = None) StreamConfig [source]¶
Modify the configuration of a stream.
- Parameters:
name – Name of the stream.
storage_class – Storage class for this stream.
retention_age – Threshold for automatic trimming of records in this stream. If not specified, the stream will have infinite retention.
Note
Modifying storage_class will take effect only when this stream has been inactive for 10 minutes. This will become a live migration in the future.
- class streamstore.Stream[source]¶
Caution
Returned by Basin.stream(). Do not instantiate directly.
- async check_tail() int [source]¶
- Returns:
Sequence number that will be assigned to the next record on a stream.
- async append(input: AppendInput) AppendOutput [source]¶
Append a batch of records to a stream.
- async append_session(inputs: AsyncIterable[AppendInput]) AsyncIterable[AppendOutput] [source]¶
Append batches of records to a stream continuously, while guaranteeing pipelined inputs are processed in order.
Tip
You can use append_inputs_gen() for automatic batching of records instead of explicitly preparing and providing batches of records.
- Yields:
AppendOutput for each corresponding AppendInput.
- Raises:
If enable_append_retries=False in S2, and processing any of the AppendInput fails.
(or)
If enable_append_retries=True in S2, and the retry budget is exhausted after trying to recover from failures.
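A minimal sketch of a single atomic append; the basin and stream names are placeholders:

```python
from streamstore import S2
from streamstore.schemas import AppendInput, Record


async def append_once(s2: S2) -> None:
    stream = s2["your-basin-name"]["your-stream-name"]
    output = await stream.append(AppendInput(records=[Record(b"hello"), Record(b"world")]))
    print(output.start_seq_num, output.end_seq_num, output.next_seq_num)
```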
- async read(start_seq_num: int, limit: ReadLimit | None = None, ignore_command_records: bool = False) list[SequencedRecord] | FirstSeqNum | NextSeqNum [source]¶
Read a batch of records from a stream.
- Parameters:
start_seq_num – Starting sequence number (inclusive).
limit – Number of records to return, up to a maximum of 1000 records or 1MiB of metered_bytes().
ignore_command_records – Filter out command records if present in the batch.
- Returns:
Batch of sequenced records. It can be empty only if limit was provided, and the first record that could have been returned violated the limit.
(or)
Sequence number for the first record on this stream, if the provided start_seq_num was smaller.
(or)
Sequence number for the next record on this stream, if the provided start_seq_num was larger.
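A sketch of handling the three possible read results; the import path for FirstSeqNum (and NextSeqNum) is assumed to match the other schemas:

```python
from streamstore import Stream
from streamstore.schemas import FirstSeqNum, ReadLimit


async def read_batch(stream: Stream) -> None:
    output = await stream.read(start_seq_num=0, limit=ReadLimit(count=100))
    if isinstance(output, list):
        for record in output:
            print(record.seq_num, record.body)
    elif isinstance(output, FirstSeqNum):
        print("start_seq_num was below the first record:", output)
    else:
        print("start_seq_num was past the tail; next record gets:", output)  # NextSeqNum
```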
- async read_session(start_seq_num: int, limit: ReadLimit | None = None, ignore_command_records: bool = False) AsyncIterable[list[SequencedRecord] | FirstSeqNum | NextSeqNum] [source]¶
Read batches of records from a stream continuously.
- Parameters:
start_seq_num – Starting sequence number (inclusive).
limit – Number of records to return, up to a maximum of 1000 records or 1MiB of metered_bytes().
ignore_command_records – Filter out command records if present in the batch.
Note
With a session, you can read in a streaming fashion. If a limit was not provided and the end of the stream is reached, the session goes into real-time tailing mode and will yield records as they are appended to the stream.
- Yields:
Batch of sequenced records. It can be empty only if limit was provided, and the first record that could have been returned violated the limit.
(or)
Sequence number for the first record on this stream, if the provided start_seq_num was smaller.
(or)
Sequence number for the next record on this stream, if the provided start_seq_num was larger.
- Returns:
If limit was provided, and if it was met or the end of the stream was reached before meeting it.
(or)
If previous yield was not a batch of sequenced records.
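A sketch of tailing a stream in real time with a session, starting from the current tail:

```python
from streamstore import Stream


async def tail_stream(stream: Stream) -> None:
    # No limit, so the session switches to real-time tailing at the end of the stream.
    start = await stream.check_tail()
    async for output in stream.read_session(start_seq_num=start):
        if isinstance(output, list):  # a batch of SequencedRecord
            for record in output:
                print(record.seq_num, record.body)
```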
- class streamstore.schemas.Record(body: bytes, headers: list[tuple[bytes, bytes]] = [])[source]¶
Record to be appended to a stream.
- class streamstore.schemas.AppendInput(records: list[Record], match_seq_num: int | None = None, fencing_token: bytes | None = None)[source]¶
Used in the parameters to Stream.append() and Stream.append_session().
- records: list[Record]¶
Batch of records to append atomically, which must contain at least one record and no more than 1000. The size of the batch must not exceed 1MiB of metered_bytes().
- class streamstore.schemas.AppendOutput(start_seq_num: int, end_seq_num: int, next_seq_num: int)[source]¶
Returned from Stream.append().
(or)
Yielded from Stream.append_session().
- class streamstore.schemas.ReadLimit(count: int | None = None, bytes: int | None = None)[source]¶
Used in the parameters to Stream.read() and Stream.read_session().
If both count and bytes are specified, either limit may be hit.
- bytes: int | None¶
Cumulative size of records calculated using metered_bytes().
- class streamstore.schemas.SequencedRecord(seq_num: int, body: bytes, headers: list[tuple[bytes, bytes]])[source]¶
Record read from a stream.
- enum streamstore.schemas.BasinScope(value)[source]¶
Scope of a basin.
Valid values are as follows:
- UNSPECIFIED = <BasinScope.UNSPECIFIED: 0>¶
UNSPECIFIED defaults to AWS_US_EAST_1.
- AWS_US_EAST_1 = <BasinScope.AWS_US_EAST_1: 1>¶
AWS us-east-1 region.
- enum streamstore.schemas.BasinState(value)[source]¶
Current state of a basin.
Valid values are as follows:
- UNSPECIFIED = <BasinState.UNSPECIFIED: 0>¶
- ACTIVE = <BasinState.ACTIVE: 1>¶
- CREATING = <BasinState.CREATING: 2>¶
- DELETING = <BasinState.DELETING: 3>¶
- class streamstore.schemas.BasinInfo(name: str, scope: BasinScope, state: BasinState)[source]¶
Basin information.
- scope: BasinScope¶
Basin scope.
- state: BasinState¶
Basin state.
- class streamstore.schemas.StreamInfo(name: str, created_at: datetime, deleted_at: datetime | None)[source]¶
Stream information.
- enum streamstore.schemas.StorageClass(value)[source]¶
Storage class for recent appends.
Valid values are as follows:
- UNSPECIFIED = <StorageClass.UNSPECIFIED: 0>¶
UNSPECIFIED defaults to EXPRESS.
- STANDARD = <StorageClass.STANDARD: 1>¶
Offers end-to-end latencies under 500 ms.
- EXPRESS = <StorageClass.EXPRESS: 2>¶
Offers end-to-end latencies under 50 ms.
- class streamstore.schemas.StreamConfig(storage_class: StorageClass, retention_age: timedelta)[source]¶
Current configuration of a stream.
- storage_class: StorageClass¶
Storage class for this stream.
- class streamstore.schemas.BasinConfig(default_stream_config: StreamConfig, create_stream_on_append: bool)[source]¶
Current configuration of a basin.
- default_stream_config: StreamConfig¶
Default configuration for streams in this basin.
- enum streamstore.schemas.Cloud(value)[source]¶
Cloud in which the S2 service runs.
Valid values are as follows:
- AWS = <Cloud.AWS: 1>¶
- class streamstore.utils.CommandRecord[source]¶
Factory class for creating command records.
- static fence(token: bytes) Record [source]¶
Create a fence command record.
- Parameters:
token – Fencing token. Cannot exceed 16 bytes. If empty, clears the previously set token.
- static trim(desired_first_seq_num: int) Record [source]¶
Create a trim command record.
- Parameters:
desired_first_seq_num – Sequence number for the first record to exist after trimming preceding records in the stream.
Note
If desired_first_seq_num is smaller than the sequence number for the first existing record in the stream, trimming doesn’t happen.
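A sketch of appending fence and trim command records; the token value and sequence number are illustrative:

```python
from streamstore import Stream
from streamstore.schemas import AppendInput
from streamstore.utils import CommandRecord


async def fence_and_trim(stream: Stream) -> None:
    # Set a fencing token (illustrative value; at most 16 bytes).
    await stream.append(AppendInput(records=[CommandRecord.fence(b"my-token")]))
    # Subsequent appends carry the token; request trimming so that
    # sequence number 1000 becomes the first record.
    await stream.append(
        AppendInput(records=[CommandRecord.trim(1000)], fencing_token=b"my-token")
    )
```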
- streamstore.utils.metered_bytes(records: Iterable[Record | SequencedRecord]) int [source]¶
Each record is metered using the following formula:
8 + 2 * len(headers) + sum((len(name) + len(value)) for (name, value) in headers) + len(body)
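For example, a record with a 5-byte body and a single header of 1-byte name and 1-byte value meters as 8 + 2*1 + (1 + 1) + 5 = 17 bytes:

```python
from streamstore.schemas import Record
from streamstore.utils import metered_bytes

record = Record(b"hello", headers=[(b"k", b"v")])
assert metered_bytes([record]) == 17  # 8 + 2 + (1 + 1) + 5
```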
- async streamstore.utils.append_inputs_gen(records: AsyncIterable[Record], match_seq_num: int | None = None, fencing_token: bytes | None = None, max_records_per_batch: int = 1000, max_bytes_per_batch: int = 1048576, max_linger_per_batch: timedelta | None = None) AsyncIterable[AppendInput] [source]¶
Generator function for batching records and yielding AppendInput.
The returned generator object can be used as the parameter to Stream.append_session().
- Yields:
AppendInput for each batch of records.
- Parameters:
records – Records that have to be appended to a stream.
match_seq_num – If not None, it is used in the first yield of AppendInput and is automatically advanced for subsequent yields.
fencing_token – Used in each yield of AppendInput.
max_records_per_batch – Maximum number of records in each batch.
max_bytes_per_batch – Maximum size of each batch, calculated using metered_bytes().
max_linger_per_batch – Maximum duration for each batch to accumulate records before yielding.
Note
If max_linger_per_batch is None, AppendInput will be yielded only when max_records_per_batch or max_bytes_per_batch is reached.
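A sketch of feeding an async source of records through append_inputs_gen into an append session; the record contents and linger duration are illustrative:

```python
from collections.abc import AsyncIterable
from datetime import timedelta

from streamstore import Stream
from streamstore.schemas import Record
from streamstore.utils import append_inputs_gen


async def produce_records() -> AsyncIterable[Record]:
    for i in range(10):
        yield Record(f"event-{i}".encode())


async def append_with_batching(stream: Stream) -> None:
    inputs = append_inputs_gen(produce_records(), max_linger_per_batch=timedelta(milliseconds=50))
    async for output in stream.append_session(inputs):
        print(output.start_seq_num, output.end_seq_num)
```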