Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion agent_sdks/python/src/a2ui/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.

import logging
from typing import Any, Optional, List
from typing import Any, Optional, List, AsyncIterable, TYPE_CHECKING

if TYPE_CHECKING:
from a2ui.core.parser.streaming import A2uiStreamParser
from a2a.server.agent_execution import RequestContext
from a2a.types import AgentExtension, Part, DataPart, TextPart

Expand Down Expand Up @@ -168,3 +170,39 @@ def try_activate_a2ui_extension(context: RequestContext) -> bool:
context.add_activated_extension(A2UI_EXTENSION_URI)
return True
return False


async def stream_response_to_parts(
parser: "A2uiStreamParser",
token_stream: AsyncIterable[str],
) -> AsyncIterable[Part]:
"""Helper to parse a stream of LLM tokens into A2A Parts incrementally.

Args:
parser: A2uiStreamParser instance to process the stream.
token_stream: An async iterable of strings (tokens).

Yields:
A2A Part objects as they are discovered in the stream.
"""
async for token in token_stream:
logger.info("-----------------------------")
logger.info(f"--- AGENT: Received token:\n{token}")
response_parts = parser.process_chunk(token)
logger.info(
f"--- AGENT: Response parts:\n{[part.a2ui_json for part in response_parts]}\n"
)
logger.info("-----------------------------")

for part in response_parts:
if part.text:
yield Part(root=TextPart(text=part.text))

if part.a2ui_json:
json_data = part.a2ui_json

if isinstance(json_data, list):
for message in json_data:
yield create_a2ui_part(message)
else:
yield create_a2ui_part(json_data)
12 changes: 8 additions & 4 deletions agent_sdks/python/src/a2ui/core/schema/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@

DEFAULT_WORKFLOW_RULES = f"""
The generated response MUST follow these rules:
1. The response can contain one or more A2UI JSON blocks.
2. Each A2UI JSON block MUST be wrapped in `{A2UI_OPEN_TAG}` and `{A2UI_CLOSE_TAG}` tags.
3. Between or around these blocks, you can provide conversational text.
4. The JSON part MUST be a single, raw JSON object (usually a list of A2UI messages) and MUST validate against the provided A2UI JSON SCHEMA.
- The response can contain one or more A2UI JSON blocks.
- Each A2UI JSON block MUST be wrapped in `{A2UI_OPEN_TAG}` and `{A2UI_CLOSE_TAG}` tags.
- Between or around these blocks, you can provide conversational text.
- The JSON part MUST be a single, raw JSON object (usually a list of A2UI messages) and MUST validate against the provided A2UI JSON SCHEMA.
- Top-Down Component Ordering: Within the `components` list of a message:
- The 'root' component MUST be the FIRST element.
- Parent components MUST appear before their child components.
This specific ordering allows the streaming parser to yield and render the UI incrementally as it arrives.
"""
124 changes: 78 additions & 46 deletions samples/agent/adk/contact_lookup/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import jsonschema

# Corrected imports from our new/refactored files
from google.adk.agents import run_config
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
Expand All @@ -43,7 +43,11 @@
from a2ui.core.schema.manager import A2uiSchemaManager
from a2ui.core.parser.parser import parse_response, ResponsePart
from a2ui.basic_catalog.provider import BasicCatalog
from a2ui.a2a import create_a2ui_part, get_a2ui_agent_extension, parse_response_to_parts
from a2ui.a2a import (
get_a2ui_agent_extension,
parse_response_to_parts,
stream_response_to_parts,
)

logger = logging.getLogger(__name__)

Expand All @@ -68,6 +72,7 @@ def __init__(self, base_url: str, use_ui: bool = False):
)
self._agent = self._build_agent(use_ui)
self._user_id = "remote_agent"
self._parsers = {}
self._runner = Runner(
app_name=self._agent.name,
agent=self._agent,
Expand All @@ -77,14 +82,18 @@ def __init__(self, base_url: str, use_ui: bool = False):
)

def get_agent_card(self) -> AgentCard:
extensions = []
if self.use_ui:
extensions.append(
get_a2ui_agent_extension(
self._schema_manager.accepts_inline_catalogs,
self._schema_manager.supported_catalog_ids,
)
)

capabilities = AgentCapabilities(
streaming=True,
extensions=[
get_a2ui_agent_extension(
self._schema_manager.accepts_inline_catalogs,
self._schema_manager.supported_catalog_ids,
)
],
extensions=extensions,
)
skill = AgentSkill(
id="find_contact",
Expand Down Expand Up @@ -164,27 +173,29 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]:
attempt = 0
current_query_text = query

# Ensure catalog schema was loaded
selected_catalog = self._schema_manager.get_selected_catalog()
if self.use_ui and not selected_catalog.catalog_schema:
logger.error(
"--- ContactAgent.stream: A2UI_SCHEMA is not loaded. "
"Cannot perform UI validation. ---"
)
yield {
"is_task_complete": True,
"parts": [
Part(
root=TextPart(
text=(
"I'm sorry, I'm facing an internal configuration error with"
" my UI components. Please contact support."
)
)
)
],
}
return
# Ensure catalog schema was loaded if UI is enabled
selected_catalog = None
if self.use_ui:
selected_catalog = self._schema_manager.get_selected_catalog()
if not selected_catalog.catalog_schema:
logger.error(
"--- ContactAgent.stream: A2UI_SCHEMA is not loaded. "
"Cannot perform UI validation. ---"
)
yield {
"is_task_complete": True,
"parts": [
Part(
root=TextPart(
text=(
"I'm sorry, I'm facing an internal configuration error with"
" my UI components. Please contact support."
)
)
)
],
}
return

while attempt <= max_retries:
attempt += 1
Expand All @@ -196,28 +207,47 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]:
current_message = types.Content(
role="user", parts=[types.Part.from_text(text=current_query_text)]
)
final_response_content = None

async for event in self._runner.run_async(
user_id=self._user_id,
session_id=session.id,
new_message=current_message,
):
logger.info(f"Event from runner: {event}")
if event.is_final_response():
if event.content and event.content.parts and event.content.parts[0].text:
final_response_content = "\n".join(
[p.text for p in event.content.parts if p.text]
)
break # Got the final response, stop consuming events
else:
logger.info(f"Intermediate event: {event}")
# Yield intermediate updates on every attempt
full_content_list = []

async def token_stream():
async for event in self._runner.run_async(
user_id=self._user_id,
session_id=session.id,
run_config=run_config.RunConfig(
streaming_mode=run_config.StreamingMode.SSE
),
new_message=current_message,
):
if event.content and event.content.parts:
for p in event.content.parts:
if p.text:
full_content_list.append(p.text)
yield p.text

if self.use_ui:
from a2ui.core.parser.streaming import A2uiStreamParser

if session_id not in self._parsers:
self._parsers[session_id] = A2uiStreamParser(catalog=selected_catalog)

async for part in stream_response_to_parts(
self._parsers[session_id],
token_stream(),
):
yield {
"is_task_complete": False,
"parts": [part],
}
else:
async for token in token_stream():
yield {
"is_task_complete": False,
"updates": self.get_processing_message(),
"updates": token,
}

final_response_content = "".join(full_content_list)

if final_response_content is None:
logger.warning(
"--- ContactAgent.stream: Received no final response content from runner "
Expand All @@ -228,8 +258,10 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]:
"I received no response. Please try again."
f"Please retry the original request: '{query}'"
)
logger.info(f"Retrying with query: {current_query_text}")
continue # Go to next retry
else:
logger.info("Retries exhausted on no-response")
# Retries exhausted on no-response
final_response_content = (
"I'm sorry, I encountered an error and couldn't process your request."
Expand Down
33 changes: 21 additions & 12 deletions samples/agent/adk/contact_lookup/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from a2a.utils.errors import ServerError
from agent import ContactAgent
from a2ui.a2a import try_activate_a2ui_extension
from a2ui.a2a import create_a2ui_part

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -130,26 +131,24 @@ async def execute(
async for item in agent.stream(query, task.context_id):
is_task_complete = item["is_task_complete"]
if not is_task_complete:
await updater.update_status(
TaskState.working,
new_agent_text_message(item["updates"], task.context_id, task.id),
)
message = None
if "parts" in item:
message = new_agent_parts_message(item["parts"], task.context_id, task.id)
elif "updates" in item:
message = new_agent_text_message(item["updates"], task.context_id, task.id)

if message:
await updater.update_status(TaskState.working, message)
continue

final_state = TaskState.input_required # Default
final_state = TaskState.input_required
if action in ["send_email", "send_message", "view_full_profile"]:
final_state = TaskState.completed

final_parts = item["parts"]

logger.info("--- FINAL PARTS TO BE SENT ---")
for i, part in enumerate(final_parts):
logger.info(f" - Part {i}: Type = {type(part.root)}")
if isinstance(part.root, TextPart):
logger.info(f" - Text: {part.root.text[:200]}...")
elif isinstance(part.root, DataPart):
logger.info(f" - Data: {str(part.root.data)[:200]}...")
logger.info("-----------------------------")
self._log_parts(final_parts)

await updater.update_status(
final_state,
Expand All @@ -162,3 +161,13 @@ async def cancel(
self, request: RequestContext, event_queue: EventQueue
) -> Task | None:
raise ServerError(error=UnsupportedOperationError())

def _log_parts(self, parts: list[Part]):
logger.info("--- PARTS TO BE SENT ---")
for i, part in enumerate(parts):
logger.info(f" - Part {i}: Type = {type(part.root)}")
if isinstance(part.root, TextPart):
logger.info(f" - Text: {part.root.text[:200]}...")
elif isinstance(part.root, DataPart):
logger.info(f" - Data: {str(part.root.data)[:200]}...")
logger.info("-----------------------------")
48 changes: 24 additions & 24 deletions samples/agent/adk/contact_lookup/examples/action_confirmation.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,22 @@
}
}
},
{
"dataModelUpdate": {
"surfaceId": "action-modal",
"path": "/",
"contents": [
{
"key": "actionTitle",
"valueString": "Action Confirmation"
},
{
"key": "actionMessage",
"valueString": "Your action has been processed."
}
]
}
},
{
"surfaceUpdate": {
"surfaceId": "action-modal",
Expand Down Expand Up @@ -68,16 +84,6 @@
}
}
},
{
"id": "dismiss-button-text",
"component": {
"Text": {
"text": {
"literalString": "Dismiss"
}
}
}
},
{
"id": "dismiss-button",
"component": {
Expand All @@ -89,22 +95,16 @@
}
}
}
}
]
}
},
{
"dataModelUpdate": {
"surfaceId": "action-modal",
"path": "/",
"contents": [
{
"key": "actionTitle",
"valueString": "Action Confirmation"
},
{
"key": "actionMessage",
"valueString": "Your action has been processed."
"id": "dismiss-button-text",
"component": {
"Text": {
"text": {
"literalString": "Dismiss"
}
}
}
}
]
}
Expand Down
Loading
Loading