converse_stream

This module provides an improved interface for the AWS Bedrock Runtime converse_stream API.

The main improvements over the raw AWS API include:

  1. Type-safe event checking: Boolean methods to identify event types

  2. Convenient text extraction: Automatic extraction of text from content blocks

  3. Stream management: Automatic caching of consumed streams for replay

  4. Simplified iteration: Generator methods for both events and text content

exception aws_bedrock_runtime_mate.converse_stream.InternalServerException[source]
exception aws_bedrock_runtime_mate.converse_stream.ModelStreamErrorException[source]
exception aws_bedrock_runtime_mate.converse_stream.ValidationException[source]
exception aws_bedrock_runtime_mate.converse_stream.ThrottlingException[source]
exception aws_bedrock_runtime_mate.converse_stream.ServiceUnavailableException[source]
class aws_bedrock_runtime_mate.converse_stream.ConverseStreamOutput(boto3_raw_data: type_defs.ConverseStreamOutputTypeDef)[source]

Enhanced wrapper for individual streaming output events from Bedrock converse_stream.

This class extends the base ConverseStreamOutput type from boto3_dataclass with convenient helper methods for:

  • Identifying the type of streaming event (messageStart, contentBlockDelta, etc.)

  • Extracting text content from content block deltas

  • Extracting reasoning content from content block deltas (for models that support it)

The Bedrock converse_stream API emits different event types during the streaming response lifecycle:

  • messageStart: Indicates the beginning of a message

  • contentBlockStart: Indicates the beginning of a content block

  • contentBlockDelta: Contains incremental content (text, reasoning, etc.)

  • contentBlockStop: Indicates the end of a content block

  • messageStop: Indicates the end of the message

Attributes:

text: The text content from a contentBlockDelta event, or None if not applicable reasoning_content_text: The reasoning content text from a contentBlockDelta event,

or None if not applicable (only available on models with reasoning capabilities)

is_messageStart() bool[source]

Check if this event represents the start of a message.

Returns:

bool: True if this is a messageStart event, False otherwise.

is_contentBlockStart() bool[source]

Check if this event represents the start of a content block.

Returns:

bool: True if this is a contentBlockStart event, False otherwise.

is_contentBlockDelta() bool[source]

Check if this event contains incremental content (delta).

This is the most common event type during streaming, containing actual content chunks like text or reasoning content.

Returns:

bool: True if this is a contentBlockDelta event, False otherwise.

is_contentBlockStop() bool[source]

Check if this event represents the end of a content block.

Returns:

bool: True if this is a contentBlockStop event, False otherwise.

is_messageStop() bool[source]

Check if this event represents the end of the message.

This event typically includes metadata like stop reason and token usage.

Returns:

bool: True if this is a messageStop event, False otherwise.

property text: str | None

Extract text content from a contentBlockDelta event.

This property provides convenient access to the text content within a streaming delta event. It automatically handles the nested structure of the AWS response and returns None if:

  • This is not a contentBlockDelta event

  • The delta doesn’t contain text content

  • The content structure is different (e.g., image, tool use)

Returns:

str | None: The text content chunk, or None if not applicable.

property reasoning_content_text: str | None

Extract reasoning content text from a contentBlockDelta event.

Some advanced Bedrock models (like Claude with extended thinking) can return reasoning content that shows the model’s thought process. This property provides convenient access to that reasoning text.

Returns:
str | None: The reasoning content text chunk, or None if:
  • This is not a contentBlockDelta event

  • The model doesn’t support reasoning content

  • The delta doesn’t contain reasoning content

class aws_bedrock_runtime_mate.converse_stream.ConverseStreamResponse(boto3_raw_data: type_defs.ConverseStreamResponseTypeDef, _streams: list[ConverseStreamOutput] = <factory>, _stream_consumed: bool = False, _message_collected: bool = False, _message: MessageUnionTypeDef | None = None, _text: str | None = None)[source]

Enhanced wrapper for the streaming response from Bedrock converse_stream API.

This class extends the base ConverseStreamResponse type with intelligent stream management capabilities:

  • Stream caching: Automatically caches events as they are consumed, allowing multiple iterations over the same response

  • Event iteration: Provides a generator for iterating over all streaming events

  • Text-only iteration: Convenient generator for extracting just the text content

The AWS Bedrock streaming API returns an event stream that can only be consumed once. This wrapper solves that limitation by caching events internally, enabling:

  1. Multiple iterations over the same response

  2. Safe inspection after initial consumption

  3. Simplified text extraction without manual event filtering

Attributes:

_streams: Internal cache of consumed streaming events _stream_consumed: Flag indicating whether the stream has been fully consumed

Example:

import boto3
from aws_bedrock_runtime_mate.converse_stream import ConverseStreamResponse

client = boto3.client("bedrock-runtime")
response = client.converse_stream(
    modelId="anthropic.claude-3-sonnet-20240229-v1:0",
    messages=[{"role": "user", "content": [{"text": "Tell me a story"}]}],
)

# Wrap the response
stream_response = ConverseStreamResponse(boto3_raw_data=response)

# First iteration - consumes the stream and caches events
full_text = ""
for text_chunk in stream_response.iterate_text():
    full_text += text_chunk
    print(text_chunk, end="", flush=True)

# Second iteration - replays cached events (no API call)
for event in stream_response.iterate_events():
    if event.is_messageStop():
        print(f"\nStop reason: {event.messageStop.stopReason}")
is_stream_consumed() bool[source]

Check if the streaming response has been fully consumed.

Once a stream is consumed, subsequent iterations will use cached events instead of attempting to read from the original stream.

Returns:

bool: True if the stream has been consumed, False otherwise.

Example:

stream_response = ConverseStreamResponse(boto3_raw_data=response)
print(stream_response.is_stream_consumed())  # False

# Consume the stream
for event in stream_response.iterate_events():
    pass

print(stream_response.is_stream_consumed())  # True
is_message_collected() bool[source]

Check if the message has been collected by multi round converse.

Returns:

bool: True if the message has been collected, False otherwise.

iterate_events() Generator[ConverseStreamOutput, None, None][source]

Iterate over all streaming events with automatic caching.

This method provides a generator that yields ConverseStreamOutput objects for each event in the stream. Key features:

  • First iteration: Consumes the AWS stream, wraps each event in ConverseStreamOutput, caches it, and yields it

  • Subsequent iterations: Yields cached events without making additional API calls

  • Thread-safe caching: Events are stored as they are consumed

The generator yields events in the order they were received:

  1. messageStart - Initial message metadata

  2. contentBlockStart - Beginning of content block(s)

  3. contentBlockDelta - Incremental content chunks (text, reasoning, etc.)

  4. contentBlockStop - End of content block(s)

  5. messageStop - Final message metadata (stop reason, usage stats)

Yields:

ConverseStreamOutput: Enhanced event objects with helper methods

Note:

TODO: Currently, this method collects all event types. Future versions may add filtering capabilities for specific content types (images, documents, video).

Example:

stream_response = ConverseStreamResponse(boto3_raw_data=response)

# Process all events
for event in stream_response.iterate_events():
    if event.is_messageStart():
        print("Message starting...")
    elif event.is_contentBlockDelta():
        if event.text:
            print(event.text, end="", flush=True)
    elif event.is_messageStop():
        print(f"\nFinished: {event.messageStop.stopReason}")

# Iterate again (uses cached events)
for event in stream_response.iterate_events():
    print(f"Event type: {type(event)}")
iterate_text() Generator[str, None, None][source]

Iterate over text content only, filtering out non-text events.

This is a convenience method that wraps iterate_events() and yields only the text content from contentBlockDelta events. It automatically:

  • Filters out non-delta events (messageStart, messageStop, etc.)

  • Extracts text content from delta events

  • Skips events with no text content (e.g., tool use, images)

This is the simplest way to collect or stream text responses from Bedrock models.

Yields:

str: Text chunks from contentBlockDelta events

Example:

stream_response = ConverseStreamResponse(boto3_raw_data=response)

# Stream text to console
for text_chunk in stream_response.iterate_text():
    print(text_chunk, end="", flush=True)

# Or collect all text
full_text = "".join(stream_response.iterate_text())
print(full_text)
property message: MessageUnionTypeDef
Note:

TODO: Currently, this method collects all event types. Future versions may add filtering capabilities for specific content types (images, documents, video).