88 changes: 88 additions & 0 deletions src/ess/livedata/config/instruments/bifrost/factories.py
@@ -18,6 +18,7 @@
BifrostElasticQMapParams,
BifrostQMapParams,
BifrostWorkflowParams,
BraggPeakQMapParams,
DetectorRatemeterParams,
DetectorRatemeterRegionParams,
)
@@ -212,6 +213,93 @@ def _custom_elastic_qmap_workflow(
wf[CutAxis2] = axis2
return _make_cut_stream_processor(wf)

# Bragg peak monitor Q-map workflow
from ess.bifrost.single_crystal import BifrostBraggPeakMonitorWorkflow
from ess.bifrost.single_crystal.types import (
IntensityQparQperp,
QParallelBins,
QPerpendicularBins,
)
from ess.livedata.handlers.detector_data_handler import get_nexus_geometry_filename
from ess.reduce.nexus.types import (
EmptyDetector,
GravityVector,
Position,
RawDetector,
RunType,
)
from ess.reduce.nexus.workflow import assemble_detector_data
from ess.reduce.time_of_flight import DetectorLtotal

def _bragg_peak_monitor_ltotal(
detector_beamline: RawDetector[RunType],
source_position: Position[snx.NXsource, RunType],
sample_position: Position[snx.NXsample, RunType],
gravity: GravityVector,
) -> DetectorLtotal[RunType]:
"""Compute detector Ltotal using straight line approximation."""
from ess.reduce.time_of_flight import eto_to_tof

return eto_to_tof.detector_ltotal_from_straight_line_approximation(
detector_beamline, # type: ignore[arg-type]
source_position=source_position,
sample_position=sample_position,
gravity=gravity,
)

def _make_bragg_peak_monitor_empty_detector() -> sc.DataArray:
"""
Create EmptyDetector geometry for the Bragg peak monitor.

Hard-coded position approximately 1 m upstream of the sample along the beam.
This is a placeholder until the actual geometry is in the NeXus file.
"""
# Position: 1 m upstream of the sample (sample at origin, beam along +z, so upstream is -z)
position = sc.vector([0.0, 0.0, -1.0], unit='m')
return sc.DataArray(
data=sc.zeros(sizes={}, unit='counts'),
coords={
'position': position,
'detector_number': sc.scalar(1, unit=None),  # matches the fake stream's detector_number range (1, 1)
},
)

@specs.bragg_peak_qmap_handle.attach_factory()
def _bragg_peak_qmap_workflow(
params: BraggPeakQMapParams,
) -> StreamProcessorWorkflow:
wf = BifrostBraggPeakMonitorWorkflow()
wf[Filename[SampleRun]] = get_nexus_geometry_filename('bifrost')
wf[TimeOfFlightLookupTableFilename] = tof_lookup_table_simulation()
wf[PreopenNeXusFile] = PreopenNeXusFile(True)

# Provide hard-coded Bragg peak monitor geometry
# (bypass NeXus loading for detector position)
wf[EmptyDetector[SampleRun]] = _make_bragg_peak_monitor_empty_detector()

# Insert custom providers:
# - assemble_detector_data: bypasses McStas chain (NeXusData → RawDetector)
# - _bragg_peak_monitor_ltotal: straight-line Ltotal for monitor geometry
wf.insert(assemble_detector_data)
wf.insert(_bragg_peak_monitor_ltotal)

# Set histogram bins from params
wf[QParallelBins] = params.q_parallel_bins.get_edges().rename(Q='Q_parallel')
wf[QPerpendicularBins] = params.q_perpendicular_bins.get_edges().rename(
Q='Q_perpendicular'
)

return StreamProcessorWorkflow(
wf,
dynamic_keys={'bragg_peak_monitor': NeXusData[NXdetector, SampleRun]},
context_keys={
'detector_rotation': InstrumentAngle[SampleRun],
'sample_rotation': SampleAngle[SampleRun],
},
target_keys={'q_map': IntensityQparQperp[SampleRun]},
accumulators=(IntensityQparQperp[SampleRun],),
)


def _transpose_with_coords(data: sc.DataArray, dims: tuple[str, ...]) -> sc.DataArray:
"""
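
Note: a minimal sketch of what the bin-edge wiring above produces, assuming `QEdges.get_edges()` returns evenly spaced edges along a `Q` dimension that the `rename` calls then relabel (values below are the defaults from `BraggPeakQMapParams`, not additional project API):

```python
import scipp as sc

# Hypothetical equivalent of params.q_parallel_bins.get_edges().rename(Q='Q_parallel')
# for QEdges(start=-3.0, stop=3.0, num_bins=100): 100 bins require 101 edges.
q_parallel_edges = sc.linspace('Q_parallel', -3.0, 3.0, num=101, unit='1/angstrom')
q_perpendicular_edges = sc.linspace('Q_perpendicular', -3.0, 3.0, num=101, unit='1/angstrom')
```
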
49 changes: 48 additions & 1 deletion src/ess/livedata/config/instruments/bifrost/specs.py
@@ -23,7 +23,7 @@
DetectorViewOutputs,
DetectorViewParams,
)
from ess.livedata.parameter_models import EnergyEdges, QEdges
from ess.livedata.parameter_models import EnergyEdges, QEdges, QUnit


# Arc energies in meV
@@ -212,6 +212,41 @@ class BifrostAuxSources(AuxSourcesBase):
)


class BraggPeakAuxSources(AuxSourcesBase):
"""Auxiliary source for Bragg peak monitor Q-map workflow."""

detector_rotation: Literal['detector_rotation'] = pydantic.Field(
default='detector_rotation',
description='Detector tank rotation angle (a4). '
'Required because the monitor is mounted on the detector tank.',
)
sample_rotation: Literal['sample_rotation'] = pydantic.Field(
default='sample_rotation', description='Sample rotation angle (a3).'
)


class BraggPeakQMapParams(pydantic.BaseModel):
"""Parameters for Bragg peak monitor Q-map workflow."""

q_parallel_bins: QEdges = pydantic.Field(
default=QEdges(start=-3.0, stop=3.0, num_bins=100, unit=QUnit.INVERSE_ANGSTROM),
description="Q_parallel bin edges.",
)
q_perpendicular_bins: QEdges = pydantic.Field(
default=QEdges(start=-3.0, stop=3.0, num_bins=100, unit=QUnit.INVERSE_ANGSTROM),
description="Q_perpendicular bin edges.",
)


class BraggPeakQMapOutputs(WorkflowOutputsBase):
"""Outputs for Bragg peak monitor Q-map workflow."""

q_map: sc.DataArray = pydantic.Field(
title='Q Map',
description='2D histogram in Q_parallel vs Q_perpendicular.',
)


class QMapOutputs(WorkflowOutputsBase):
"""Outputs for Bifrost Q-map workflows."""

@@ -315,3 +350,15 @@ class QMapOutputs(WorkflowOutputsBase):
aux_sources=BifrostAuxSources,
outputs=QMapOutputs,
)

# Bragg peak monitor Q-map workflow
bragg_peak_qmap_handle = instrument.register_spec(
name='bragg_peak_qmap',
version=1,
title='Bragg peak Q map',
description='Q-map from Bragg peak monitor in Q_parallel vs Q_perpendicular.',
source_names=['bragg_peak_monitor'],
params=BraggPeakQMapParams,
aux_sources=BraggPeakAuxSources,
outputs=BraggPeakQMapOutputs,
)
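
Note: a usage sketch for the new params model (illustrative values; `model_dump` is standard pydantic v2, everything else is declared in this file or imported above):

```python
from ess.livedata.config.instruments.bifrost.specs import BraggPeakQMapParams
from ess.livedata.parameter_models import QEdges, QUnit

# Defaults: -3 to 3 1/angstrom with 100 bins on both axes, as declared above.
params = BraggPeakQMapParams()

# Override one axis, e.g. a narrower Q_parallel range (illustrative values).
params = BraggPeakQMapParams(
    q_parallel_bins=QEdges(start=-1.5, stop=1.5, num_bins=150, unit=QUnit.INVERSE_ANGSTROM),
)
print(params.model_dump())
```
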
4 changes: 4 additions & 0 deletions src/ess/livedata/config/instruments/bifrost/streams.py
@@ -38,6 +38,10 @@ def _bifrost_generator() -> Generator[tuple[str, tuple[int, int]]]:

detector_fakes = dict(_bifrost_generator())

# Bragg peak monitor: single-pixel detector with detector_number range 1-1
# (conceptually a monitor, but it uses the detector event format)
detector_fakes['bragg_peak_monitor'] = (1, 1)

# Detector number configuration
detector_number = sc.arange('detector_number', 1, 5 * 3 * 9 * 100 + 1, unit=None).fold(
dim='detector_number', sizes={'arc': 5, 'tube': 3, 'channel': 9, 'pixel': 100}
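
Note: a quick sanity check of the shapes involved, assuming the fake ranges are inclusive as the comment above suggests (the code mirrors the configuration in this file):

```python
import scipp as sc

# The regular Bifrost banks cover 5 * 3 * 9 * 100 = 13500 pixels in total.
detector_number = sc.arange('detector_number', 1, 5 * 3 * 9 * 100 + 1, unit=None).fold(
    dim='detector_number', sizes={'arc': 5, 'tube': 3, 'channel': 9, 'pixel': 100}
)
assert detector_number.sizes == {'arc': 5, 'tube': 3, 'channel': 9, 'pixel': 100}

# The Bragg peak monitor, by contrast, is one pixel: range (1, 1) means every
# fake event carries event_id == 1, matching the geometry placeholder in factories.py.
first, last = (1, 1)
assert last - first + 1 == 1
```
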
13 changes: 11 additions & 2 deletions src/ess/livedata/kafka/message_adapter.py
@@ -209,7 +209,12 @@ def adapt(self, message: KafkaMessage) -> Message[MonitorEvents]:
class Ev44ToDetectorEventsAdapter(
MessageAdapter[Message[eventdata_ev44.EventData], Message[DetectorEvents]]
):
def __init__(self, *, merge_detectors: bool = False):
def __init__(
self,
*,
merge_detectors: bool = False,
exclude_from_merge: frozenset[str] | None = None,
):
"""
Parameters
----------
@@ -218,14 +223,18 @@ def __init__(self, *, merge_detectors: bool = False):
useful for instruments with many detector banks that should be treated as a
single bank. Note that event_id/detector_number must be unique across all
detectors.
exclude_from_merge
Set of detector names to exclude from merging. These detectors will keep
their original stream name even when merge_detectors is True.
"""
self._merge_detectors = merge_detectors
self._exclude_from_merge = exclude_from_merge or frozenset()

def adapt(
self, message: Message[eventdata_ev44.EventData]
) -> Message[DetectorEvents]:
stream = message.stream
if self._merge_detectors:
if self._merge_detectors and stream.name not in self._exclude_from_merge:
stream = replace(stream, name='unified_detector')
return Message(
timestamp=message.timestamp,
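
Note: a minimal usage sketch of the new parameter (module path taken from this diff; on Bifrost the excluded name is `bragg_peak_monitor`, wired up in routes.py below):

```python
from ess.livedata.kafka.message_adapter import Ev44ToDetectorEventsAdapter

adapter = Ev44ToDetectorEventsAdapter(
    merge_detectors=True,
    exclude_from_merge=frozenset({'bragg_peak_monitor'}),
)
# adapt() renames regular detector streams to 'unified_detector', while any
# stream whose name is in exclude_from_merge keeps its original name.
```
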
8 changes: 7 additions & 1 deletion src/ess/livedata/kafka/routes.py
@@ -53,13 +53,19 @@ def with_beam_monitor_route(self) -> Self:

def with_detector_route(self) -> Self:
"""Adds the detector route."""
# Bifrost merges all detector banks into unified_detector, except for
# standalone detectors like bragg_peak_monitor that need separate streams.
is_bifrost = self._stream_mapping.instrument == 'bifrost'
adapter = ChainedAdapter(
first=KafkaToEv44Adapter(
stream_lut=self._stream_mapping.detectors,
stream_kind=StreamKind.DETECTOR_EVENTS,
),
second=Ev44ToDetectorEventsAdapter(
merge_detectors=self._stream_mapping.instrument == 'bifrost'
merge_detectors=is_bifrost,
exclude_from_merge=frozenset({'bragg_peak_monitor'})
if is_bifrost
else None,
),
)
for topic in self._stream_mapping.detector_topics:
100 changes: 100 additions & 0 deletions tests/bifrost_test.py
@@ -222,3 +222,103 @@ def test_ratemeter_includes_time_coordinate(self, bifrost_workflow):

assert 'time' in result.coords
assert result.coords['time'].unit == 'ns'


class TestBraggPeakQMap:
"""Tests for the Bragg peak monitor Q-map workflow."""

def test_bragg_peak_qmap_workflow_can_be_created(self):
"""Test that the workflow factory can create the workflow."""
from ess.livedata.config import workflow_spec
from ess.livedata.config.instruments.bifrost import factories
from ess.livedata.config.instruments.bifrost.specs import instrument

# Load factories
factories.setup_factories(instrument)

# Get the workflow spec
workflow_id = workflow_spec.WorkflowId(
instrument='bifrost',
namespace='data_reduction',
name='bragg_peak_qmap',
version=1,
)
config = workflow_spec.WorkflowConfig(
identifier=workflow_id,
aux_source_names={
'detector_rotation': 'detector_rotation',
'sample_rotation': 'sample_rotation',
},
)

# Try to create the workflow
workflow = instrument.workflow_factory.create(
source_name='bragg_peak_monitor', config=config
)
assert workflow is not None

def test_bragg_peak_qmap_workflow_can_accumulate_data(self):
"""Test that the workflow can accumulate data without errors."""
from ess.livedata.config import workflow_spec
from ess.livedata.config.instruments.bifrost import factories
from ess.livedata.config.instruments.bifrost.specs import instrument

# Load factories
factories.setup_factories(instrument)

# Get the workflow spec
workflow_id = workflow_spec.WorkflowId(
instrument='bifrost',
namespace='data_reduction',
name='bragg_peak_qmap',
version=1,
)
config = workflow_spec.WorkflowConfig(
identifier=workflow_id,
aux_source_names={
'detector_rotation': 'detector_rotation',
'sample_rotation': 'sample_rotation',
},
)

# Create the workflow
workflow = instrument.workflow_factory.create(
source_name='bragg_peak_monitor', config=config
)

# Create minimal test data for bragg_peak_monitor:
# a single-pixel detector with one event
epoch = sc.epoch(unit='ns')
event_time_zero = epoch + sc.array(
dims=['event_time_zero'], values=[1_000_000], unit='ns', dtype='int64'
)
event_id = sc.array(dims=['event'], values=[1], unit=None, dtype='int32')
event_time_offset = sc.array(
dims=['event'], values=[100], unit='ns', dtype='int64'
)
weights = sc.ones(sizes={'event': 1}, dtype='float64', unit='counts')
events = sc.DataArray(
data=weights, coords={'event_time_offset': event_time_offset}
)
events.coords['event_id'] = event_id

sizes = sc.array(dims=['event_time_zero'], values=[1], unit=None, dtype='int64')
begin = sc.cumsum(sizes, mode='exclusive')
detector_data = sc.DataArray(sc.bins(begin=begin, dim='event', data=events))
detector_data.coords['event_time_zero'] = event_time_zero

# Create rotation data (as DataArrays, not scalars)
detector_rotation = sc.DataArray(
data=sc.scalar(90.0, unit='deg')
) # a4 - detector tank at 90 deg
sample_rotation = sc.DataArray(
data=sc.scalar(45.0, unit='deg')
) # a3 - sample rotation

# Try to accumulate
data = {
'bragg_peak_monitor': detector_data,
'detector_rotation': detector_rotation,
'sample_rotation': sample_rotation,
}
workflow.accumulate(data, start_time=0, end_time=1_000_000_000)
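
Note: assuming standard pytest discovery from the repository root, the new tests can be run in isolation with `pytest tests/bifrost_test.py -k TestBraggPeakQMap`.
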
38 changes: 38 additions & 0 deletions tests/kafka/message_adapter_test.py
@@ -285,6 +285,44 @@ def test_adapter_merge_detectors(self) -> None:
assert result.stream.name == "unified_detector"
assert isinstance(result.value, DetectorEvents)

def test_adapter_exclude_from_merge(self) -> None:
adapter = Ev44ToDetectorEventsAdapter(
merge_detectors=True,
exclude_from_merge=frozenset({'special_detector'}),
)

# Regular detector should be merged
regular_message = Message(
timestamp=1234,
stream=StreamId(kind=StreamKind.DETECTOR_EVENTS, name="detector1"),
value=eventdata_ev44.EventData(
source_name="detector1",
message_id=0,
reference_time=np.array([1234]),
reference_time_index=[0],
time_of_flight=np.array([123456]),
pixel_id=np.array([1]),
),
)
result = adapter.adapt(regular_message)
assert result.stream.name == "unified_detector"

# Excluded detector should keep its name
excluded_message = Message(
timestamp=1234,
stream=StreamId(kind=StreamKind.DETECTOR_EVENTS, name="special_detector"),
value=eventdata_ev44.EventData(
source_name="special_detector",
message_id=0,
reference_time=np.array([1234]),
reference_time_index=[0],
time_of_flight=np.array([123456]),
pixel_id=np.array([1]),
),
)
result = adapter.adapt(excluded_message)
assert result.stream.name == "special_detector"


def message_with_schema(schema: str) -> KafkaMessage:
"""