diff --git a/samples/agent/adk/restaurant_finder/agent.py b/samples/agent/adk/restaurant_finder/agent.py index e5239c5fd..a1a58b99c 100644 --- a/samples/agent/adk/restaurant_finder/agent.py +++ b/samples/agent/adk/restaurant_finder/agent.py @@ -27,6 +27,7 @@ Part, TextPart, ) +from google.adk.agents import run_config from google.adk.agents.llm_agent import LlmAgent from google.adk.artifacts import InMemoryArtifactService from google.adk.memory.in_memory_memory_service import InMemoryMemoryService @@ -45,7 +46,11 @@ from a2ui.core.parser.parser import parse_response, ResponsePart from a2ui.basic_catalog.provider import BasicCatalog from a2ui.core.schema.common_modifiers import remove_strict_validation -from a2ui.a2a import create_a2ui_part, get_a2ui_agent_extension, parse_response_to_parts +from a2ui.a2a import ( + get_a2ui_agent_extension, + parse_response_to_parts, + stream_response_to_parts, +) logger = logging.getLogger(__name__) @@ -71,6 +76,7 @@ def __init__(self, base_url: str, use_ui: bool = False): ) self._agent = self._build_agent(use_ui) self._user_id = "remote_agent" + self._parsers = {} self._runner = Runner( app_name=self._agent.name, agent=self._agent, @@ -80,14 +86,17 @@ def __init__(self, base_url: str, use_ui: bool = False): ) def get_agent_card(self) -> AgentCard: + extensions = [] + if self.use_ui: + extensions.append( + get_a2ui_agent_extension( + self._schema_manager.accepts_inline_catalogs, + self._schema_manager.supported_catalog_ids, + ) + ) capabilities = AgentCapabilities( streaming=True, - extensions=[ - get_a2ui_agent_extension( - self._schema_manager.accepts_inline_catalogs, - self._schema_manager.supported_catalog_ids, - ) - ], + extensions=extensions, ) skill = AgentSkill( id="find_restaurants", @@ -161,26 +170,28 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]: current_query_text = query # Ensure schema was loaded - selected_catalog = self._schema_manager.get_selected_catalog() - if self.use_ui and not 
selected_catalog.catalog_schema: - logger.error( - "--- RestaurantAgent.stream: A2UI_SCHEMA is not loaded. " - "Cannot perform UI validation. ---" - ) - yield { - "is_task_complete": True, - "parts": [ - Part( - root=TextPart( - text=( - "I'm sorry, I'm facing an internal configuration error with" - " my UI components. Please contact support." - ) - ) - ) - ], - } - return + selected_catalog = None + if self.use_ui: + selected_catalog = self._schema_manager.get_selected_catalog() + if not selected_catalog.catalog_schema: + logger.error( + "--- RestaurantAgent.stream: A2UI_SCHEMA is not loaded. " + "Cannot perform UI validation. ---" + ) + yield { + "is_task_complete": True, + "parts": [ + Part( + root=TextPart( + text=( + "I'm sorry, I'm facing an internal configuration error with" + " my UI components. Please contact support." + ) + ) + ) + ], + } + return while attempt <= max_retries: attempt += 1 @@ -192,45 +203,46 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]: current_message = types.Content( role="user", parts=[types.Part.from_text(text=current_query_text)] ) - final_response_content = None - async for event in self._runner.run_async( - user_id=self._user_id, - session_id=session.id, - new_message=current_message, - ): - logger.info(f"Event from runner: {event}") - if event.is_final_response(): - if event.content and event.content.parts and event.content.parts[0].text: - final_response_content = "\n".join( - [p.text for p in event.content.parts if p.text] - ) - break # Got the final response, stop consuming events - else: - logger.info(f"Intermediate event: {event}") - # Yield intermediate updates on every attempt + full_content_list = [] + + async def token_stream(): + async for event in self._runner.run_async( + user_id=self._user_id, + session_id=session.id, + run_config=run_config.RunConfig( + streaming_mode=run_config.StreamingMode.SSE + ), + new_message=current_message, + ): + if event.content and event.content.parts: + 
for p in event.content.parts: + if p.text: + full_content_list.append(p.text) + yield p.text + + if self.use_ui: + from a2ui.core.parser.streaming import A2uiStreamParser + + if session_id not in self._parsers: + self._parsers[session_id] = A2uiStreamParser(catalog=selected_catalog) + + async for part in stream_response_to_parts( + self._parsers[session_id], + token_stream(), + ): yield { "is_task_complete": False, - "updates": self.get_processing_message(), + "parts": [part], + } + else: + async for token in token_stream(): + yield { + "is_task_complete": False, + "updates": token, } - if final_response_content is None: - logger.warning( - "--- RestaurantAgent.stream: Received no final response content from" - f" runner (Attempt {attempt}). ---" - ) - if attempt <= max_retries: - current_query_text = ( - "I received no response. Please try again." - f"Please retry the original request: '{query}'" - ) - continue # Go to next retry - else: - # Retries exhausted on no-response - final_response_content = ( - "I'm sorry, I encountered an error and couldn't process your request." 
- ) - # Fall through to send this as a text-only error + final_response_content = "".join(full_content_list) is_valid = False error_message = "" diff --git a/samples/agent/adk/restaurant_finder/agent_executor.py b/samples/agent/adk/restaurant_finder/agent_executor.py index f552b407d..c0ab7892b 100644 --- a/samples/agent/adk/restaurant_finder/agent_executor.py +++ b/samples/agent/adk/restaurant_finder/agent_executor.py @@ -129,10 +129,14 @@ async def execute( async for item in agent.stream(query, task.context_id): is_task_complete = item["is_task_complete"] if not is_task_complete: - await updater.update_status( - TaskState.working, - new_agent_text_message(item["updates"], task.context_id, task.id), - ) + message = None + if "parts" in item: + message = new_agent_parts_message(item["parts"], task.context_id, task.id) + elif "updates" in item: + message = new_agent_text_message(item["updates"], task.context_id, task.id) + + if message: + await updater.update_status(TaskState.working, message) continue final_state = ( diff --git a/samples/client/angular/projects/restaurant/src/app/app.css b/samples/client/angular/projects/restaurant/src/app/app.css index d651b17a7..e42933eb1 100644 --- a/samples/client/angular/projects/restaurant/src/app/app.css +++ b/samples/client/angular/projects/restaurant/src/app/app.css @@ -225,3 +225,51 @@ form { rotate: 360deg; } } + +.streaming-indicator { + display: flex; + flex-direction: column; + gap: 8px; + padding: 16px; + background: light-dark(var(--n-95), var(--n-20)); + border-radius: 8px; + margin-bottom: 16px; + animation: fadeIn 0.5s ease-out; + + & span { + font-size: 14px; + color: light-dark(var(--n-40), var(--n-70)); + font-weight: 500; + } +} + +.progress-bar { + width: 100%; + height: 4px; + background-color: light-dark(var(--n-90), var(--n-30)); + border-radius: 2px; + overflow: hidden; + position: relative; + + &::after { + content: ""; + position: absolute; + left: 0; + top: 0; + height: 100%; + width: 30%; + 
background-color: var(--p-60); + border-radius: 2px; + animation: loading-bar 2s infinite ease-in-out; + } +} + +@keyframes loading-bar { + 0% { + left: -30%; + } + + 100% { + left: 100%; + } +} \ No newline at end of file diff --git a/samples/client/angular/projects/restaurant/src/app/app.html b/samples/client/angular/projects/restaurant/src/app/app.html index 02a3a5ea3..bf3621512 100644 --- a/samples/client/angular/projects/restaurant/src/app/app.html +++ b/samples/client/angular/projects/restaurant/src/app/app.html @@ -14,12 +14,7 @@ limitations under the License. --> -@if (client.isLoading()) { -
-
-
{{loadingTextLines[loadingTextIndex()]}}
-
-} @else if (!hasData()) { +@if (!hasData()) {
Image of the restaurant @@ -41,10 +36,24 @@

Restaurant Finder

} @else {
- @let surfaces = processor.getSurfaces(); + @let surfaces = processor.surfacesSignal(); + + @if (client.isLoading() && surfaces.size === 0) { +
+
+
{{loadingTextLines[loadingTextIndex()]}}
+
+ } @else { + @if (client.isLoading()) { +
+
+ Finding the best spots for you... +
+ } - @for (entry of surfaces; track $index) { - + @for (entry of surfaces; track $index) { + + } }
} diff --git a/samples/client/angular/projects/restaurant/src/app/app.ts b/samples/client/angular/projects/restaurant/src/app/app.ts index 4af4d0fc5..4b19a4ffa 100644 --- a/samples/client/angular/projects/restaurant/src/app/app.ts +++ b/samples/client/angular/projects/restaurant/src/app/app.ts @@ -54,8 +54,8 @@ export class App { if (body) { this.startLoadingAnimation(); const message = body as Types.A2UIClientEventMessage; - await this.client.makeRequest(message); this.hasData.set(true); + await this.client.makeRequest(message); this.stopLoadingAnimation(); } } diff --git a/samples/client/angular/projects/restaurant/src/app/client.ts b/samples/client/angular/projects/restaurant/src/app/client.ts index d26186f83..6784f7286 100644 --- a/samples/client/angular/projects/restaurant/src/app/client.ts +++ b/samples/client/angular/projects/restaurant/src/app/client.ts @@ -36,49 +36,131 @@ export class Client { }); } - async makeRequest(request: Types.A2UIClientEventMessage | string) { - let messages: Types.ServerToClientMessage[]; - + async makeRequest(request: Types.A2UIClientEventMessage | string): Promise { + let messages: Types.ServerToClientMessage[] = []; try { this.isLoading.set(true); - const response = await this.send(request as Types.A2UIClientEventMessage); - messages = response; + // Clear surfaces at the start of a new request + this.processor.clearSurfaces(); + + const response = await fetch('/a2a', { + body: JSON.stringify(request as Types.A2UIClientEventMessage), + method: 'POST', + }); + + if (!response.ok) { + const error = (await response.json()) as { error: string }; + throw new Error(error.error); + } + + const contentType = response.headers.get('content-type'); + console.log(`[client] Received response with content-type: ${contentType}`); + if (contentType?.includes('text/event-stream')) { + await this.handleStreamingResponse(response, messages); + } else { + await this.handleNonStreamingResponse(response, messages); + } } catch (err) { 
console.error(err); throw err; } finally { this.isLoading.set(false); } - - this.processor.clearSurfaces(); - this.processor.processMessages(messages); return messages; } - private async send( - message: Types.A2UIClientEventMessage, - ): Promise { - const response = await fetch('/a2a', { - body: JSON.stringify(message), - method: 'POST', - }); + private async handleStreamingResponse( + response: Response, + messages: Types.ServerToClientMessage[] + ): Promise { + const reader = response.body?.getReader(); + if (!reader) { + throw new Error('No response body'); + } - if (response.ok) { - const data = (await response.json()) as A2AServerPayload; - const messages: Types.ServerToClientMessage[] = []; + const decoder = new TextDecoder(); + let buffer = ''; - if ('error' in data) { - throw new Error(data.error); - } else { - for (const item of data) { - if (item.kind === 'text') continue; - messages.push(item.data); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const now = performance.now(); + buffer += decoder.decode(value, { stream: true }); + + // Parse SSE events. 
The server sends "data: <json>\n\n"
+      const lines = buffer.split('\n\n');
+      buffer = lines.pop() || '';
+
+      for (const line of lines) {
+        if (line.startsWith('data: ')) {
+          const jsonStr = line.slice(6);
+          try {
+            const data = JSON.parse(jsonStr) as A2AServerPayload;
+            console.log(`[client] [${now.toFixed(2)}ms] Received SSE data:`, data);
+
+            if ('error' in data) {
+              throw new Error(data.error);
+            } else {
+              console.log(
+                `[client] [${performance.now().toFixed(2)}ms] Scheduling processing for ${data.length} parts`
+              );
+              // Use a microtask to ensure we don't block the stream reader
+              await Promise.resolve();
+              const newMessages = this.processParts(data as any[]);
+              messages.push(...newMessages);
+            }
+          } catch (e) {
+            console.error('Error parsing SSE data:', e, jsonStr);
+          }
+        }
+      }
+    }
+  }
+
+  private async handleNonStreamingResponse(
+    response: Response,
+    messages: Types.ServerToClientMessage[]
+  ): Promise<void> {
+    const data = (await response.json()) as any[];
+    console.log(`[client] Received JSON response:`, data);
+    const newMessages = this.processParts(data);
+    messages.push(...newMessages);
+  }
+
+  private processParts(parts: any[]): Types.ServerToClientMessage[] {
+    const messages: Types.ServerToClientMessage[] = [];
+    for (const item of parts) {
+      if (item.data) {
+        messages.push(item.data);
+      } else if (item.kind === 'text' || item.text) {
+        const text = item.text || '';
+        const match = text.match(/<a2ui-json>(.*?)<\/a2ui-json>/s);
+        if (match) {
+          try {
+            const parsed = JSON.parse(match[1]);
+            const commands = Array.isArray(parsed) ?
parsed : [parsed]; + for (const cmd of commands) { + if (this.isValidA2uiCommand(cmd)) { + messages.push(cmd); + } else { + console.warn('[client] Ignored invalid A2UI command from text:', cmd); + } + } + } catch (e) { + console.error('Failed to parse a2ui-json from text:', e); + } } } - return messages; } + if (messages.length > 0) { + console.log(`[client] Processing ${messages.length} A2UI commands:`, messages); + this.processor.processMessages(messages); + } + return messages; + } - const error = (await response.json()) as { error: string }; - throw new Error(error.error); + private isValidA2uiCommand(cmd: any): boolean { + return !!(cmd.surfaceUpdate || cmd.dataModelUpdate || cmd.beginRendering || cmd.deleteSurface); } } diff --git a/samples/client/angular/projects/restaurant/src/server.ts b/samples/client/angular/projects/restaurant/src/server.ts index e22f08e33..86f92730a 100644 --- a/samples/client/angular/projects/restaurant/src/server.ts +++ b/samples/client/angular/projects/restaurant/src/server.ts @@ -30,6 +30,7 @@ const browserDistFolder = join(import.meta.dirname, '../browser'); const app = express(); const angularApp = new AngularNodeAppEngine(); let client: A2AClient | null = null; +const enableStreaming = process.env['ENABLE_STREAMING'] === 'true'; app.use( express.static(browserDistFolder, { @@ -80,21 +81,69 @@ app.post('/a2a', (req, res) => { } const client = await createOrGetClient(); - const response = await client.sendMessage(sendParams); - res.set('Cache-Control', 'no-store'); - - if ('error' in response) { - console.error('Error:', response.error.message); - res.status(500).json({ error: response.error.message }); - return; + try { + if (enableStreaming) { + await handleStreamingResponse(client, sendParams, res); + } else { + await handleNonStreamingResponse(client, sendParams, res); + } + } catch (error: any) { + console.error('Request error:', error.message); + if (!res.headersSent) { + res.status(500).json({ error: error.message }); + } 
else if (!res.writableEnded) { + res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`); + res.end(); + } } - - const result = (response as SendMessageSuccessResponse).result as Task; - res.json(result.kind === 'task' ? result.status.message?.parts || [] : []); }); }); +async function handleStreamingResponse(client: A2AClient, sendParams: MessageSendParams, res: express.Response) { + process.stdout.write('[server] Streaming mode enabled\n'); + const stream = client.sendMessageStream(sendParams); + + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); // Disable buffering + res.status(200); + + for await (const event of stream) { + console.log(`[server] Received event from agent: ${event.kind}`); + let parts: Part[] = []; + if (event.kind === 'task' || event.kind === 'status-update') { + parts = event.status.message?.parts || []; + } else if (event.kind === 'artifact-update') { + parts = event.artifact.parts || []; + } + + if (parts.length > 0) { + console.log(`[server] Streaming ${parts.length} parts to client`); + console.log(`[server] Streaming parts: ${JSON.stringify(parts)}`); + res.write(`data: ${JSON.stringify(parts)}\n\n`); + } + } + res.end(); + console.log('[server] Stream finished'); +} + +async function handleNonStreamingResponse(client: A2AClient, sendParams: MessageSendParams, res: express.Response) { + process.stdout.write('[server] Streaming mode disabled\n'); + const response = await client.sendMessage(sendParams); + res.set('Cache-Control', 'no-store'); + + if ('error' in response) { + console.error('Error:', response.error.message); + res.status(500).json({ error: response.error.message }); + return; + } + + const result = (response as SendMessageSuccessResponse).result as Task; + res.json(result.kind === 'task' ? 
result.status.message?.parts || [] : []); +} + app.use((req, res, next) => { angularApp .handle(req)