Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/a2a/server/apps/jsonrpc/fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,38 +66,43 @@
(SSE).
"""

def __init__( # noqa: PLR0913
self,
agent_card: AgentCard,
http_handler: RequestHandler,
extended_agent_card: AgentCard | None = None,
context_builder: CallContextBuilder | None = None,
card_modifier: Callable[[AgentCard], AgentCard] | None = None,
extended_card_modifier: Callable[
[AgentCard, ServerCallContext], AgentCard
]
| None = None,
max_content_length: int | None = 10 * 1024 * 1024, # 10MB
stream_send_timeout: float | None = None,
) -> None:
"""Initializes the A2AFastAPIApplication.

Args:
agent_card: The AgentCard describing the agent's capabilities.
http_handler: The handler instance responsible for processing A2A
requests via http.
extended_agent_card: An optional, distinct AgentCard to be served
at the authenticated extended card endpoint.
context_builder: The CallContextBuilder used to construct the
ServerCallContext passed to the http_handler. If None, no
ServerCallContext is passed.
card_modifier: An optional callback to dynamically modify the public
agent card before it is served.
extended_card_modifier: An optional callback to dynamically modify
the extended agent card before it is served. It receives the
call context.
max_content_length: The maximum allowed content length for incoming
requests. Defaults to 10MB. Set to None for unbounded maximum.
stream_send_timeout: The timeout in seconds for sending events in
streaming responses. Defaults to `None`, which disables the timeout.
This changes the default behavior from using Starlette's 5-second
default. Set a float value to specify a timeout.
"""

Check notice on line 105 in src/a2a/server/apps/jsonrpc/fastapi_app.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/apps/rest/rest_adapter.py (55-87)
if not _package_fastapi_installed:
raise ImportError(
'The `fastapi` package is required to use the `A2AFastAPIApplication`.'
Expand All @@ -112,6 +117,7 @@
card_modifier=card_modifier,
extended_card_modifier=extended_card_modifier,
max_content_length=max_content_length,
stream_send_timeout=stream_send_timeout,
)

def add_routes_to_app(
Expand Down
14 changes: 13 additions & 1 deletion src/a2a/server/apps/jsonrpc/jsonrpc_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,38 +172,43 @@
for model in A2ARequestModel.__args__
}

def __init__( # noqa: PLR0913
self,
agent_card: AgentCard,
http_handler: RequestHandler,
extended_agent_card: AgentCard | None = None,
context_builder: CallContextBuilder | None = None,
card_modifier: Callable[[AgentCard], AgentCard] | None = None,
extended_card_modifier: Callable[
[AgentCard, ServerCallContext], AgentCard
]
| None = None,
max_content_length: int | None = 10 * 1024 * 1024, # 10MB
stream_send_timeout: float | None = None,
) -> None:
"""Initializes the JSONRPCApplication.

Args:
agent_card: The AgentCard describing the agent's capabilities.
http_handler: The handler instance responsible for processing A2A
requests via http.
extended_agent_card: An optional, distinct AgentCard to be served
at the authenticated extended card endpoint.
context_builder: The CallContextBuilder used to construct the
ServerCallContext passed to the http_handler. If None, no
ServerCallContext is passed.
card_modifier: An optional callback to dynamically modify the public
agent card before it is served.
extended_card_modifier: An optional callback to dynamically modify
the extended agent card before it is served. It receives the
call context.
max_content_length: The maximum allowed content length for incoming
requests. Defaults to 10MB. Set to None for unbounded maximum.
stream_send_timeout: The timeout in seconds for sending events in
streaming responses. Defaults to `None`, which disables the timeout.
This changes the default behavior from using Starlette's 5-second
default. Set a float value to specify a timeout.
"""

Check notice on line 211 in src/a2a/server/apps/jsonrpc/jsonrpc_app.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/apps/rest/rest_adapter.py (55-87)
if not _package_starlette_installed:
raise ImportError(
'Packages `starlette` and `sse-starlette` are required to use the'
Expand All @@ -222,6 +227,7 @@
)
self._context_builder = context_builder or DefaultCallContextBuilder()
self._max_content_length = max_content_length
self.stream_send_timeout = stream_send_timeout

def _generate_error_response(
self, request_id: str | int | None, error: JSONRPCError | A2AError
Expand Down Expand Up @@ -540,8 +546,14 @@
async for item in stream:
yield {'data': item.root.model_dump_json(exclude_none=True)}

send_timeout = context.state.get(
'stream_send_timeout', self.stream_send_timeout
)

return EventSourceResponse(
event_generator(handler_result), headers=headers
event_generator(handler_result),
headers=headers,
send_timeout=send_timeout,
)
if isinstance(handler_result, JSONRPCErrorResponse):
return JSONResponse(
Expand Down
6 changes: 6 additions & 0 deletions src/a2a/server/apps/jsonrpc/starlette_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@
(SSE).
"""

def __init__( # noqa: PLR0913
self,
agent_card: AgentCard,
http_handler: RequestHandler,
extended_agent_card: AgentCard | None = None,
context_builder: CallContextBuilder | None = None,
card_modifier: Callable[[AgentCard], AgentCard] | None = None,
extended_card_modifier: Callable[
[AgentCard, ServerCallContext], AgentCard
]
| None = None,
max_content_length: int | None = 10 * 1024 * 1024, # 10MB

Check notice on line 62 in src/a2a/server/apps/jsonrpc/starlette_app.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/apps/rest/rest_adapter.py (55-66)
stream_send_timeout: float | None = None,
) -> None:
"""Initializes the A2AStarletteApplication.

Expand All @@ -79,6 +80,10 @@
call context.
max_content_length: The maximum allowed content length for incoming
requests. Defaults to 10MB. Set to None for unbounded maximum.
stream_send_timeout: The timeout in seconds for sending events in
streaming responses. Defaults to `None`, which disables the timeout.
This changes the default behavior from using Starlette's 5-second
default. Set a float value to specify a timeout.
"""
if not _package_starlette_installed:
raise ImportError(
Expand All @@ -94,6 +99,7 @@
card_modifier=card_modifier,
extended_card_modifier=extended_card_modifier,
max_content_length=max_content_length,
stream_send_timeout=stream_send_timeout,
)

def routes(
Expand Down
17 changes: 17 additions & 0 deletions tests/server/apps/jsonrpc/test_fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ def test_create_a2a_fastapi_app_with_missing_deps_raises_importerror(
):
_app = A2AFastAPIApplication(**mock_app_params)

def test_stream_send_timeout_parameter(self, mock_app_params: dict):
try:
app_default = A2AFastAPIApplication(**mock_app_params)
assert app_default.stream_send_timeout is None

app_custom = A2AFastAPIApplication(
**mock_app_params, stream_send_timeout=30.0
)
assert app_custom.stream_send_timeout == 30.0

app_none = A2AFastAPIApplication(
**mock_app_params, stream_send_timeout=None
)
assert app_none.stream_send_timeout is None
except ImportError:
pytest.skip('FastAPI dependencies not available')


if __name__ == '__main__':
pytest.main([__file__])
186 changes: 186 additions & 0 deletions tests/server/apps/jsonrpc/test_jsonrpc_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,192 @@ def build(
):
_app = DummyJSONRPCApp(**mock_app_params)

@pytest.mark.asyncio
async def test_stream_send_timeout_applied_to_event_source_response(
self, mock_app_params: dict
):
"""Test that stream_send_timeout is correctly applied to EventSourceResponse."""
from unittest.mock import patch

class DummyJSONRPCApp(JSONRPCApplication):
def __init__(self, **kwargs):
# Skip parent __init__ to avoid package checks
self.stream_send_timeout = kwargs.get(
'stream_send_timeout', None
)
self.agent_card = kwargs.get('agent_card')
self.extended_agent_card = kwargs.get('extended_agent_card')
self.card_modifier = kwargs.get('card_modifier')
self.extended_card_modifier = kwargs.get(
'extended_card_modifier'
)
self.max_content_length = kwargs.get(
'max_content_length', 10 * 1024 * 1024
)
self.handler = kwargs.get('http_handler')

def build(self, **kwargs):
return object()

# Test with app-level timeout
app = DummyJSONRPCApp(stream_send_timeout=30.0, **mock_app_params)

# Mock context
context = MagicMock(spec=ServerCallContext)
context.state = {}
context.activated_extensions = None

# Mock streaming handler result
async def mock_generator():
yield SendMessageResponse(
root=SendMessageSuccessResponse(
message=Message(
message_id='1',
role=Role.assistant,
parts=[Part(TextPart(text='test'))],
)
)
)

handler_result = mock_generator()

with patch(
'a2a.server.apps.jsonrpc.jsonrpc_app.EventSourceResponse'
) as mock_esr:
mock_esr.return_value = MagicMock()

# Call the method
response = app._create_response(context, handler_result)

# Assert EventSourceResponse was called with correct timeout
mock_esr.assert_called_once()
call_args = mock_esr.call_args
assert call_args[1]['send_timeout'] == 30.0

@pytest.mark.asyncio
async def test_stream_send_timeout_none_disables_timeout(
self, mock_app_params: dict
):
"""Test that stream_send_timeout=None disables the timeout."""
from unittest.mock import patch

class DummyJSONRPCApp(JSONRPCApplication):
def __init__(self, **kwargs):
# Skip parent __init__ to avoid package checks
self.stream_send_timeout = kwargs.get(
'stream_send_timeout', None
)
self.agent_card = kwargs.get('agent_card')
self.extended_agent_card = kwargs.get('extended_agent_card')
self.card_modifier = kwargs.get('card_modifier')
self.extended_card_modifier = kwargs.get(
'extended_card_modifier'
)
self.max_content_length = kwargs.get(
'max_content_length', 10 * 1024 * 1024
)
self.handler = kwargs.get('http_handler')

def build(self, **kwargs):
return object()

# Test with None timeout (default)
app = DummyJSONRPCApp(stream_send_timeout=None, **mock_app_params)

# Mock context
context = MagicMock(spec=ServerCallContext)
context.state = {}
context.activated_extensions = None

# Mock streaming handler result
async def mock_generator():
yield SendMessageResponse(
root=SendMessageSuccessResponse(
message=Message(
message_id='1',
role=Role.assistant,
parts=[Part(TextPart(text='test'))],
)
)
)

handler_result = mock_generator()

with patch(
'a2a.server.apps.jsonrpc.jsonrpc_app.EventSourceResponse'
) as mock_esr:
mock_esr.return_value = MagicMock()

# Call the method
response = app._create_response(context, handler_result)

# Assert EventSourceResponse was called with None (disabled timeout)
mock_esr.assert_called_once()
call_args = mock_esr.call_args
assert call_args[1]['send_timeout'] is None

@pytest.mark.asyncio
async def test_stream_send_timeout_context_override(
self, mock_app_params: dict
):
"""Test that context.state can override the app-level stream_send_timeout."""
from unittest.mock import patch

class DummyJSONRPCApp(JSONRPCApplication):
def __init__(self, **kwargs):
# Skip parent __init__ to avoid package checks
self.stream_send_timeout = kwargs.get(
'stream_send_timeout', None
)
self.agent_card = kwargs.get('agent_card')
self.extended_agent_card = kwargs.get('extended_agent_card')
self.card_modifier = kwargs.get('card_modifier')
self.extended_card_modifier = kwargs.get(
'extended_card_modifier'
)
self.max_content_length = kwargs.get(
'max_content_length', 10 * 1024 * 1024
)
self.handler = kwargs.get('http_handler')

def build(self, **kwargs):
return object()

# Test with app-level timeout
app = DummyJSONRPCApp(stream_send_timeout=30.0, **mock_app_params)

# Mock context with override
context = MagicMock(spec=ServerCallContext)
context.state = {'stream_send_timeout': 60.0}
context.activated_extensions = None

# Mock streaming handler result
async def mock_generator():
yield SendMessageResponse(
root=SendMessageSuccessResponse(
message=Message(
message_id='1',
role=Role.assistant,
parts=[Part(TextPart(text='test'))],
)
)
)

handler_result = mock_generator()

with patch(
'a2a.server.apps.jsonrpc.jsonrpc_app.EventSourceResponse'
) as mock_esr:
mock_esr.return_value = MagicMock()

# Call the method
response = app._create_response(context, handler_result)

# Assert EventSourceResponse was called with context override
mock_esr.assert_called_once()
call_args = mock_esr.call_args
assert call_args[1]['send_timeout'] == 60.0


class TestJSONRPCExtensions:
@pytest.fixture
Expand Down
17 changes: 17 additions & 0 deletions tests/server/apps/jsonrpc/test_starlette_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ def test_create_a2a_starlette_app_with_missing_deps_raises_importerror(
):
_app = A2AStarletteApplication(**mock_app_params)

def test_stream_send_timeout_parameter(self, mock_app_params: dict):
try:
app_default = A2AStarletteApplication(**mock_app_params)
assert app_default.stream_send_timeout is None

app_custom = A2AStarletteApplication(
**mock_app_params, stream_send_timeout=30.0
)
assert app_custom.stream_send_timeout == 30.0

app_none = A2AStarletteApplication(
**mock_app_params, stream_send_timeout=None
)
assert app_none.stream_send_timeout is None
except ImportError:
pytest.skip('Starlette dependencies not available')


if __name__ == '__main__':
pytest.main([__file__])
Loading