Source code for aws_bedrock_runtime_mate.patterns.multi_round_converse

# -*- coding: utf-8 -*-

"""
Multi-round conversation management for Amazon Bedrock Runtime.

This module provides high-level abstractions for managing multi-turn conversations
with Amazon Bedrock foundation models. It automatically handles conversation history,
message state management, and provides both regular and streaming response modes.

Key Features:

1. **Automatic History Management**: Maintains conversation history across multiple turns
2. **Unified Configuration**: Set default parameters for all API calls in a session
3. **Streaming Support**: Full support for streaming responses with automatic message collection
4. **Debug Mode**: Optional verbose logging for debugging and monitoring
5. **Type Safety**: Full type hints for all conversation operations

The main component is ``ChatSession``, which manages stateful multi-turn conversations
while providing a simple, intuitive API for both regular and streaming interactions.
"""

import typing as T
import json
import hashlib
import dataclasses

from func_args.api import BaseModel
from rich import print as rprint

from ..converse import ConverseKwargs, ConverseResponse, MessageContentBuilder
from ..converse_stream import ConverseStreamOutput, ConverseStreamResponse

if T.TYPE_CHECKING:  # pragma: no cover
    from mypy_boto3_bedrock_runtime import Client
    from mypy_boto3_bedrock_runtime.type_defs import (
        MessageUnionTypeDef,
    )
    from boto3_dataclass_bedrock_runtime.type_defs import (
        ConverseResponse,
    )


[docs] def json_hash(obj: T.Any) -> str: """ Generate MD5 hash of a JSON-serializable object for debugging. This utility function creates a deterministic hash of any JSON-serializable object by converting it to formatted JSON and computing its MD5 digest. Useful for tracking message state changes in conversation history. Args: obj: Any JSON-serializable object (dict, list, str, etc.) Returns: str: MD5 hash hexdigest of the JSON representation Example:: messages = [{"role": "user", "content": [{"text": "Hello"}]}] hash_value = json_hash(messages) print(hash_value) # "a1b2c3d4e5f6..." """ j = json.dumps(obj, ensure_ascii=False, sort_keys=True, indent=4) h = hashlib.md5(j.encode("utf-8")).hexdigest() return h
[docs] @dataclasses.dataclass class ChatSession(BaseModel): """ Stateful multi-turn conversation manager for Amazon Bedrock Runtime. This class manages conversation state across multiple API calls, automatically maintaining message history and applying default configuration. It provides a simplified interface for building chatbots, conversational AI, and multi-turn interactions with Bedrock foundation models. **Key Capabilities:** - **Conversation History**: Automatically tracks all messages (user and assistant) - **Default Configuration**: Apply consistent settings (model, inference config, etc.) across all turns - **Streaming Support**: Full support for streaming responses with automatic message collection - **Debug Mode**: Verbose logging for troubleshooting and monitoring - **Flexible Overrides**: Override default configuration per-message as needed **Architecture:** The session maintains an internal message list that grows with each turn. Every API call includes the full conversation history, allowing the model to maintain context across turns. For streaming responses, the session automatically collects the complete message for history after streaming completes. Attributes: client: AWS Bedrock Runtime client from boto3 converse_kwargs: Default configuration for all converse API calls printer: Custom print function for debug output (default: rich.print) verbose: Enable verbose debug logging Example:: import boto3 from aws_bedrock_runtime_mate.patterns.multi_round_converse import ( ChatSession, ConverseKwargs, ) # Initialize client = boto3.client("bedrock-runtime") session = ChatSession( client=client, converse_kwargs=ConverseKwargs( model_id="anthropic.claude-3-sonnet-20240229-v1:0", inference_config={ "temperature": 0.7, "maxTokens": 1000, }, ), verbose=True, # Enable debug logging ) # Simple text conversation res1 = session.converse_text("What is the capital of France?") print(res1.text) # "The capital of France is Paris." # Streaming response res2 = session.converse_text_stream("Tell me more about Paris") for text in session.iterate_text(res2): print(text, end="", flush=True) # Complex message with images from aws_bedrock_runtime_mate.converse import MessageContentBuilder builder = MessageContentBuilder() builder.add_text("What's in this image?") builder.add_image(format="png", bytes=image_bytes) res3 = session.converse([builder.to_message()]) print(res3.text) See Also: - `converse API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse.html>`_ - `converse_stream API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse_stream.html>`_ """ # fmt: off client: "Client" = dataclasses.field() converse_kwargs: ConverseKwargs | None = dataclasses.field(default=None) printer: T.Callable = dataclasses.field(default=rprint) verbose: bool = dataclasses.field(default=False) _messages: list["MessageUnionTypeDef"] = dataclasses.field(init=False) # fmt: on def __post_init__(self): self._messages = []
[docs] def debug(self, msg: str): """ Print debug message if verbose mode is enabled. Args: msg: The debug message to print """ if self.verbose: self.printer(msg)
def _prepare_kwargs( self, converse_kwargs: ConverseKwargs | None, ) -> dict[str, T.Any]: """ Prepare API kwargs by merging per-call and default configurations. This internal method merges the per-call configuration with the session's default configuration, with per-call parameters taking precedence. Args: converse_kwargs: Per-call configuration to override defaults Returns: dict: Merged kwargs ready for boto3 API call """ if converse_kwargs is None: kwargs = {} else: kwargs = converse_kwargs.to_boto3_kwargs() if self.converse_kwargs is None: default_kwargs = {} else: default_kwargs = self.converse_kwargs.to_boto3_kwargs() kwargs.update(default_kwargs) return kwargs
[docs] def converse( self, messages: T.Sequence["MessageUnionTypeDef"], converse_kwargs: ConverseKwargs | None = None, ) -> "ConverseResponse": """ Send one or more messages and receive a non-streaming response. This method adds the provided messages to the conversation history, sends all accumulated messages to the model, and stores the assistant's response in the history. This directly maps to the AWS Bedrock converse API. Args: messages: One or more message dictionaries to send. Each message should have "role" and "content" fields. converse_kwargs: Optional configuration to override session defaults for this specific call Returns: ConverseResponse: Enhanced response object with convenient text extraction Example:: from aws_bedrock_runtime_mate.converse import MessageContentBuilder # Send simple text builder = MessageContentBuilder() builder.add_text("What is 2+2?") response = session.converse([builder.to_message()]) print(response.text) # "2+2 equals 4" # Send with custom config response = session.converse( messages=[builder.to_message()], converse_kwargs=ConverseKwargs( inference_config={"temperature": 0.5} ) ) """ self._messages.extend(messages) messages = self._messages hashes = json_hash(messages) n_messages = len(messages) self.debug(f"===== Send messages, n_msg = {n_messages}, hash = {hashes}") kwargs = self._prepare_kwargs(converse_kwargs=converse_kwargs) kwargs["messages"] = messages self.debug("----- Converse kwargs:") self.debug(kwargs) response = self.client.converse(**kwargs) response = ConverseResponse(boto3_raw_data=response) self.debug("----- Converse response:") response_metadata = response.boto3_raw_data.pop("ResponseMetadata") self.debug(response.boto3_raw_data) response.boto3_raw_data["ResponseMetadata"] = response_metadata self._messages.append(response.output.message.boto3_raw_data) return response
[docs] def converse_text( self, message: str, converse_kwargs: ConverseKwargs | None = None, ) -> "ConverseResponse": """ Send a single text message and receive a non-streaming response. This is a convenience method that wraps the text in a proper message structure and calls ``converse``. Ideal for simple text-only interactions. Args: message: The text message to send converse_kwargs: Optional configuration to override session defaults Returns: ConverseResponse: Enhanced response object with convenient text extraction Example:: session = ChatSession(client=client, converse_kwargs=ConverseKwargs( model_id="anthropic.claude-3-sonnet-20240229-v1:0" )) # First turn response = session.converse_text("Hello, how are you?") print(response.text) # Second turn (maintains context) response = session.converse_text("What was my first question?") print(response.text) # Model remembers the conversation """ content_builder = MessageContentBuilder() content_builder.add_text(message) return self.converse( messages=[content_builder.to_message()], converse_kwargs=converse_kwargs, )
[docs] def converse_stream( self, messages: T.Sequence["MessageUnionTypeDef"], converse_kwargs: ConverseKwargs | None = None, ) -> "ConverseStreamResponse": """ Send one or more messages and receive a streaming response. This method adds the provided messages to the conversation history and initiates a streaming response. The response must be consumed using ``iterate_events`` or ``iterate_text`` to automatically collect the complete message for conversation history. This directly maps to the AWS Bedrock converse_stream API. Args: messages: One or more message dictionaries to send converse_kwargs: Optional configuration to override session defaults Returns: ConverseStreamResponse: Enhanced streaming response object Note: After consuming the stream using ``iterate_events`` or ``iterate_text``, the session automatically adds the assistant's complete message to the conversation history. Example:: from aws_bedrock_runtime_mate.converse import MessageContentBuilder builder = MessageContentBuilder() builder.add_text("Tell me a story") response = session.converse_stream([builder.to_message()]) # Consume the stream and collect message for event in session.iterate_events(response): if event.text: print(event.text, end="", flush=True) """ self._messages.extend(messages) messages = self._messages hashes = json_hash(messages) n_messages = len(messages) self.debug(f"===== Send messages, n_msg = {n_messages}, hash = {hashes}") kwargs = self._prepare_kwargs(converse_kwargs=converse_kwargs) kwargs["messages"] = messages self.debug("----- Converse kwargs:") self.debug(kwargs) response = self.client.converse_stream(**kwargs) response = ConverseStreamResponse(boto3_raw_data=response) self.debug("----- Converse response:") response_metadata = response.boto3_raw_data.pop("ResponseMetadata") self.debug(response.boto3_raw_data) response.boto3_raw_data["ResponseMetadata"] = response_metadata # self._messages.append(response.output.message.boto3_raw_data) return response
[docs] def converse_text_stream( self, message: str, converse_kwargs: ConverseKwargs | None = None, ) -> "ConverseStreamResponse": """ Send a single text message and receive a streaming response. This is a convenience method that wraps the text in a proper message structure and calls ``converse_stream``. Perfect for streaming text-only conversations with real-time output. Args: message: The text message to send converse_kwargs: Optional configuration to override session defaults Returns: ConverseStreamResponse: Enhanced streaming response object Example:: session = ChatSession(client=client, converse_kwargs=ConverseKwargs( model_id="anthropic.claude-3-sonnet-20240229-v1:0" )) # Stream the response response = session.converse_text_stream("Write a poem about AI") # Print text as it streams for text_chunk in session.iterate_text(response): print(text_chunk, end="", flush=True) print() # New line after streaming # Continue conversation (history includes streamed response) response2 = session.converse_text("Make it shorter") print(response2.text) """ content_builder = MessageContentBuilder() content_builder.add_text(message) return self.converse_stream( messages=[content_builder.to_message()], converse_kwargs=converse_kwargs, )
[docs] def collect_message(self, response: "ConverseStreamResponse"): """ Collect the complete message from a streaming response and add to history. This method is automatically called by ``iterate_events`` and ``iterate_text`` after the stream is fully consumed. It ensures the assistant's message is added to the conversation history exactly once. Args: response: The streaming response to collect from Note: You typically don't need to call this method directly. Use ``iterate_events`` or ``iterate_text`` instead, which handle collection automatically. """ if response.is_message_collected() is False: self._messages.append(response.message) object.__setattr__(response, "_message_collected", True)
[docs] def iterate_events( self, response: "ConverseStreamResponse", ) -> T.Generator["ConverseStreamOutput", None, None]: """ Iterate over all streaming events and automatically collect the message. This is the recommended way to consume streaming responses in a chat session. After all events are yielded, the method automatically adds the complete assistant message to the conversation history. Args: response: The streaming response from ``converse_stream`` or ``converse_text_stream`` Yields: ConverseStreamOutput: Enhanced event objects with helper methods Example:: response = session.converse_text_stream("Tell me about Python") # Process all events for event in session.iterate_events(response): if event.is_messageStart(): print("Message starting...") elif event.is_contentBlockDelta(): if event.text: print(event.text, end="", flush=True) elif event.is_messageStop(): print(f"\\nDone. Stop reason: {event.messageStop.stopReason}") # Message is now in history - continue conversation response2 = session.converse_text("Tell me more") """ yield from response.iterate_events() self.collect_message(response)
[docs] def iterate_text( self, response: "ConverseStreamResponse", ) -> T.Generator[str, None, None]: """ Iterate over text content only and automatically collect the message. This is the simplest way to consume streaming text responses in a chat session. After all text chunks are yielded, the method automatically adds the complete assistant message to the conversation history. Args: response: The streaming response from ``converse_stream`` or ``converse_text_stream`` Yields: str: Text chunks from contentBlockDelta events Example:: response = session.converse_text_stream("Write a haiku") # Stream text to console for text_chunk in session.iterate_text(response): print(text_chunk, end="", flush=True) print() # New line # Or collect all text response2 = session.converse_text_stream("Write another") full_text = "".join(session.iterate_text(response2)) print(full_text) # Both messages are now in history """ yield from response.iterate_text() self.collect_message(response)