diff --git a/src/ess/livedata/config/instruments/bifrost/factories.py b/src/ess/livedata/config/instruments/bifrost/factories.py
index 5402fcc62..36a928823 100644
--- a/src/ess/livedata/config/instruments/bifrost/factories.py
+++ b/src/ess/livedata/config/instruments/bifrost/factories.py
@@ -18,6 +18,7 @@
     BifrostElasticQMapParams,
     BifrostQMapParams,
     BifrostWorkflowParams,
+    BraggPeakQMapParams,
     DetectorRatemeterParams,
     DetectorRatemeterRegionParams,
 )
@@ -212,6 +213,93 @@ def _custom_elastic_qmap_workflow(
         wf[CutAxis2] = axis2
         return _make_cut_stream_processor(wf)
 
+    # Bragg peak monitor Q-map workflow
+    from ess.bifrost.single_crystal import BifrostBraggPeakMonitorWorkflow
+    from ess.bifrost.single_crystal.types import (
+        IntensityQparQperp,
+        QParallelBins,
+        QPerpendicularBins,
+    )
+    from ess.livedata.handlers.detector_data_handler import get_nexus_geometry_filename
+    from ess.reduce.nexus.types import (
+        EmptyDetector,
+        GravityVector,
+        Position,
+        RawDetector,
+        RunType,
+    )
+    from ess.reduce.nexus.workflow import assemble_detector_data
+    from ess.reduce.time_of_flight import DetectorLtotal
+
+    def _bragg_peak_monitor_ltotal(
+        detector_beamline: RawDetector[RunType],
+        source_position: Position[snx.NXsource, RunType],
+        sample_position: Position[snx.NXsample, RunType],
+        gravity: GravityVector,
+    ) -> DetectorLtotal[RunType]:
+        """Compute detector Ltotal using a straight-line approximation."""
+        from ess.reduce.time_of_flight import eto_to_tof
+
+        return eto_to_tof.detector_ltotal_from_straight_line_approximation(
+            detector_beamline,  # type: ignore[arg-type]
+            source_position=source_position,
+            sample_position=sample_position,
+            gravity=gravity,
+        )
+
+    def _make_bragg_peak_monitor_empty_detector() -> sc.DataArray:
+        """
+        Create EmptyDetector geometry for the Bragg peak monitor.
+
+        Hard-coded position approximately 1 m upstream of the sample along the
+        beam. This is a placeholder until the actual geometry is in the NeXus
+        file.
+        """
+        # Position: 1 m upstream of sample (sample is at origin, beam along -z)
+        position = sc.vector([0.0, 0.0, -1.0], unit='m')
+        return sc.DataArray(
+            data=sc.zeros(sizes={}, unit='counts'),
+            coords={
+                'position': position,
+                'detector_number': sc.scalar(0, unit=None),
+            },
+        )
+
+    @specs.bragg_peak_qmap_handle.attach_factory()
+    def _bragg_peak_qmap_workflow(
+        params: BraggPeakQMapParams,
+    ) -> StreamProcessorWorkflow:
+        wf = BifrostBraggPeakMonitorWorkflow()
+        wf[Filename[SampleRun]] = get_nexus_geometry_filename('bifrost')
+        wf[TimeOfFlightLookupTableFilename] = tof_lookup_table_simulation()
+        wf[PreopenNeXusFile] = PreopenNeXusFile(True)
+
+        # Provide hard-coded Bragg peak monitor geometry
+        # (bypass NeXus loading for detector position)
+        wf[EmptyDetector[SampleRun]] = _make_bragg_peak_monitor_empty_detector()
+
+        # Insert custom providers:
+        # - assemble_detector_data: bypasses McStas chain (NeXusData → RawDetector)
+        # - _bragg_peak_monitor_ltotal: straight-line Ltotal for monitor geometry
+        wf.insert(assemble_detector_data)
+        wf.insert(_bragg_peak_monitor_ltotal)
+
+        # Set histogram bins from params
+        wf[QParallelBins] = params.q_parallel_bins.get_edges().rename(Q='Q_parallel')
+        wf[QPerpendicularBins] = params.q_perpendicular_bins.get_edges().rename(
+            Q='Q_perpendicular'
+        )
+
+        return StreamProcessorWorkflow(
+            wf,
+            dynamic_keys={'bragg_peak_monitor': NeXusData[NXdetector, SampleRun]},
+            context_keys={
+                'detector_rotation': InstrumentAngle[SampleRun],
+                'sample_rotation': SampleAngle[SampleRun],
+            },
+            target_keys={'q_map': IntensityQparQperp[SampleRun]},
+            accumulators=(IntensityQparQperp[SampleRun],),
+        )
+
 
 def _transpose_with_coords(data: sc.DataArray, dims: tuple[str, ...]) -> sc.DataArray:
     """
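Review note: `_bragg_peak_monitor_ltotal` delegates to `eto_to_tof.detector_ltotal_from_straight_line_approximation`. For intuition, a minimal sketch of the straight-line idea with the hard-coded monitor position, assuming Ltotal is simply the source-to-sample plus sample-to-detector distance; the source position below is made up, and the real provider also takes the gravity vector:

    import scipp as sc

    # Only the monitor position (1 m upstream, i.e. -1 m along z) comes from
    # _make_bragg_peak_monitor_empty_detector above; the source is illustrative.
    source = sc.vector([0.0, 0.0, -160.0], unit='m')
    sample = sc.vector([0.0, 0.0, 0.0], unit='m')
    monitor = sc.vector([0.0, 0.0, -1.0], unit='m')

    l1 = sc.norm(sample - source)   # source -> sample: 160 m
    l2 = sc.norm(monitor - sample)  # sample -> monitor: 1 m
    ltotal = l1 + l2                # 161 m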
diff --git a/src/ess/livedata/config/instruments/bifrost/specs.py b/src/ess/livedata/config/instruments/bifrost/specs.py
index b8a3ebb61..7deef3ce5 100644
--- a/src/ess/livedata/config/instruments/bifrost/specs.py
+++ b/src/ess/livedata/config/instruments/bifrost/specs.py
@@ -23,7 +23,7 @@
     DetectorViewOutputs,
     DetectorViewParams,
 )
-from ess.livedata.parameter_models import EnergyEdges, QEdges
+from ess.livedata.parameter_models import EnergyEdges, QEdges, QUnit
 
 
 # Arc energies in meV
@@ -212,6 +212,41 @@ class BifrostAuxSources(AuxSourcesBase):
     )
 
 
+class BraggPeakAuxSources(AuxSourcesBase):
+    """Auxiliary sources for the Bragg peak monitor Q-map workflow."""
+
+    detector_rotation: Literal['detector_rotation'] = pydantic.Field(
+        default='detector_rotation',
+        description='Detector tank rotation angle (a4). '
+        'Required because the monitor is mounted on the detector tank.',
+    )
+    sample_rotation: Literal['sample_rotation'] = pydantic.Field(
+        default='sample_rotation', description='Sample rotation angle (a3).'
+    )
+
+
+class BraggPeakQMapParams(pydantic.BaseModel):
+    """Parameters for the Bragg peak monitor Q-map workflow."""
+
+    q_parallel_bins: QEdges = pydantic.Field(
+        default=QEdges(start=-3.0, stop=3.0, num_bins=100, unit=QUnit.INVERSE_ANGSTROM),
+        description="Q_parallel bin edges.",
+    )
+    q_perpendicular_bins: QEdges = pydantic.Field(
+        default=QEdges(start=-3.0, stop=3.0, num_bins=100, unit=QUnit.INVERSE_ANGSTROM),
+        description="Q_perpendicular bin edges.",
+    )
+
+
+class BraggPeakQMapOutputs(WorkflowOutputsBase):
+    """Outputs for the Bragg peak monitor Q-map workflow."""
+
+    q_map: sc.DataArray = pydantic.Field(
+        title='Q Map',
+        description='2D histogram in Q_parallel vs Q_perpendicular.',
+    )
+
+
 class QMapOutputs(WorkflowOutputsBase):
     """Outputs for Bifrost Q-map workflows."""
 
@@ -315,3 +350,15 @@ class QMapOutputs(WorkflowOutputsBase):
     aux_sources=BifrostAuxSources,
     outputs=QMapOutputs,
 )
+
+# Bragg peak monitor Q-map workflow
+bragg_peak_qmap_handle = instrument.register_spec(
+    name='bragg_peak_qmap',
+    version=1,
+    title='Bragg peak Q map',
+    description='Q-map from the Bragg peak monitor in Q_parallel vs Q_perpendicular.',
+    source_names=['bragg_peak_monitor'],
+    params=BraggPeakQMapParams,
+    aux_sources=BraggPeakAuxSources,
+    outputs=BraggPeakQMapOutputs,
+)
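Review note: since the defaults above exercise the full `QEdges` constructor, overriding the binning when submitting the workflow is straightforward. A sketch with illustrative values:

    from ess.livedata.config.instruments.bifrost.specs import BraggPeakQMapParams
    from ess.livedata.parameter_models import QEdges, QUnit

    # Finer grid over a narrower range than the default ±3 Å⁻¹ / 100 bins.
    params = BraggPeakQMapParams(
        q_parallel_bins=QEdges(
            start=-1.5, stop=1.5, num_bins=300, unit=QUnit.INVERSE_ANGSTROM
        ),
        q_perpendicular_bins=QEdges(
            start=-1.5, stop=1.5, num_bins=300, unit=QUnit.INVERSE_ANGSTROM
        ),
    )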
diff --git a/src/ess/livedata/config/instruments/bifrost/streams.py b/src/ess/livedata/config/instruments/bifrost/streams.py
index f457b4545..a65229183 100644
--- a/src/ess/livedata/config/instruments/bifrost/streams.py
+++ b/src/ess/livedata/config/instruments/bifrost/streams.py
@@ -38,6 +38,10 @@ def _bifrost_generator() -> Generator[tuple[str, tuple[int, int]]]:
 
 detector_fakes = dict(_bifrost_generator())
 
+# Bragg peak monitor: single-pixel detector with detector_number range 1-1
+# (conceptually similar to a monitor, but uses the detector event format)
+detector_fakes['bragg_peak_monitor'] = (1, 1)
+
 # Detector number configuration
 detector_number = sc.arange('detector_number', 1, 5 * 3 * 9 * 100 + 1, unit=None).fold(
     dim='detector_number', sizes={'arc': 5, 'tube': 3, 'channel': 9, 'pixel': 100}
diff --git a/src/ess/livedata/kafka/message_adapter.py b/src/ess/livedata/kafka/message_adapter.py
index aab2f45d1..a3ba6f02a 100644
--- a/src/ess/livedata/kafka/message_adapter.py
+++ b/src/ess/livedata/kafka/message_adapter.py
@@ -209,7 +209,12 @@ def adapt(self, message: KafkaMessage) -> Message[MonitorEvents]:
 class Ev44ToDetectorEventsAdapter(
     MessageAdapter[Message[eventdata_ev44.EventData], Message[DetectorEvents]]
 ):
-    def __init__(self, *, merge_detectors: bool = False):
+    def __init__(
+        self,
+        *,
+        merge_detectors: bool = False,
+        exclude_from_merge: frozenset[str] | None = None,
+    ):
         """
         Parameters
         ----------
@@ -218,14 +223,18 @@ def __init__(self, *, merge_detectors: bool = False):
             useful for instruments with many detector banks that should be treated
             as a single bank. Note that event_id/detector_number must be unique
             across all detectors.
+        exclude_from_merge
+            Set of detector names to exclude from merging. These detectors keep
+            their original stream name even when merge_detectors is True.
         """
         self._merge_detectors = merge_detectors
+        self._exclude_from_merge = exclude_from_merge or frozenset()
 
     def adapt(
         self, message: Message[eventdata_ev44.EventData]
     ) -> Message[DetectorEvents]:
         stream = message.stream
-        if self._merge_detectors:
+        if self._merge_detectors and stream.name not in self._exclude_from_merge:
             stream = replace(stream, name='unified_detector')
         return Message(
             timestamp=message.timestamp,
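Review note: a short usage sketch of the new `exclude_from_merge` flag; stream names other than `bragg_peak_monitor` are illustrative:

    from ess.livedata.kafka.message_adapter import Ev44ToDetectorEventsAdapter

    adapter = Ev44ToDetectorEventsAdapter(
        merge_detectors=True,
        exclude_from_merge=frozenset({'bragg_peak_monitor'}),
    )
    # adapt() then renames streams as follows:
    #   'some_bank'           -> 'unified_detector'    (merged)
    #   'bragg_peak_monitor'  -> 'bragg_peak_monitor'  (excluded, name kept)
    # With merge_detectors=False, the exclusion set has no effect.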
""" self._merge_detectors = merge_detectors + self._exclude_from_merge = exclude_from_merge or frozenset() def adapt( self, message: Message[eventdata_ev44.EventData] ) -> Message[DetectorEvents]: stream = message.stream - if self._merge_detectors: + if self._merge_detectors and stream.name not in self._exclude_from_merge: stream = replace(stream, name='unified_detector') return Message( timestamp=message.timestamp, diff --git a/src/ess/livedata/kafka/routes.py b/src/ess/livedata/kafka/routes.py index 7cdf065e8..9ccf04db0 100644 --- a/src/ess/livedata/kafka/routes.py +++ b/src/ess/livedata/kafka/routes.py @@ -53,13 +53,19 @@ def with_beam_monitor_route(self) -> Self: def with_detector_route(self) -> Self: """Adds the detector route.""" + # Bifrost merges all detector banks into unified_detector, except for + # standalone detectors like bragg_peak_monitor that need separate streams. + is_bifrost = self._stream_mapping.instrument == 'bifrost' adapter = ChainedAdapter( first=KafkaToEv44Adapter( stream_lut=self._stream_mapping.detectors, stream_kind=StreamKind.DETECTOR_EVENTS, ), second=Ev44ToDetectorEventsAdapter( - merge_detectors=self._stream_mapping.instrument == 'bifrost' + merge_detectors=is_bifrost, + exclude_from_merge=frozenset({'bragg_peak_monitor'}) + if is_bifrost + else None, ), ) for topic in self._stream_mapping.detector_topics: diff --git a/tests/bifrost_test.py b/tests/bifrost_test.py index 9f6e96f16..97926962e 100644 --- a/tests/bifrost_test.py +++ b/tests/bifrost_test.py @@ -222,3 +222,103 @@ def test_ratemeter_includes_time_coordinate(self, bifrost_workflow): assert 'time' in result.coords assert result.coords['time'].unit == 'ns' + + +class TestBraggPeakQMap: + """Tests for the Bragg peak monitor Q-map workflow.""" + + def test_bragg_peak_qmap_workflow_can_be_created(self): + """Test that the workflow factory can create the workflow.""" + from ess.livedata.config import workflow_spec + from ess.livedata.config.instruments.bifrost import factories + from ess.livedata.config.instruments.bifrost.specs import instrument + + # Load factories + factories.setup_factories(instrument) + + # Get the workflow spec + workflow_id = workflow_spec.WorkflowId( + instrument='bifrost', + namespace='data_reduction', + name='bragg_peak_qmap', + version=1, + ) + config = workflow_spec.WorkflowConfig( + identifier=workflow_id, + aux_source_names={ + 'detector_rotation': 'detector_rotation', + 'sample_rotation': 'sample_rotation', + }, + ) + + # Try to create the workflow + workflow = instrument.workflow_factory.create( + source_name='bragg_peak_monitor', config=config + ) + assert workflow is not None + + def test_bragg_peak_qmap_workflow_can_accumulate_data(self): + """Test that the workflow can accumulate data without errors.""" + from ess.livedata.config import workflow_spec + from ess.livedata.config.instruments.bifrost import factories + from ess.livedata.config.instruments.bifrost.specs import instrument + + # Load factories + factories.setup_factories(instrument) + + # Get the workflow spec + workflow_id = workflow_spec.WorkflowId( + instrument='bifrost', + namespace='data_reduction', + name='bragg_peak_qmap', + version=1, + ) + config = workflow_spec.WorkflowConfig( + identifier=workflow_id, + aux_source_names={ + 'detector_rotation': 'detector_rotation', + 'sample_rotation': 'sample_rotation', + }, + ) + + # Create the workflow + workflow = instrument.workflow_factory.create( + source_name='bragg_peak_monitor', config=config + ) + + # Create minimal test data for bragg_peak_monitor + # 
diff --git a/tests/kafka/message_adapter_test.py b/tests/kafka/message_adapter_test.py
index d53cec27f..2d068d8f6 100644
--- a/tests/kafka/message_adapter_test.py
+++ b/tests/kafka/message_adapter_test.py
@@ -285,6 +285,44 @@ def test_adapter_merge_detectors(self) -> None:
         assert result.stream.name == "unified_detector"
         assert isinstance(result.value, DetectorEvents)
 
+    def test_adapter_exclude_from_merge(self) -> None:
+        adapter = Ev44ToDetectorEventsAdapter(
+            merge_detectors=True,
+            exclude_from_merge=frozenset({'special_detector'}),
+        )
+
+        # Regular detector should be merged
+        regular_message = Message(
+            timestamp=1234,
+            stream=StreamId(kind=StreamKind.DETECTOR_EVENTS, name="detector1"),
+            value=eventdata_ev44.EventData(
+                source_name="detector1",
+                message_id=0,
+                reference_time=np.array([1234]),
+                reference_time_index=[0],
+                time_of_flight=np.array([123456]),
+                pixel_id=np.array([1]),
+            ),
+        )
+        result = adapter.adapt(regular_message)
+        assert result.stream.name == "unified_detector"
+
+        # Excluded detector should keep its name
+        excluded_message = Message(
+            timestamp=1234,
+            stream=StreamId(kind=StreamKind.DETECTOR_EVENTS, name="special_detector"),
+            value=eventdata_ev44.EventData(
+                source_name="special_detector",
+                message_id=0,
+                reference_time=np.array([1234]),
+                reference_time_index=[0],
+                time_of_flight=np.array([123456]),
+                pixel_id=np.array([1]),
+            ),
+        )
+        result = adapter.adapt(excluded_message)
+        assert result.stream.name == "special_detector"
+
 
 def message_with_schema(schema: str) -> KafkaMessage:
     """