Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion src/mcp/shared/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ async def cancel(self) -> None:
response=ErrorData(code=0, message="Request cancelled", data=None),
)

async def mark_cancelled_without_response(self) -> None:
"""Cancel this request and mark it as completed without sending a response.

This is used when cancellation is initiated by a cancellation notification,
where the receiver SHOULD NOT send a response per the MCP spec.
"""
if not self._entered:
raise RuntimeError("RequestResponder must be used as a context manager")
if not self._cancel_scope:
raise RuntimeError("No active cancel scope")

self._cancel_scope.cancel()
self._completed = True

@property
def in_flight(self) -> bool:
return not self._completed and not self.cancelled
Expand Down Expand Up @@ -314,6 +328,24 @@ async def send_notification(
)
await self._write_stream.send(session_message)

# If we are emitting a cancellation notification for a request that we
# originally sent, proactively cancel the local waiter so callers of
# send_request() are unblocked without relying on a peer response.
try:
from mcp.types import CancelledNotification as _CancelledNotification # local import to avoid cycle

root = getattr(notification, "root", None)
if isinstance(root, _CancelledNotification):
cancelled_id = root.params.requestId
stream = self._response_streams.pop(cancelled_id, None)
if stream is not None:
error = ErrorData(code=0, message="Request cancelled", data=None)
await stream.send(JSONRPCError(jsonrpc="2.0", id=cancelled_id, error=error))
await stream.aclose()
except Exception:
# Never let local cancellation propagation break notification sending
pass

async def _send_response(self, request_id: RequestId, response: SendResultT | ErrorData) -> None:
if isinstance(response, ErrorData):
jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=request_id, error=response)
Expand Down Expand Up @@ -383,7 +415,8 @@ async def _receive_loop(self) -> None:
if isinstance(notification.root, CancelledNotification):
cancelled_id = notification.root.params.requestId
if cancelled_id in self._in_flight:
await self._in_flight[cancelled_id].cancel()
# Silent cancellation in response to a cancellation notification
await self._in_flight[cancelled_id].mark_cancelled_without_response()
else:
# Handle progress notifications callback
if isinstance(notification.root, ProgressNotification):
Expand Down
Loading