-
Notifications
You must be signed in to change notification settings - Fork 289
fix(client): surface real HTTP status & message for streaming errors; avoid SSE parse on non-2xx #505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix(client): surface real HTTP status & message for streaming errors; avoid SSE parse on non-2xx #505
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| """Custom exceptions for the A2A client.""" | ||
|
|
||
| from collections.abc import Mapping | ||
|
|
||
| from a2a.types import JSONRPCErrorResponse | ||
|
|
||
|
|
||
|
|
@@ -10,16 +12,27 @@ class A2AClientError(Exception): | |
| class A2AClientHTTPError(A2AClientError): | ||
| """Client exception for HTTP errors received from the server.""" | ||
|
|
||
| def __init__(self, status_code: int, message: str): | ||
| def __init__( | ||
| self, | ||
| status: int, | ||
| message: str, | ||
| body: str | None = None, | ||
| headers: Mapping[str, str] | None = None, | ||
| ): | ||
| """Initializes the A2AClientHTTPError. | ||
|
|
||
| Args: | ||
| status_code: The HTTP status code of the response. | ||
| status: The HTTP status code of the response. | ||
| message: A descriptive error message. | ||
| body: The raw response body, if available. | ||
| headers: The HTTP response headers. | ||
| """ | ||
| self.status_code = status_code | ||
| self.status = status | ||
| self.status_code = status | ||
| self.message = message | ||
| super().__init__(f'HTTP Error {status_code}: {message}') | ||
| self.body = body | ||
| self.headers = dict(headers or {}) | ||
| super().__init__(f'HTTP {status} - {message}') | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not switch the structure of the message here. Some implementations may already depend on it. |
||
|
|
||
|
|
||
| class A2AClientJSONError(A2AClientError): | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,116 @@ | ||||||||||||||||||
| """Shared helpers for handling streaming HTTP responses.""" | ||||||||||||||||||
|
|
||||||||||||||||||
| from __future__ import annotations | ||||||||||||||||||
|
|
||||||||||||||||||
| import json | ||||||||||||||||||
|
|
||||||||||||||||||
| from typing import Any | ||||||||||||||||||
|
|
||||||||||||||||||
| import httpx # noqa: TC002 | ||||||||||||||||||
|
|
||||||||||||||||||
| from httpx_sse import EventSource # noqa: TC002 | ||||||||||||||||||
|
|
||||||||||||||||||
| from a2a.client.errors import A2AClientHTTPError | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| SUCCESS_STATUS_MIN = 200 | ||||||||||||||||||
| SUCCESS_STATUS_MAX = 300 | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| async def ensure_streaming_response(event_source: EventSource) -> None: | ||||||||||||||||||
| """Validate the initial streaming response before attempting SSE parsing.""" | ||||||||||||||||||
| response = event_source.response | ||||||||||||||||||
| if not SUCCESS_STATUS_MIN <= response.status_code < SUCCESS_STATUS_MAX: | ||||||||||||||||||
| error = await _build_http_error(response) | ||||||||||||||||||
| raise error | ||||||||||||||||||
|
|
||||||||||||||||||
| if not _has_event_stream_content_type(response): | ||||||||||||||||||
| error = await _build_content_type_error(response) | ||||||||||||||||||
| raise error | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| async def _build_http_error(response: httpx.Response) -> A2AClientHTTPError: | ||||||||||||||||||
| body_text = await _read_body(response) | ||||||||||||||||||
| json_payload: Any | None | ||||||||||||||||||
| try: | ||||||||||||||||||
| json_payload = response.json() | ||||||||||||||||||
| except (json.JSONDecodeError, ValueError): | ||||||||||||||||||
| json_payload = None | ||||||||||||||||||
|
Comment on lines
+35
to
+38
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The response body is already read into the
Suggested change
|
||||||||||||||||||
|
|
||||||||||||||||||
| message = _extract_message(response, json_payload, body_text) | ||||||||||||||||||
| return A2AClientHTTPError( | ||||||||||||||||||
| response.status_code, | ||||||||||||||||||
| message, | ||||||||||||||||||
| body=body_text, | ||||||||||||||||||
| headers=dict(response.headers), | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| async def _build_content_type_error( | ||||||||||||||||||
| response: httpx.Response, | ||||||||||||||||||
| ) -> A2AClientHTTPError: | ||||||||||||||||||
| body_text = await _read_body(response) | ||||||||||||||||||
| content_type = response.headers.get('content-type', None) | ||||||||||||||||||
| descriptor = content_type or 'missing' | ||||||||||||||||||
| message = f'Unexpected Content-Type {descriptor!r} for streaming response' | ||||||||||||||||||
| return A2AClientHTTPError( | ||||||||||||||||||
| response.status_code, | ||||||||||||||||||
| message, | ||||||||||||||||||
| body=body_text, | ||||||||||||||||||
| headers=dict(response.headers), | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| async def _read_body(response: httpx.Response) -> str | None: | ||||||||||||||||||
| await response.aread() | ||||||||||||||||||
| text = response.text | ||||||||||||||||||
| return text if text else None | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| def _extract_message( | ||||||||||||||||||
| response: httpx.Response, | ||||||||||||||||||
| json_payload: Any | None, | ||||||||||||||||||
| body_text: str | None, | ||||||||||||||||||
| ) -> str: | ||||||||||||||||||
| message: str | None = None | ||||||||||||||||||
| if isinstance(json_payload, dict): | ||||||||||||||||||
| title = _coerce_str(json_payload.get('title')) | ||||||||||||||||||
| detail = _coerce_str(json_payload.get('detail')) | ||||||||||||||||||
| if title and detail: | ||||||||||||||||||
| message = f'{title}: {detail}' | ||||||||||||||||||
| else: | ||||||||||||||||||
| for key in ('message', 'detail', 'error', 'title'): | ||||||||||||||||||
| value = _coerce_str(json_payload.get(key)) | ||||||||||||||||||
| if value: | ||||||||||||||||||
| message = value | ||||||||||||||||||
| break | ||||||||||||||||||
| elif isinstance(json_payload, list): | ||||||||||||||||||
| # Some APIs return a list of error descriptions—prefer the first string entry. | ||||||||||||||||||
| for item in json_payload: | ||||||||||||||||||
| value = _coerce_str(item) | ||||||||||||||||||
| if value: | ||||||||||||||||||
| message = value | ||||||||||||||||||
| break | ||||||||||||||||||
|
|
||||||||||||||||||
| if not message and body_text: | ||||||||||||||||||
| stripped = body_text.strip() | ||||||||||||||||||
| if stripped: | ||||||||||||||||||
| message = stripped | ||||||||||||||||||
|
|
||||||||||||||||||
| if not message: | ||||||||||||||||||
| reason = getattr(response, 'reason_phrase', '') or '' | ||||||||||||||||||
| message = reason or 'HTTP error' | ||||||||||||||||||
|
|
||||||||||||||||||
| return message | ||||||||||||||||||
|
Check failure on line 104 in src/a2a/client/transports/_streaming_utils.py
|
||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| def _coerce_str(value: Any) -> str | None: | ||||||||||||||||||
| if isinstance(value, str): | ||||||||||||||||||
| stripped = value.strip() | ||||||||||||||||||
| return stripped or None | ||||||||||||||||||
| return None | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| def _has_event_stream_content_type(response: httpx.Response) -> bool: | ||||||||||||||||||
| content_type = response.headers.get('content-type', '') | ||||||||||||||||||
| return 'text/event-stream' in content_type.lower() | ||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,29 +12,30 @@ | |
| from a2a.client.card_resolver import A2ACardResolver | ||
| from a2a.client.errors import A2AClientHTTPError, A2AClientJSONError | ||
| from a2a.client.middleware import ClientCallContext, ClientCallInterceptor | ||
| from a2a.client.transports._streaming_utils import ensure_streaming_response | ||
| from a2a.client.transports.base import ClientTransport | ||
| from a2a.grpc import a2a_pb2 | ||
| from a2a.types import ( | ||
| AgentCard, | ||
| GetTaskPushNotificationConfigParams, | ||
| Message, | ||
| MessageSendParams, | ||
| Task, | ||
| TaskArtifactUpdateEvent, | ||
| TaskIdParams, | ||
| TaskPushNotificationConfig, | ||
| TaskQueryParams, | ||
| TaskStatusUpdateEvent, | ||
| ) | ||
| from a2a.utils import proto_utils | ||
| from a2a.utils.telemetry import SpanKind, trace_class | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| @trace_class(kind=SpanKind.CLIENT) | ||
| class RestTransport(ClientTransport): | ||
| """A REST transport for the A2A client.""" | ||
|
|
||
| def __init__( | ||
|
|
@@ -139,23 +140,28 @@ | |
| json=payload, | ||
| **modified_kwargs, | ||
| ) as event_source: | ||
| http_response = event_source.response | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: no need to declare a new variable for a single access |
||
| try: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The best practise here seems to be using I think this would be cleanest solution, could you implement these changes here and for JSON-RPC? |
||
| await ensure_streaming_response(event_source) | ||
| async for sse in event_source.aiter_sse(): | ||
| event = a2a_pb2.StreamResponse() | ||
| Parse(sse.data, event) | ||
| yield proto_utils.FromProto.stream_response(event) | ||
| except SSEError as e: | ||
| raise A2AClientHTTPError( | ||
| 400, f'Invalid SSE response or protocol error: {e}' | ||
| http_response.status_code, | ||
| f'Invalid SSE response or protocol error: {e}', | ||
| headers=dict(http_response.headers), | ||
| ) from e | ||
| except json.JSONDecodeError as e: | ||
| raise A2AClientJSONError(str(e)) from e | ||
| except httpx.RequestError as e: | ||
| raise A2AClientHTTPError( | ||
| 503, f'Network communication error: {e}' | ||
| 503, | ||
| f'Network communication error: {e}', | ||
| ) from e | ||
|
|
||
| async def _send_request(self, request: httpx.Request) -> dict[str, Any]: | ||
| try: | ||
| response = await self.httpx_client.send(request) | ||
| response.raise_for_status() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency and to avoid redundancy, it would be clearer to use a single attribute for the status code. The
httpxlibrary usesstatus_code, which is a common convention. Using bothstatusandstatus_codefor the same value can be confusing. I suggest standardizing onstatus_code.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1