Skip to content

Commit 08950ca

Browse files
committed
elastic.py
1 parent c9e9860 commit 08950ca

File tree

4 files changed

+1758
-0
lines changed

4 files changed

+1758
-0
lines changed

jetstreampcg/src/jetstreampcg/__init__.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,76 @@
1414
"""JetStream partitioned consumer groups is an implementation of a client-side partitioned consumer group feature for NATS streams."""
1515

1616
from jetstreampcg._version import __version__
17+
from jetstreampcg.common import (
18+
ConsumerGroupConsumeContext,
19+
ConsumerGroupMsg,
20+
MemberMapping,
21+
compose_key,
22+
generate_partition_filters,
23+
)
24+
from jetstreampcg.elastic import (
25+
ElasticConsumerGroupConfig,
26+
ElasticConsumerGroupConsumerInstance,
27+
add_members,
28+
create_elastic,
29+
delete_elastic,
30+
delete_member_mappings,
31+
delete_members,
32+
elastic_consume,
33+
elastic_get_partition_filters,
34+
elastic_is_in_membership_and_active,
35+
elastic_member_step_down,
36+
get_elastic_consumer_group_config,
37+
list_elastic_active_members,
38+
list_elastic_consumer_groups,
39+
set_member_mappings,
40+
)
41+
from jetstreampcg.static import (
42+
StaticConsumerGroupConfig,
43+
StaticConsumerGroupConsumerInstance,
44+
create_static,
45+
delete_static,
46+
get_static_consumer_group_config,
47+
list_static_active_members,
48+
list_static_consumer_groups,
49+
static_consume,
50+
static_member_step_down,
51+
validate_static_config,
52+
)
1753

1854
__all__ = [
55+
# Common
56+
"ConsumerGroupConsumeContext",
57+
"ConsumerGroupMsg",
58+
# Elastic
59+
"ElasticConsumerGroupConfig",
60+
"ElasticConsumerGroupConsumerInstance",
61+
"MemberMapping",
62+
# Static
63+
"StaticConsumerGroupConfig",
64+
"StaticConsumerGroupConsumerInstance",
1965
"__version__",
66+
"add_members",
67+
"compose_key",
68+
"create_elastic",
69+
"create_static",
70+
"delete_elastic",
71+
"delete_member_mappings",
72+
"delete_members",
73+
"delete_static",
74+
"elastic_consume",
75+
"elastic_get_partition_filters",
76+
"elastic_is_in_membership_and_active",
77+
"elastic_member_step_down",
78+
"generate_partition_filters",
79+
"get_elastic_consumer_group_config",
80+
"get_static_consumer_group_config",
81+
"list_elastic_active_members",
82+
"list_elastic_consumer_groups",
83+
"list_static_active_members",
84+
"list_static_consumer_groups",
85+
"set_member_mappings",
86+
"static_consume",
87+
"static_member_step_down",
88+
"validate_static_config",
2089
]
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""Minimal example of graceful exit implementation for reference in development of the consumers"""
2+
3+
import asyncio
4+
import logging
5+
import random
6+
7+
8+
async def watch_done(future: asyncio.Future[None]) -> None:
9+
"""A task that will complete when the done future result is set"""
10+
logger = logging.getLogger("watch_done")
11+
12+
try:
13+
logger.info("awaiting future...")
14+
await future
15+
logger.info("future set")
16+
except asyncio.CancelledError:
17+
logger.info("graceful shutdown complete")
18+
raise
19+
20+
21+
async def watch_cancel(event: asyncio.Event) -> None:
22+
"""A task that will complete when the cancel event is set"""
23+
logger = logging.getLogger("watch_cancel")
24+
25+
try:
26+
logger.info("awaiting event...")
27+
_ = await event.wait()
28+
logger.info("event set")
29+
except asyncio.CancelledError:
30+
logger.info("graceful shutdown complete")
31+
raise
32+
33+
34+
async def main_loop() -> None:
35+
"""Where the real work actually happens"""
36+
logger = logging.getLogger("main_loop")
37+
38+
try:
39+
while True:
40+
logger.info("doing work")
41+
_ = await asyncio.sleep(1)
42+
except asyncio.CancelledError:
43+
logger.info("received cancellation signal...")
44+
await asyncio.sleep(random.uniform(2, 4)) # noqa: S311
45+
logger.info("graceful shutdown complete")
46+
raise
47+
48+
49+
async def supervisor(done: asyncio.Future[None], cancel: asyncio.Event) -> None:
50+
"""Randomly sets done or cancel"""
51+
logger = logging.getLogger("supervisor")
52+
53+
try:
54+
while True:
55+
logger.info("supervising")
56+
_ = await asyncio.sleep(1)
57+
x = random.random() # noqa: S311
58+
if x < 0.8:
59+
logger.info("continue")
60+
elif 0.8 <= x < 0.9:
61+
if not done.done():
62+
done.set_result(None)
63+
logger.info("set done result")
64+
elif 0.9 <= x < 1.0:
65+
if not cancel.is_set():
66+
cancel.set()
67+
logger.info("set cancel")
68+
69+
except asyncio.CancelledError:
70+
logger.info("received cancellation signal...")
71+
await asyncio.sleep(1)
72+
logger.info("graceful shutdown complete")
73+
raise
74+
75+
76+
async def main() -> None:
77+
"""Creates all concurrent tasks"""
78+
logger = logging.getLogger("main")
79+
80+
done_future = asyncio.Future[None]()
81+
cancel_event = asyncio.Event()
82+
83+
tasks = [
84+
asyncio.create_task(watch_done(done_future), name="watch_done"),
85+
asyncio.create_task(watch_cancel(cancel_event), name="watch_cancel"),
86+
asyncio.create_task(supervisor(done_future, cancel_event), name="supervisor"),
87+
asyncio.create_task(main_loop(), name="main_loop"),
88+
]
89+
logger.info("tasks created")
90+
try:
91+
logger.info("waiting for first task to complete")
92+
_, pending = await asyncio.wait(
93+
tasks,
94+
return_when=asyncio.FIRST_COMPLETED,
95+
)
96+
97+
if pending:
98+
try:
99+
_ = await asyncio.wait_for(asyncio.gather(*pending), timeout=3)
100+
except asyncio.TimeoutError:
101+
logger.warning("graceful shutdown timeout - forcing exit")
102+
103+
except asyncio.CancelledError:
104+
logger.info("graceful shutdown requested, waiting...")
105+
_, pending = await asyncio.wait(tasks, timeout=3)
106+
if pending:
107+
for p in pending:
108+
_ = p.cancel()
109+
logger.warning("%s didn't complete graceful shutdown", p.get_name())
110+
111+
112+
def entrypoint() -> None:
113+
"""Program entrypoint"""
114+
logging.basicConfig(
115+
level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s::%(message)s"
116+
)
117+
logger = logging.getLogger("entrypoint")
118+
119+
logger.info("launching main")
120+
try:
121+
asyncio.run(main())
122+
except KeyboardInterrupt:
123+
logger.info("goodbye.")
124+
125+
126+
if __name__ == "__main__":
127+
entrypoint()

0 commit comments

Comments
 (0)