Commit 503418c: Add ADR

1 parent a1f678e

File tree: 3 files changed, +52 −14 lines
Lines changed: 41 additions & 0 deletions

# 9. Kafka streaming

## Status

Current

## Context

Many facilities stream bluesky documents to an event bus for consumption by out-of-process listeners.
Event buses used for this purpose at other facilities include ZeroMQ, RabbitMQ, Kafka, Redis, NATS, and
others.

The capability this provides is that callbacks can be run in different processes or on other computers,
without holding up or interfering with the local `RunEngine`. Other groups at ISIS have expressed some
interest in being able to subscribe to bluesky documents.

## Decision

- We will stream our messages to Kafka, as opposed to some other message bus, because we already
  have Kafka infrastructure available for other purposes (e.g. event data & sample-environment data).
- At the time of writing, we will not **depend** on Kafka for anything critical, because the
  central Kafka instance is not currently considered "reliable" in an experiment controls context.
  However, streaming the documents will allow testing to be done. Kafka will eventually be deployed
  in a "reliable" way accessible to each instrument.
- We will encode messages from bluesky using `msgpack` (with the `msgpack-numpy` extension), because:
  - It is the default encoder used by the upstream `bluesky-kafka` integration
  - It is a schema-less encoder, meaning we do not have to write/maintain fixed schemas for all the
    documents allowed by `event-model`
  - It has reasonable performance in terms of encoding speed and message size
  - `msgpack` is very widely supported in a range of programming languages
- Kafka brokers will be configurable via an environment variable, `IBEX_BLUESKY_CORE_KAFKA_BROKER`.

```{note}
Wherever Kafka is mentioned above, the actual implementation may be a Kafka-like (e.g. RedPanda).
```

## Justification & Consequences

We will stream bluesky documents to Kafka, encoded using `msgpack-numpy`.

At the time of writing this is purely to enable testing, and will not be used for "production" workflows.
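
To make the decision above concrete, the sketch below shows how an out-of-process listener might consume and decode these documents. It is illustrative only and not part of this commit: the topic name, consumer group id, and fallback broker address are made-up placeholders (the real topic comes from `get_kafka_topic_name()`), the exact payload layout depends on how `bluesky-kafka`'s `Publisher` packs each `(name, document)` pair, and `msgpack_numpy.loads` is assumed as the decoding counterpart of the `msgpack_numpy.dumps` serializer used on the publishing side.

```python
# Illustrative consumer sketch (not part of this commit).
import os

import msgpack_numpy
from confluent_kafka import Consumer

# Broker taken from the environment variable named in the ADR;
# the "localhost:9092" fallback is a placeholder.
broker = os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", "localhost:9092")

consumer = Consumer(
    {
        "bootstrap.servers": broker,
        "group.id": "bluesky-doc-listener",  # hypothetical group id
        "auto.offset.reset": "latest",
    }
)
consumer.subscribe(["bluesky_docs"])  # hypothetical topic name

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # Decode with msgpack-numpy so any numpy arrays inside documents
        # round-trip correctly; the payload structure mirrors whatever the
        # bluesky-kafka Publisher serialized.
        payload = msgpack_numpy.loads(msg.value())
        print(msg.key(), payload)
finally:
    consumer.close()
```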

pyproject.toml

Lines changed: 3 additions & 2 deletions

@@ -43,12 +43,13 @@ dependencies = [
     "bluesky",  # Bluesky framework
     "bluesky-kafka",  # Bluesky-kafka integration
     "ophyd-async[ca] == 0.12.3",  # Device abstraction
-    "matplotlib",  # Plotting
     "lmfit",  # Fitting
-    "scipy",  # Definitions of erf/erfc functions
+    "matplotlib",  # Plotting
+    "msgpack-numpy",  # Encoding kafka messages
     "numpy",  # General array support
     "orjson",  # json module which handles numpy arrays transparently
     "scipp",  # support for arrays with variances/units
+    "scipy",  # Definitions of erf/erfc functions
     "scippneutron",  # neutron-specific utilities for scipp
     "typing-extensions",  # TypeVar with default-arg support
     "tzdata",  # Windows timezone support

src/ibex_bluesky_core/run_engine/__init__.py

Lines changed: 8 additions & 12 deletions

@@ -3,32 +3,28 @@
 import asyncio
 import functools
 import logging
+import os
+import socket
 from collections.abc import Generator
 from functools import cache
 from threading import Event, Lock
 from typing import Any, cast

 import bluesky.preprocessors as bpp
-import msgpack
+import msgpack_numpy
 from bluesky.run_engine import RunEngine, RunEngineResult
 from bluesky.utils import DuringTask, Msg, RunEngineControlException, RunEngineInterrupted
-
-from ibex_bluesky_core.callbacks import DocLoggingCallback
-from ibex_bluesky_core.preprocessors import add_rb_number_processor
-
-__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"]
-
-
-import os
-import socket
-
 from bluesky_kafka import Publisher

+from ibex_bluesky_core.callbacks import DocLoggingCallback
 from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY
+from ibex_bluesky_core.preprocessors import add_rb_number_processor
 from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler
 from ibex_bluesky_core.utils import is_matplotlib_backend_qt
 from ibex_bluesky_core.version import version

+__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"]
+
 logger = logging.getLogger(__name__)


@@ -128,7 +124,7 @@ def get_run_engine() -> RunEngine:
         topic=get_kafka_topic_name(),
         bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER),
         key="doc",
-        serializer=msgpack.dumps,
+        serializer=msgpack_numpy.dumps,
         producer_config={"enable.idempotence": True},
     )
     RE.subscribe(kafka_callback)
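
The serializer swap in the last hunk is the substantive change: plain `msgpack.dumps` has no handler for numpy arrays, which can appear inside bluesky documents, while `msgpack_numpy.dumps` encodes them via an extension hook. A minimal round-trip sketch follows; it is not part of the commit, the document shown is made up, and `msgpack_numpy.loads` is assumed as the decoding counterpart.

```python
# Round-trip sketch showing why msgpack.dumps was replaced by msgpack_numpy.dumps.
import msgpack
import msgpack_numpy
import numpy as np

# A made-up, event-like document containing a numpy array.
doc = {"data": {"spectrum": np.arange(5, dtype=np.float64)}, "seq_num": 1}

encoded = msgpack_numpy.dumps(doc)      # works: arrays handled by msgpack-numpy's encoder
decoded = msgpack_numpy.loads(encoded)  # arrays come back as numpy arrays

assert isinstance(decoded["data"]["spectrum"], np.ndarray)

try:
    msgpack.dumps(doc)                  # plain msgpack cannot serialize ndarray
except TypeError as err:
    print("plain msgpack fails:", err)
```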
