Skip to content

Commit 35d94ea

Browse files
committed
Add integration test for OpenStack events WebSocket endpoint
Cover the /v1/events/openstack WebSocket and the in-process websocket_manager via the FastAPI TestClient: connection accept, filter-message acknowledgment, delivery of a matching event, filtering-out of a non-matching event, and connection-count cleanup on disconnect. The TestClient fixture is module-scoped because the manager's module-level asyncio primitives bind to the first event loop that touches them; sharing one loop across the module keeps them valid. Events are pushed onto the loop-bound queue via the app's portal. Assisted-by: Claude:claude-opus-4-8 Signed-off-by: Christian Berendt <berendt@osism.tech>
1 parent 480bb1b commit 35d94ea

3 files changed

Lines changed: 144 additions & 1 deletion

File tree

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ httpx = "==0.28.1"
4646
pytest = "==9.1.1"
4747
pytest-cov = "==7.1.0"
4848
pytest-mock = "==3.15.1"
49+
pytest-timeout = "==2.4.0"

Pipfile.lock

Lines changed: 10 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
3+
"""Integration tests for the OpenStack events WebSocket endpoint.
4+
5+
``GET /v1/events/openstack`` upgrades to a WebSocket served by the in-process
6+
``websocket_manager`` (``osism/services/websocket_manager.py``). Driving it
7+
through ``fastapi.testclient.TestClient`` exercises the connect / set-filters /
8+
broadcast / disconnect path end-to-end. The manager broadcasts in-process via
9+
an ``asyncio`` queue, so no service beyond the ``TestClient`` is needed; the
10+
module is marked ``integration`` only to share the FastAPI / ``httpx`` setup
11+
with the sibling facts test and stay in the same Tier 2 batch.
12+
"""
13+
14+
import uuid
15+
16+
import pytest
17+
18+
from osism.services.websocket_manager import EventMessage, websocket_manager
19+
20+
# Most tests block on ``ws.receive_json()``, which bottoms out in Starlette's
21+
# untimed ``queue.get()``. A regression that stops the server from emitting the
22+
# ack or the matching event would otherwise hang forever and only die on the CI
23+
# wall-clock timeout, disguising the failure. Cap every test so the hang turns
24+
# into a 30-second failure at the exact ``receive_json()`` call instead.
25+
pytestmark = [pytest.mark.integration, pytest.mark.timeout(30)]
26+
27+
28+
@pytest.fixture(scope="module")
29+
def client():
30+
"""A module-scoped ``TestClient`` bound to the FastAPI app.
31+
32+
``osism.api`` is imported lazily because importing it wires the event
33+
bridge to Redis at module load -- safe only in the integration environment
34+
where Redis is up. The fixture is module-scoped on purpose: the global
35+
``websocket_manager`` owns module-level ``asyncio`` primitives
36+
(``event_queue``, ``_lock``) that bind to the first event loop that touches
37+
them and raise "bound to a different event loop" on any other. Sharing one
38+
``TestClient`` (one loop) across this module keeps them valid.
39+
"""
40+
from fastapi.testclient import TestClient
41+
42+
from osism import api
43+
44+
with TestClient(api.app) as test_client:
45+
yield test_client
46+
47+
48+
def test_websocket_connect_is_accepted(client):
49+
"""The endpoint accepts the WebSocket upgrade."""
50+
with client.websocket_connect("/v1/events/openstack"):
51+
pass
52+
53+
54+
def test_set_filters_is_acknowledged(client):
55+
"""A ``set_filters`` message is processed and acknowledged verbatim."""
56+
with client.websocket_connect("/v1/events/openstack") as ws:
57+
ws.send_json(
58+
{
59+
"action": "set_filters",
60+
"event_filters": ["compute.instance.create.end"],
61+
"node_filters": ["server-01"],
62+
"service_filters": ["compute"],
63+
}
64+
)
65+
66+
ack = ws.receive_json()
67+
68+
assert ack["type"] == "filter_update"
69+
assert ack["status"] == "success"
70+
assert ack["event_filters"] == ["compute.instance.create.end"]
71+
assert ack["node_filters"] == ["server-01"]
72+
assert ack["service_filters"] == ["compute"]
73+
74+
75+
def test_matching_event_is_delivered(client):
76+
"""An event matching the connection's filters is delivered intact."""
77+
with client.websocket_connect("/v1/events/openstack") as ws:
78+
ws.send_json(
79+
{"action": "set_filters", "event_filters": ["compute.instance.create.end"]}
80+
)
81+
ws.receive_json() # filter acknowledgment
82+
83+
event = EventMessage(
84+
"compute.instance.create.end", "openstack", {"server": str(uuid.uuid4())}
85+
)
86+
# Push onto the in-process queue from the app's event loop: the queue is
87+
# loop-bound, so enqueuing from the test thread would be unsafe.
88+
client.portal.call(websocket_manager.add_event, event)
89+
90+
received = ws.receive_json()
91+
92+
assert received == event.to_dict()
93+
94+
95+
def test_non_matching_event_is_filtered_out(client):
96+
"""An event that does not match the filters is not delivered."""
97+
with client.websocket_connect("/v1/events/openstack") as ws:
98+
ws.send_json(
99+
{"action": "set_filters", "event_filters": ["compute.instance.create.end"]}
100+
)
101+
ws.receive_json() # filter acknowledgment
102+
103+
non_matching = EventMessage(
104+
"baremetal.node.power_set.end", "openstack", {"node": str(uuid.uuid4())}
105+
)
106+
sentinel = EventMessage(
107+
"compute.instance.create.end", "openstack", {"server": str(uuid.uuid4())}
108+
)
109+
# Both are queued FIFO; the broadcaster skips the non-matching event, so
110+
# the first (and only) message received is the matching sentinel. This
111+
# proves the non-matching event was dropped without an absence/timeout.
112+
client.portal.call(websocket_manager.add_event, non_matching)
113+
client.portal.call(websocket_manager.add_event, sentinel)
114+
115+
received = ws.receive_json()
116+
117+
assert received["id"] == sentinel.id
118+
assert received["event_type"] == "compute.instance.create.end"
119+
120+
121+
def test_disconnect_drops_connection_count(client):
122+
"""Disconnecting runs the ``finally`` cleanup and drops the count."""
123+
before = len(websocket_manager.connections)
124+
125+
with client.websocket_connect("/v1/events/openstack") as ws:
126+
# Set filters and consume the ack to force the handler past
127+
# ``connect()``'s registration before asserting the count.
128+
ws.send_json({"action": "set_filters", "event_filters": ["compute.x"]})
129+
ws.receive_json()
130+
131+
assert len(websocket_manager.connections) == before + 1
132+
133+
assert len(websocket_manager.connections) == before

0 commit comments

Comments
 (0)