Skip to content

Commit 1a25e59

Browse files
committed
integration test
1 parent 08950ca commit 1a25e59

File tree

1 file changed

+326
-0
lines changed

1 file changed

+326
-0
lines changed
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
# Copyright 2025 Oliver Lambson
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
"""Integration tests for jetstreampcg.
15+
16+
These tests are ported from orbit.go/pcgroups/test/stream_consumer_group_test.go
17+
"""
18+
19+
import asyncio
20+
21+
import pytest
22+
from nats.js import JetStreamContext
23+
from nats.js.api import AckPolicy, ConsumerConfig, StreamConfig, SubjectTransform
24+
25+
from jetstreampcg.elastic import (
26+
add_members,
27+
create_elastic,
28+
delete_elastic,
29+
delete_members,
30+
)
31+
from jetstreampcg.static import create_static, delete_static, static_consume
32+
33+
34+
@pytest.mark.asyncio
35+
class TestStaticIntegration:
36+
"""Integration tests for static consumer groups.
37+
38+
Ported from orbit.go/pcgroups/test/stream_consumer_group_test.go:TestStatic
39+
"""
40+
41+
async def test_static_consumer_group(self, js_client: JetStreamContext):
42+
"""Test static consumer group with two members consuming messages in parallel."""
43+
stream_name = "test-static"
44+
cg_name = "group"
45+
c1_count = 0
46+
c2_count = 0
47+
48+
# Create a stream with subject transform for partitioning
49+
await js_client.add_stream(
50+
StreamConfig(
51+
name=stream_name,
52+
subjects=["bar.*"],
53+
subject_transform=SubjectTransform(
54+
src="bar.*",
55+
dest="{{partition(2,1)}}.bar.{{wildcard(1)}}",
56+
),
57+
)
58+
)
59+
60+
# Publish 10 messages
61+
for i in range(10):
62+
await js_client.publish(f"bar.{i}", b"payload")
63+
64+
# Consumer config
65+
consumer_config = ConsumerConfig(
66+
max_ack_pending=1,
67+
ack_wait=1.0,
68+
ack_policy=AckPolicy.EXPLICIT,
69+
)
70+
71+
# Create static consumer group with 2 members
72+
await create_static(
73+
js_client,
74+
stream_name,
75+
cg_name,
76+
max_members=2,
77+
filter="bar.*",
78+
members=["m1", "m2"],
79+
member_mappings=[],
80+
)
81+
82+
# Track when to stop consuming
83+
stop_event = asyncio.Event()
84+
85+
# Consumer 1
86+
async def consume_m1():
87+
nonlocal c1_count
88+
89+
async def m1_handler(msg):
90+
nonlocal c1_count
91+
c1_count += 1
92+
await msg.ack()
93+
94+
ctx = await static_consume(
95+
js_client,
96+
stream_name,
97+
cg_name,
98+
"m1",
99+
m1_handler,
100+
consumer_config,
101+
)
102+
103+
# Wait for stop signal
104+
await stop_event.wait()
105+
ctx.stop()
106+
await ctx.done()
107+
108+
# Consumer 2
109+
async def consume_m2():
110+
nonlocal c2_count
111+
112+
async def m2_handler(msg):
113+
nonlocal c2_count
114+
c2_count += 1
115+
await msg.ack()
116+
117+
ctx = await static_consume(
118+
js_client,
119+
stream_name,
120+
cg_name,
121+
"m2",
122+
m2_handler,
123+
consumer_config,
124+
)
125+
126+
# Wait for stop signal
127+
await stop_event.wait()
128+
ctx.stop()
129+
await ctx.done()
130+
131+
# Start both consumers
132+
task1 = asyncio.create_task(consume_m1())
133+
task2 = asyncio.create_task(consume_m2())
134+
135+
# Wait for all messages to be consumed (with timeout)
136+
start_time = asyncio.get_event_loop().time()
137+
while c1_count + c2_count < 10:
138+
await asyncio.sleep(0.1)
139+
if asyncio.get_event_loop().time() - start_time > 5:
140+
pytest.fail("Timeout waiting for messages to be consumed")
141+
142+
# Signal consumers to stop
143+
stop_event.set()
144+
145+
# Wait for consumers to finish
146+
await asyncio.gather(task1, task2)
147+
148+
# Verify all messages were consumed
149+
assert c1_count + c2_count == 10
150+
151+
# Clean up
152+
await delete_static(js_client, stream_name, cg_name)
153+
154+
155+
@pytest.mark.asyncio
156+
class TestElasticIntegration:
157+
"""Integration tests for elastic consumer groups.
158+
159+
Ported from orbit.go/pcgroups/test/stream_consumer_group_test.go:TestElastic
160+
"""
161+
162+
async def test_elastic_consumer_group_with_membership_changes(
163+
self, js_client: JetStreamContext
164+
):
165+
"""Test elastic consumer group with dynamic member addition and removal."""
166+
stream_name = "test-elastic"
167+
cg_name = "group"
168+
c1_count = 0
169+
c2_count = 0
170+
171+
# Create a stream
172+
await js_client.add_stream(
173+
StreamConfig(
174+
name=stream_name,
175+
subjects=["bar.*"],
176+
)
177+
)
178+
179+
# Publish 10 messages
180+
for i in range(10):
181+
await js_client.publish(f"bar.{i}", b"payload")
182+
183+
# Consumer config
184+
consumer_config = ConsumerConfig(
185+
max_ack_pending=1,
186+
ack_wait=1.0,
187+
ack_policy=AckPolicy.EXPLICIT,
188+
)
189+
190+
# Create elastic consumer group with max 2 members
191+
await create_elastic(
192+
js_client,
193+
stream_name,
194+
cg_name,
195+
max_num_members=2,
196+
filter="bar.*",
197+
partitioning_wildcards=[1],
198+
)
199+
200+
# Track when to stop consuming
201+
stop_event_m1 = asyncio.Event()
202+
stop_event_m2 = asyncio.Event()
203+
204+
# Consumer 1
205+
async def consume_m1():
206+
nonlocal c1_count
207+
208+
async def m1_handler(msg):
209+
nonlocal c1_count
210+
c1_count += 1
211+
await msg.ack()
212+
213+
from jetstreampcg.elastic import elastic_consume
214+
215+
ctx = await elastic_consume(
216+
js_client,
217+
stream_name,
218+
cg_name,
219+
"m1",
220+
m1_handler,
221+
consumer_config,
222+
)
223+
224+
# Wait for stop signal
225+
await stop_event_m1.wait()
226+
ctx.stop()
227+
await ctx.done()
228+
229+
# Consumer 2
230+
async def consume_m2():
231+
nonlocal c2_count
232+
233+
async def m2_handler(msg):
234+
nonlocal c2_count
235+
c2_count += 1
236+
await msg.ack()
237+
238+
from jetstreampcg.elastic import elastic_consume
239+
240+
ctx = await elastic_consume(
241+
js_client,
242+
stream_name,
243+
cg_name,
244+
"m2",
245+
m2_handler,
246+
consumer_config,
247+
)
248+
249+
# Wait for stop signal
250+
await stop_event_m2.wait()
251+
ctx.stop()
252+
await ctx.done()
253+
254+
# Start both consumers
255+
task1 = asyncio.create_task(consume_m1())
256+
task2 = asyncio.create_task(consume_m2())
257+
258+
# Add only m1 to membership
259+
await add_members(js_client, stream_name, cg_name, ["m1"])
260+
261+
# Wait for m1 to consume all 10 messages (m2 should not consume any)
262+
start_time = asyncio.get_event_loop().time()
263+
while c1_count != 10 or c2_count != 0:
264+
await asyncio.sleep(0.1)
265+
if asyncio.get_event_loop().time() - start_time > 5:
266+
pytest.fail(
267+
f"Timeout: expected c1=10, c2=0, got c1={c1_count}, c2={c2_count}"
268+
)
269+
270+
assert c1_count == 10
271+
assert c2_count == 0
272+
273+
# Add m2 to membership
274+
await add_members(js_client, stream_name, cg_name, ["m2"])
275+
276+
# Wait a bit for m2 to be effectively added
277+
await asyncio.sleep(0.05)
278+
279+
# Publish 10 more messages
280+
for i in range(10):
281+
await js_client.publish(f"bar.{i}", b"payload")
282+
283+
# Wait for messages to be split between m1 and m2
284+
start_time = asyncio.get_event_loop().time()
285+
while c1_count + c2_count < 20:
286+
await asyncio.sleep(0.1)
287+
if asyncio.get_event_loop().time() - start_time > 10:
288+
pytest.fail(
289+
f"Timeout: expected total=20, got c1={c1_count}, c2={c2_count}"
290+
)
291+
292+
# Both should have consumed some messages (split between them)
293+
assert c1_count == 15
294+
assert c2_count == 5
295+
296+
# Remove m1 from membership
297+
await delete_members(js_client, stream_name, cg_name, ["m1"])
298+
299+
# Wait a bit for m1 to be effectively deleted
300+
await asyncio.sleep(0.05)
301+
302+
# Publish 10 more messages
303+
for i in range(10):
304+
await js_client.publish(f"bar.{i}", b"payload")
305+
306+
# Wait for m2 to consume all new messages (m1 should not consume any more)
307+
start_time = asyncio.get_event_loop().time()
308+
while c1_count != 15 or c2_count != 15:
309+
await asyncio.sleep(0.1)
310+
if asyncio.get_event_loop().time() - start_time > 10:
311+
pytest.fail(
312+
f"Timeout: expected c1=15, c2=15, got c1={c1_count}, c2={c2_count}"
313+
)
314+
315+
assert c1_count == 15
316+
assert c2_count == 15
317+
318+
# Signal consumers to stop
319+
stop_event_m1.set()
320+
stop_event_m2.set()
321+
322+
# Wait for consumers to finish
323+
await asyncio.gather(task1, task2)
324+
325+
# Clean up
326+
await delete_elastic(js_client, stream_name, cg_name)

0 commit comments

Comments
 (0)