Commit 93bf8cf
Dynamic Refresh of MCP servers and Tool Health Status (#1507)
* health check status update with events
* event service
* fix events notifications
* use centralized event service
* updated docstrings and doctests
* added comments
* linting
* minor update
* using centralized event service for prompts
* event service updated
* minor change
* linting
* updated event generator
* resolved pylint issues
* update test cases
* update test case
* fix tests
* fix doctests
* docstring coverage
* web linting
* minor flake8 fix
* Linting
* fix: update test to use EventService subscriber list

  The test was setting tool_service._event_subscribers but ToolService now
  delegates event publishing to _event_service. Updated to set
  _event_service._event_subscribers and force local mode.

---------

Signed-off-by: Keval Mahajan <[email protected]>
Signed-off-by: Mihai Criveti <[email protected]>
Co-authored-by: Mihai Criveti <[email protected]>
1 parent ecf9eda commit 93bf8cf

28 files changed (+1911 / -419 lines)

docs/docs/manage/supported-databases.md

Lines changed: 2 additions & 2 deletions
@@ -142,7 +142,7 @@ mysql -u mysql -p mcp < mcp_backup.sql
 ```bash
 # Check MariaDB service status
 sudo systemctl status mariadb
-
+
 # Verify port is open
 netstat -tlnp | grep 3306
 ```
@@ -172,4 +172,4 @@ mysql -u mysql -p mcp < mcp_backup.sql
 - [Configuration Reference](configuration.md) - Complete database configuration options
 - [Docker Compose Deployment](../deployment/compose.md) - MariaDB container setup
 - [Kubernetes Deployment](../deployment/kubernetes.md) - MariaDB in Kubernetes
-- [Performance Tuning](tuning.md) - Database optimization guidelines
+- [Performance Tuning](tuning.md) - Database optimization guidelines

mcpgateway/admin.py

Lines changed: 239 additions & 0 deletions
@@ -18,6 +18,7 @@
 """

 # Standard
+import asyncio
 from collections import defaultdict
 import csv
 from datetime import datetime, timedelta, timezone
@@ -9773,6 +9774,244 @@ async def admin_test_gateway(request: GatewayTestRequest, team_id: Optional[str]
         return GatewayTestResponse(status_code=502, latency_ms=latency_ms, body={"error": "Request failed", "details": str(e)})


+# Event Streaming via SSE to the Admin UI
+@admin_router.get("/events")
+async def admin_events(request: Request, _user=Depends(get_current_user_with_permissions)):
+    """
+    Stream admin events from all services via SSE (Server-Sent Events).
+
+    This endpoint establishes a persistent connection to stream real-time updates
+    from the gateway service and tool service to the frontend. It aggregates
+    multiple event streams into a single asyncio queue for unified delivery.
+
+    Args:
+        request (Request): The FastAPI request object, used to detect client disconnection.
+        _user (Any): Authenticated user dependency (ensures admin permissions).
+
+    Returns:
+        StreamingResponse: An async generator yielding SSE-formatted strings
+        (media_type="text/event-stream").
+
+    Examples:
+        >>> import asyncio
+        >>> from unittest.mock import AsyncMock, MagicMock, patch
+        >>> from fastapi import Request
+        >>>
+        >>> # Mock the request to simulate connection status
+        >>> mock_request = MagicMock(spec=Request)
+        >>> # Return False (connected) twice, then True (disconnected) to exit the loop
+        >>> mock_request.is_disconnected = AsyncMock(side_effect=[False, False, True])
+        >>>
+        >>> # Define a mock event generator for services
+        >>> async def mock_service_stream(service_name):
+        ...     yield {"type": "update", "data": {"service": service_name, "status": "active"}}
+        >>>
+        >>> async def test_streaming_endpoint():
+        ...     # Patch the global services used inside the function
+        ...     # Note: Adjust the patch path 'mcpgateway.admin' to your actual module path
+        ...     with patch('mcpgateway.admin.gateway_service') as mock_gw_service, patch('mcpgateway.admin.tool_service') as mock_tool_service:
+        ...
+        ...         # Setup mocks to return our async generator
+        ...         mock_gw_service.subscribe_events.side_effect = lambda: mock_service_stream("gateway")
+        ...         mock_tool_service.subscribe_events.side_effect = lambda: mock_service_stream("tool")
+        ...
+        ...         # Call the endpoint
+        ...         response = await admin_events(mock_request, _user="admin_user")
+        ...
+        ...         # Consume the StreamingResponse body iterator
+        ...         results = []
+        ...         async for chunk in response.body_iterator:
+        ...             results.append(chunk)
+        ...
+        ...         return results
+        >>>
+        >>> # Run the test
+        >>> events = asyncio.run(test_streaming_endpoint())
+        >>>
+        >>> # Verify SSE formatting
+        >>> first_event = events[0]
+        >>> assert "event: update" in first_event
+        >>> assert "data:" in first_event
+        >>> assert "gateway" in first_event or "tool" in first_event
+        >>> print("SSE Stream Test Passed")
+        SSE Stream Test Passed
+    """
+    # Create a shared queue to aggregate events from all services
+    event_queue = asyncio.Queue()
+
+    # Define a generic producer that feeds a specific stream into the queue
+    async def stream_to_queue(generator, source_name: str):
+        """Consume events from an async generator and forward them to a queue.
+
+        This coroutine iterates over an asynchronous generator and enqueues each
+        yielded event into a global or external `event_queue`. It gracefully
+        handles task cancellation and logs unexpected exceptions.
+
+        Args:
+            generator (AsyncGenerator): An asynchronous generator that yields events.
+            source_name (str): A human-readable label for the event source, used
+                for logging error messages.
+
+        Raises:
+            Exception: Any unexpected exception raised while iterating over the
+                generator will be caught, logged, and suppressed.
+
+        Doctest:
+            >>> import asyncio
+            >>> class FakeQueue:
+            ...     def __init__(self):
+            ...         self.items = []
+            ...     async def put(self, item):
+            ...         self.items.append(item)
+            ...
+            >>> async def fake_gen():
+            ...     yield 1
+            ...     yield 2
+            ...     yield 3
+            ...
+            >>> event_queue = FakeQueue()  # monkey-patch the global name
+            >>> async def run_test():
+            ...     await stream_to_queue(fake_gen(), "test_source")
+            ...     return event_queue.items
+            ...
+            >>> asyncio.run(run_test())
+            [1, 2, 3]
+
+        """
+        try:
+            async for event in generator:
+                await event_queue.put(event)
+        except asyncio.CancelledError:
+            pass  # Task cancelled normally
+        except Exception as e:
+            LOGGER.error(f"Error in {source_name} event subscription: {e}")
+
+    async def event_generator():
+        """
+        Asynchronous Server-Sent Events (SSE) generator.
+
+        This coroutine listens to multiple background event streams (e.g., from
+        gateway and tool services), funnels their events into a shared queue, and
+        yields them to the client in proper SSE format.
+
+        The function:
+        - Spawns background tasks to consume events from subscribed services.
+        - Monitors the client connection for disconnection.
+        - Yields SSE-formatted messages as they arrive.
+        - Cleans up subscription tasks on exit.
+
+        The SSE format emitted:
+            event: <event_type>
+            data: <json-encoded data>
+
+        Yields:
+            AsyncGenerator[str, None]: A generator yielding SSE-formatted strings.
+
+        Raises:
+            asyncio.CancelledError: If the SSE stream or background tasks are cancelled.
+            Exception: Any unexpected exception in the main loop is logged but not re-raised.
+
+        Notes:
+            This function expects the following names to exist in the outer scope:
+            - `request`: A FastAPI/Starlette Request object.
+            - `event_queue`: An asyncio.Queue instance where events are dispatched.
+            - `gateway_service` and `tool_service`: Services exposing async subscribe_events().
+            - `stream_to_queue`: Coroutine to pipe service streams into the queue.
+            - `LOGGER`: Logger instance.
+
+        Example:
+            Basic doctest demonstrating SSE formatting from mock data:
+
+            >>> import json, asyncio
+            >>> class DummyRequest:
+            ...     async def is_disconnected(self):
+            ...         return False
+            >>> async def dummy_gen():
+            ...     # Simulate an event queue and minimal environment
+            ...     global request, event_queue
+            ...     request = DummyRequest()
+            ...     event_queue = asyncio.Queue()
+            ...     # Minimal stubs to satisfy references
+            ...     class DummyService:
+            ...         async def subscribe_events(self):
+            ...             async def gen():
+            ...                 yield {"type": "test", "data": {"a": 1}}
+            ...             return gen()
+            ...     global gateway_service, tool_service, stream_to_queue, LOGGER
+            ...     gateway_service = tool_service = DummyService()
+            ...     async def stream_to_queue(gen, tag):
+            ...         async for e in gen:
+            ...             await event_queue.put(e)
+            ...     class DummyLogger:
+            ...         def debug(self, *args, **kwargs): pass
+            ...         def error(self, *args, **kwargs): pass
+            ...     LOGGER = DummyLogger()
+            ...
+            ...     agen = event_generator()
+            ...     # Startup requires allowing tasks to enqueue
+            ...     async def get_one():
+            ...         async for msg in agen:
+            ...             return msg
+            ...     return (await get_one()).startswith("event: test")
+            >>> asyncio.run(dummy_gen())
+            True
+        """
+        # Create background tasks for each service subscription
+        # This allows them to run concurrently
+        tasks = [asyncio.create_task(stream_to_queue(gateway_service.subscribe_events(), "gateway")), asyncio.create_task(stream_to_queue(tool_service.subscribe_events(), "tool"))]
+
+        try:
+            while True:
+                # Check for client disconnection
+                if await request.is_disconnected():
+                    LOGGER.debug("SSE Client disconnected")
+                    break
+
+                # Wait for the next event from EITHER service
+                # We use asyncio.wait_for to allow checking request.is_disconnected periodically
+                # or simply rely on queue.get() which is efficient.
+                try:
+                    # Wait for an event
+                    event = await event_queue.get()
+
+                    # SSE format
+                    event_type = event.get("type", "message")
+                    event_data = json.dumps(event.get("data", {}))
+
+                    yield f"event: {event_type}\ndata: {event_data}\n\n"
+
+                    # Mark task as done in queue (good practice)
+                    event_queue.task_done()
+
+                except asyncio.CancelledError:
+                    LOGGER.debug("SSE Event generator task cancelled")
+                    raise
+
+        except asyncio.CancelledError:
+            LOGGER.debug("SSE Stream cancelled")
+        except Exception as e:
+            LOGGER.error(f"SSE Stream error: {e}")
+        finally:
+            # Cleanup: Cancel all background subscription tasks
+            # This is crucial to close Redis connections/listeners in the EventService
+            for task in tasks:
+                task.cancel()
+
+            # Wait for tasks to clean up
+            await asyncio.gather(*tasks, return_exceptions=True)
+            LOGGER.debug("Background event subscription tasks cleaned up")
+
+    return StreamingResponse(
+        event_generator(),
+        media_type="text/event-stream",
+        headers={
+            "Cache-Control": "no-cache",
+            "Connection": "keep-alive",
+            "X-Accel-Buffering": "no",
+        },
+    )
+
+
 ####################
 # Admin Tag Routes #
 ####################
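The `event_generator` added above emits each event as `event: <type>\ndata: <json>\n\n`. As a sanity check of that wire format, here is a small round-trip pair: `format_sse` mirrors the yield expression in the diff, while `parse_sse` is a hypothetical inverse written for illustration only (it is not part of the commit):

```python
import json


def format_sse(event: dict) -> str:
    """Render an event dict the way the admin_events generator does."""
    event_type = event.get("type", "message")
    event_data = json.dumps(event.get("data", {}))
    return f"event: {event_type}\ndata: {event_data}\n\n"


def parse_sse(frame: str) -> dict:
    """Hypothetical inverse: recover the event dict from one SSE frame."""
    fields = {}
    for line in frame.strip().splitlines():
        # Split only on the first ": " so JSON payloads stay intact
        key, _, value = line.partition(": ")
        fields[key] = value
    return {"type": fields.get("event", "message"), "data": json.loads(fields.get("data", "{}"))}


frame = format_sse({"type": "tool_health", "data": {"status": "healthy"}})
# Round-trips back to the original event dict
assert parse_sse(frame) == {"type": "tool_health", "data": {"status": "healthy"}}
```

The blank line (`\n\n`) at the end of each frame is what lets a browser `EventSource`, or any SSE client, know the event is complete.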

mcpgateway/alembic/versions/191a2def08d7_resource_rename_template_to_uri_template.py

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 """resource_rename_template_to_uri_template

 Revision ID: 191a2def08d7

mcpgateway/alembic/versions/z1a2b3c4d5e6_add_password_change_required.py

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 """Add password_change_required field to EmailUser

 Revision ID: z1a2b3c4d5e6

mcpgateway/auth.py

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@
 from mcpgateway.config import settings
 from mcpgateway.db import EmailUser, SessionLocal
 from mcpgateway.plugins.framework import get_plugin_manager, GlobalContext, HttpAuthResolveUserPayload, HttpHeaderPayload, HttpHookType, PluginViolationError
-from mcpgateway.services.team_management_service import TeamManagementService  # pylint: disable=import-outside-toplevel
+from mcpgateway.services.team_management_service import TeamManagementService
 from mcpgateway.utils.verify_credentials import verify_jwt_token

 # Security scheme

mcpgateway/main.py

Lines changed: 4 additions & 5 deletions
@@ -3095,21 +3095,20 @@ async def delete_resource(resource_id: str, db: Session = Depends(get_db), user=
         raise HTTPException(status_code=400, detail=str(e))


-@resource_router.post("/subscribe/{resource_id}")
+@resource_router.post("/subscribe")
 @require_permission("resources.read")
-async def subscribe_resource(resource_id: str, user=Depends(get_current_user_with_permissions)) -> StreamingResponse:
+async def subscribe_resource(user=Depends(get_current_user_with_permissions)) -> StreamingResponse:
     """
     Subscribe to server-sent events (SSE) for a specific resource.

     Args:
-        resource_id (str): ID of the resource to subscribe to.
         user (str): Authenticated user.

     Returns:
         StreamingResponse: A streaming response with event updates.
     """
-    logger.debug(f"User {user} is subscribing to resource with resource_id {resource_id}")
-    return StreamingResponse(resource_service.subscribe_events(resource_id), media_type="text/event-stream")
+    logger.debug(f"User {user} is subscribing to resource")
+    return StreamingResponse(resource_service.subscribe_events(), media_type="text/event-stream")


 ###############
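With `subscribe_resource` now returning a bare `text/event-stream` (no per-resource path parameter), a client consuming it must reassemble complete frames from arbitrarily-sized network chunks by splitting on the blank-line terminator. A minimal, dependency-free sketch of that reassembly — this helper and the sample event names are illustrative, not from the repository:

```python
def split_sse_frames(chunks):
    """Reassemble complete SSE frames from arbitrarily-sized stream chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A blank line ("\n\n") terminates each frame; keep any partial tail buffered
        while "\n\n" in buffer:
            frame, buffer = buffer.split("\n\n", 1)
            yield frame


# Chunks arrive split mid-frame, as they would over a real socket
chunks = ["event: resource_up", "dated\ndata: {}\n\nevent: ", "ping\ndata: {}\n\n"]
frames = list(split_sse_frames(chunks))
# frames == ["event: resource_updated\ndata: {}", "event: ping\ndata: {}"]
```

In practice an HTTP client's streaming iterator (or the browser's built-in `EventSource`) handles this buffering for you; the sketch just makes the framing rule explicit.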
