diff --git a/scripts/screenshot_dashboard.py b/scripts/screenshot_dashboard.py new file mode 100644 index 000000000..dd052e1e2 --- /dev/null +++ b/scripts/screenshot_dashboard.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python +""" +Take screenshots of the dashboard for visual verification. + +This script starts the dashboard with HTTP transport (no Kafka needed), +loads pre-configured workflows and plot grids from fixtures, injects +test data via the HTTP API, and captures screenshots using Playwright. + +Requirements: + - playwright (with chromium installed: python -m playwright install chromium) + +Usage: + python scripts/screenshot_dashboard.py --output-dir screenshots/ + + # With specific viewport width + python scripts/screenshot_dashboard.py --output-dir screenshots/ --width 1400 + +How it works: + 1. Sets LIVEDATA_CONFIG_DIR to point to fixture configs + 2. Starts dashboard with --transport=http (no Kafka needed) + 3. Fixture configs restore: + - Workflows with known job_numbers (from workflow_configs.yaml) + - Plot grids already subscribed to those workflows (from plot_grids.yaml) + 4. Injects test data with matching ResultKeys + 5. Captures screenshots via Playwright +""" + +import argparse +import logging +import os +import subprocess +import sys +import time +from collections.abc import Callable +from pathlib import Path + +# Configure logging +logging.basicConfig( + level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Timing constants +PAGE_RENDER_WAIT_MS = 3000 # milliseconds +DATA_INJECTION_WAIT_MS = 2000 # wait for data to propagate to plots + + +def wait_for_dashboard(port: int, timeout: float = 30.0) -> bool: + """Wait for dashboard to be ready.""" + import urllib.request + + start = time.time() + while time.time() - start < timeout: + try: + with urllib.request.urlopen( + f'http://localhost:{port}/', timeout=1.0 + ) as response: + if response.status == 200: + return True + except Exception: + time.sleep(0.5) + return False + + +def start_dashboard(instrument: str, port: int, config_dir: Path) -> subprocess.Popen: + """Start the dashboard in a subprocess with fixture config dir.""" + env = os.environ.copy() + env['LIVEDATA_CONFIG_DIR'] = str(config_dir) + + cmd = [ + sys.executable, + '-m', + 'ess.livedata.dashboard.reduction', + '--instrument', + instrument, + '--transport', + 'http', + '--log-level', + 'INFO', + ] + logger.info("Starting dashboard: %s", ' '.join(cmd)) + logger.info("LIVEDATA_CONFIG_DIR=%s", config_dir) + return subprocess.Popen( # noqa: S603 + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env + ) + + +def capture_screenshots( + port: int, + output_dir: Path, + width: int = 1200, + inject_data: Callable[[int], dict] | None = None, +) -> list[Path]: + """Capture screenshots of the dashboard using Playwright.""" + from playwright.sync_api import sync_playwright + + output_dir.mkdir(parents=True, exist_ok=True) + screenshots = [] + + with sync_playwright() as p: + browser = p.chromium.launch() + page = browser.new_page(viewport={'width': width, 'height': 900}) + + # Navigate to dashboard + logger.info("Loading dashboard...") + page.goto(f'http://localhost:{port}', wait_until='networkidle') + # Wait for periodic callback to start (triggered by browser connection) + page.wait_for_timeout(500) + + # Inject data NOW - after browser connects (so periodic callbacks run) + # but before navigating to plot tabs + if inject_data is not None: + logger.info("Injecting test data after browser 
connection...") + inject_data(port) + # Wait for data to be consumed and plots to be created + page.wait_for_timeout(PAGE_RENDER_WAIT_MS) + else: + page.wait_for_timeout(PAGE_RENDER_WAIT_MS) + + # Take screenshot of initial state (Jobs tab) + screenshot_path = output_dir / 'dashboard_jobs_tab.png' + page.screenshot(path=str(screenshot_path), full_page=True) + logger.info("Saved: %s", screenshot_path) + screenshots.append(screenshot_path) + + # Click on "Manage Plots" tab + manage_tab = page.locator('text=Manage Plots') + if manage_tab.count() > 0: + manage_tab.click() + page.wait_for_timeout(1000) + + screenshot_path = output_dir / 'dashboard_manage_tab.png' + page.screenshot(path=str(screenshot_path), full_page=True) + logger.info("Saved: %s", screenshot_path) + screenshots.append(screenshot_path) + + # Look for existing grid tabs loaded from config + # Panel tabs use a specific structure - look for the clickable tab element + # The tabs appear as divs with the title text, not as proper tab roles + detectors_tab = page.locator('.bk-tab:has-text("Detectors")') + if detectors_tab.count() > 0: + logger.info("Found 'Detectors' grid tab, clicking...") + detectors_tab.click() + page.wait_for_timeout(DATA_INJECTION_WAIT_MS) + + screenshot_path = output_dir / 'dashboard_detectors_grid.png' + page.screenshot(path=str(screenshot_path), full_page=True) + logger.info("Saved: %s", screenshot_path) + screenshots.append(screenshot_path) + else: + logger.info("No 'Detectors' tab found - grid may not have been loaded") + + browser.close() + + return screenshots + + +def main(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + '--output-dir', + type=Path, + default=Path('screenshots'), + help='Output directory for screenshots (default: screenshots/)', + ) + parser.add_argument( + '--port', + type=int, + default=5009, + help='Dashboard port (default: 5009)', + ) + parser.add_argument( + '--width', + type=int, + default=1200, + help='Viewport width in pixels (default: 1200)', + ) + parser.add_argument( + '--instrument', + type=str, + default='dummy', + help='Instrument name (default: dummy)', + ) + + args = parser.parse_args() + + # Get the fixtures directory (relative to this script) + script_dir = Path(__file__).parent + fixtures_dir = script_dir / 'screenshot_fixtures' + + if not fixtures_dir.exists(): + logger.error("Fixtures directory not found: %s", fixtures_dir) + return 1 + + # Start dashboard with fixture config dir + process = start_dashboard(args.instrument, args.port, fixtures_dir) + + try: + # Wait for dashboard to be ready + logger.info("Waiting for dashboard to start...") + if not wait_for_dashboard(args.port, timeout=30.0): + logger.error("Dashboard failed to start") + # Print any stderr output + stderr = process.stderr.read().decode('utf-8', errors='replace') + if stderr: + logger.error("Dashboard stderr:\n%s", stderr) + process.terminate() + return 1 + + logger.info("Dashboard is ready") + + # Give a moment for configs to load + time.sleep(1) + + # Import fixture modules to register data generators + # The import triggers @fixture_registry.register decorators + sys.path.insert(0, str(script_dir)) + import screenshot_fixtures.dummy # noqa: F401 + from screenshot_fixtures import inject_fixtures + + # Create injection function bound to the instrument and fixtures_dir + def inject_data(port: int) -> dict: + return inject_fixtures( + port=port, fixtures_dir=fixtures_dir, instrument=args.instrument + ) 
+ + # Capture screenshots (data is injected after browser connects) + screenshots = capture_screenshots( + port=args.port, + output_dir=args.output_dir, + width=args.width, + inject_data=inject_data, + ) + + logger.info("Screenshots saved to %s", args.output_dir) + logger.info("Captured %d screenshot(s)", len(screenshots)) + return 0 + + finally: + # Clean up + logger.info("Shutting down dashboard...") + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + +if __name__ == '__main__': + sys.exit(main() or 0) diff --git a/scripts/screenshot_fixtures/README.md b/scripts/screenshot_fixtures/README.md new file mode 100644 index 000000000..263aa64c9 --- /dev/null +++ b/scripts/screenshot_fixtures/README.md @@ -0,0 +1,123 @@ +# Screenshot Fixtures + +This directory contains fixture configurations and data generators for screenshot testing. +The system allows testing the dashboard with pre-configured plots and synthetic data, +without requiring Kafka. + +## Directory Structure + +``` +screenshot_fixtures/ +├── __init__.py # Core registry and injection logic +├── README.md # This file +└── / # Per-instrument fixtures + ├── __init__.py # Data generators (register with @fixture_registry.register) + ├── workflow_configs.yaml # Workflow configurations with fixed job_numbers + └── plot_configs.yaml # Plot grid configurations +``` + +## How It Works + +1. **workflow_configs.yaml** defines workflows with fixed job_numbers (UUIDs) +2. **plot_configs.yaml** defines plot grids that subscribe to those workflows +3. **Data generators** are Python functions that create `sc.DataArray` test data +4. The **fixture registry** matches generators to workflows by workflow_id +5. **inject_fixtures()** reads configs, calls generators, and POSTs data to the dashboard + +## Adding a New Screenshot Scenario + +### Step 1: Define the workflow in workflow_configs.yaml + +```yaml +# Use a fixed UUID so data injection matches +dummy/detector_data/my_workflow/1: + source_names: + - my_detector + params: {} + aux_source_names: {} + current_job: + job_number: "00000000-0000-0000-0000-000000000003" + jobs: + my_detector: + params: {} + aux_source_names: {} +``` + +### Step 2: Add the plot to plot_configs.yaml + +```yaml +plot_grids: + grids: + - title: My Plot + nrows: 1 + ncols: 1 + cells: + - geometry: + row: 0 + col: 0 + config: + workflow_id: dummy/detector_data/my_workflow/1 + output_name: current + source_names: + - my_detector + plot_name: image + params: + # ... 
plot parameters +``` + +### Step 3: Register a data generator + +In `/__init__.py`: + +```python +from screenshot_fixtures import fixture_registry +import numpy as np +import scipp as sc + +# Example: 2D image data (for plot_name: image) +@fixture_registry.register('dummy/detector_data/my_workflow/1') +def make_my_detector_data() -> sc.DataArray: + """Create 2D test data for an image plot.""" + data = np.random.default_rng(42).poisson(100, size=(64, 64)).astype(float) + return sc.DataArray( + sc.array(dims=['y', 'x'], values=data, unit='counts'), + coords={ + 'x': sc.arange('x', 0.0, 64.0, unit=None), + 'y': sc.arange('y', 0.0, 64.0, unit=None), + }, + ) + +# Example: 1D timeseries data (for plot_name: line) +@fixture_registry.register('dummy/data_reduction/my_timeseries/1') +def make_my_timeseries_data() -> sc.DataArray: + """Create 1D test data for a line plot.""" + time = np.linspace(0, 100, 100) + counts = 500 * np.exp(-time / 30) + 50 + return sc.DataArray( + sc.array(dims=['time'], values=counts, unit='counts'), + coords={ + 'time': sc.array(dims=['time'], values=time, unit='s'), + }, + ) +``` + +### Step 4: Import the fixture module + +In `screenshot_dashboard.py`, ensure the instrument's fixture module is imported: + +```python +import screenshot_fixtures. # noqa: F401 +``` + +## Running Screenshots + +```bash +python scripts/screenshot_dashboard.py --output-dir screenshots/ --instrument dummy +``` + +## Key Points + +- **Single source of truth**: Job numbers are defined only in workflow_configs.yaml +- **No boilerplate**: Just write a function that returns a DataArray and decorate it +- **Automatic matching**: The registry matches generators to workflows by workflow_id string +- **Extensible**: Add new instruments by creating a new subdirectory with the same structure diff --git a/scripts/screenshot_fixtures/__init__.py b/scripts/screenshot_fixtures/__init__.py new file mode 100644 index 000000000..3395c612d --- /dev/null +++ b/scripts/screenshot_fixtures/__init__.py @@ -0,0 +1,276 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +""" +Screenshot test fixtures for the HTTP transport. + +This module provides a registry-based system for defining test data fixtures. +Data generators are registered by workflow_id and output_name, and the system +automatically matches them with workflow configurations to inject data. + +Usage: + # In your fixture module (e.g., dummy/__init__.py): + from scripts.screenshot_fixtures import fixture_registry + + @fixture_registry.register('dummy/detector_data/panel_0_xy/1', output='current') + def make_panel_0_data() -> sc.DataArray: + return sc.DataArray(...) 
+ + # To inject all registered fixtures: + from scripts.screenshot_fixtures import inject_fixtures + inject_fixtures(port=5009, fixtures_dir=Path(...)) +""" + +from __future__ import annotations + +import base64 +import json +import logging +import time +import urllib.request +from collections.abc import Callable +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING +from uuid import UUID + +import yaml +from streaming_data_types import dataarray_da00 + +from ess.livedata.config.workflow_spec import JobId, ResultKey, WorkflowId +from ess.livedata.kafka.scipp_da00_compat import scipp_to_da00 + +if TYPE_CHECKING: + import scipp as sc + +logger = logging.getLogger(__name__) + +# Directory containing the fixture config files +FIXTURES_DIR = Path(__file__).parent + +DataGenerator = Callable[[], 'sc.DataArray'] + + +@dataclass +class FixtureKey: + """Key for looking up a data generator in the registry.""" + + workflow_id: str # e.g., 'dummy/detector_data/panel_0_xy/1' + output_name: str = 'current' + + def __hash__(self) -> int: + return hash((self.workflow_id, self.output_name)) + + +@dataclass +class FixtureRegistry: + """ + Registry for screenshot data generators. + + Data generators are functions that return sc.DataArray. They are registered + by workflow_id and output_name. When fixtures are injected, the registry + looks up the appropriate generator for each workflow/output combination. + """ + + _generators: dict[FixtureKey, DataGenerator] = field(default_factory=dict) + + def register( + self, workflow_id: str, *, output: str = 'current' + ) -> Callable[[DataGenerator], DataGenerator]: + """ + Decorator to register a data generator. + + Parameters + ---------- + workflow_id: + The workflow ID string, e.g., 'dummy/detector_data/panel_0_xy/1' + output: + The output name, e.g., 'current'. Defaults to 'current'. + + Example + ------- + @fixture_registry.register('dummy/detector_data/panel_0_xy/1') + def make_panel_0_data() -> sc.DataArray: + return sc.DataArray(...) + """ + key = FixtureKey(workflow_id=workflow_id, output_name=output) + + def decorator(func: DataGenerator) -> DataGenerator: + self._generators[key] = func + return func + + return decorator + + def get_generator( + self, workflow_id: str, output_name: str = 'current' + ) -> DataGenerator | None: + """Get a registered generator, or None if not found.""" + key = FixtureKey(workflow_id=workflow_id, output_name=output_name) + return self._generators.get(key) + + def clear(self) -> None: + """Clear all registered generators (useful for testing).""" + self._generators.clear() + + +# Global registry instance +fixture_registry = FixtureRegistry() + + +@dataclass +class WorkflowFixtureInfo: + """Information extracted from workflow_configs.yaml for a single workflow.""" + + workflow_id: WorkflowId + job_number: UUID + source_names: list[str] + + +def load_workflow_configs( + fixtures_dir: Path, instrument: str +) -> list[WorkflowFixtureInfo]: + """ + Load workflow configurations from YAML and extract fixture-relevant info. + + Parameters + ---------- + fixtures_dir: + Directory containing instrument subdirectories with config files. + instrument: + Instrument name (subdirectory name). + + Returns + ------- + : + List of workflow fixture info extracted from workflow_configs.yaml. 
+ """ + config_path = fixtures_dir / instrument / 'workflow_configs.yaml' + if not config_path.exists(): + logger.warning("No workflow_configs.yaml found at %s", config_path) + return [] + + with open(config_path) as f: + configs = yaml.safe_load(f) or {} + + results = [] + for workflow_id_str, config in configs.items(): + try: + workflow_id = WorkflowId.from_string(workflow_id_str) + current_job = config.get('current_job', {}) + job_number_str = current_job.get('job_number') + if job_number_str is None: + logger.warning( + "No job_number in current_job for %s, skipping", workflow_id_str + ) + continue + + # Get source_names from current_job.jobs keys (the actual running jobs) + jobs = current_job.get('jobs', {}) + source_names = list(jobs.keys()) + + results.append( + WorkflowFixtureInfo( + workflow_id=workflow_id, + job_number=UUID(job_number_str), + source_names=source_names, + ) + ) + except Exception as e: + logger.warning("Failed to parse workflow config %s: %s", workflow_id_str, e) + + return results + + +def serialize_to_da00(result_key: ResultKey, data: sc.DataArray) -> bytes: + """Serialize a DataArray to da00 format with the given ResultKey.""" + return dataarray_da00.serialise_da00( + source_name=result_key.model_dump_json(), + timestamp_ns=int(time.time_ns()), + data=scipp_to_da00(data), + ) + + +def inject_data(port: int, payload: bytes) -> dict: + """Inject data via HTTP POST to the dashboard.""" + url = f'http://localhost:{port}/api/data' + body = json.dumps({'payload_base64': base64.b64encode(payload).decode('utf-8')}) + req = urllib.request.Request( # noqa: S310 + url, + data=body.encode('utf-8'), + headers={'Content-Type': 'application/json'}, + method='POST', + ) + with urllib.request.urlopen(req, timeout=5.0) as response: # noqa: S310 + return json.loads(response.read().decode('utf-8')) + + +def inject_fixtures( + port: int = 5009, + fixtures_dir: Path = FIXTURES_DIR, + instrument: str = 'dummy', + registry: FixtureRegistry | None = None, +) -> dict[str, dict]: + """ + Inject all registered fixtures for the given instrument. + + This function: + 1. Loads workflow_configs.yaml to get job numbers and source names + 2. For each workflow, looks up registered data generators + 3. Generates data and injects it via the HTTP API + + Parameters + ---------- + port: + Dashboard port number. + fixtures_dir: + Directory containing instrument subdirectories with config files. + instrument: + Instrument name. + registry: + Fixture registry to use. Defaults to the global fixture_registry. + + Returns + ------- + : + Dict mapping "{workflow_id}/{source_name}/{output}" to injection results. 
+ """ + if registry is None: + registry = fixture_registry + + workflow_infos = load_workflow_configs(fixtures_dir, instrument) + results: dict[str, dict] = {} + + for info in workflow_infos: + workflow_id_str = str(info.workflow_id) + + # For now, we only support 'current' output, but the system is extensible + output_name = 'current' + generator = registry.get_generator(workflow_id_str, output_name) + + if generator is None: + logger.debug( + "No generator registered for %s/%s, skipping", + workflow_id_str, + output_name, + ) + continue + + # Generate data once, inject for each source_name + data = generator() + + for source_name in info.source_names: + result_key = ResultKey( + workflow_id=info.workflow_id, + job_id=JobId(job_number=info.job_number, source_name=source_name), + output_name=output_name, + ) + payload = serialize_to_da00(result_key, data) + + key = f"{workflow_id_str}/{source_name}/{output_name}" + try: + results[key] = inject_data(port, payload) + logger.info("Injected data for %s", key) + except Exception as e: + logger.error("Failed to inject data for %s: %s", key, e) + results[key] = {'error': str(e)} + + return results diff --git a/scripts/screenshot_fixtures/dummy/__init__.py b/scripts/screenshot_fixtures/dummy/__init__.py new file mode 100644 index 000000000..85f9041a2 --- /dev/null +++ b/scripts/screenshot_fixtures/dummy/__init__.py @@ -0,0 +1,93 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +""" +Data generators for the dummy instrument screenshot fixtures. + +This module registers data generators for each workflow defined in +workflow_configs.yaml. The generators create synthetic test data +for screenshot testing. +""" + +import numpy as np +import scipp as sc + +from screenshot_fixtures import fixture_registry + + +def make_gaussian_blob( + shape: tuple[int, int] = (64, 64), + center: tuple[float, float] | None = None, + sigma: float | None = None, + amplitude: float = 500.0, + noise_level: float = 5.0, + seed: int = 42, +) -> np.ndarray: + """Create a 2D Gaussian blob with Poisson noise.""" + if center is None: + center = (shape[0] / 2, shape[1] / 2) + if sigma is None: + sigma = min(shape) / 6 + + y, x = np.ogrid[: shape[0], : shape[1]] + blob = np.exp(-((y - center[0]) ** 2 + (x - center[1]) ** 2) / (2 * sigma**2)) + rng = np.random.default_rng(seed) + noise = rng.poisson(lam=noise_level, size=shape) + return (blob * amplitude + noise).astype(np.float64) + + +@fixture_registry.register('dummy/detector_data/panel_0_xy/1') +def make_panel_0_data(shape: tuple[int, int] = (64, 64)) -> sc.DataArray: + """Create test data for panel_0 detector.""" + data = make_gaussian_blob(shape, center=(shape[0] * 0.4, shape[1] * 0.6), seed=42) + return sc.DataArray( + sc.array(dims=['y', 'x'], values=data, unit='counts'), + coords={ + 'x': sc.arange('x', 0.0, float(shape[1]), unit=None), + 'y': sc.arange('y', 0.0, float(shape[0]), unit=None), + }, + ) + + +@fixture_registry.register('dummy/detector_data/area_panel_xy/1') +def make_area_panel_data(shape: tuple[int, int] = (128, 128)) -> sc.DataArray: + """Create test data for area_panel detector (larger with multiple blobs).""" + blob1 = make_gaussian_blob( + shape, center=(shape[0] * 0.3, shape[1] * 0.3), sigma=15, seed=42 + ) + blob2 = make_gaussian_blob( + shape, center=(shape[0] * 0.7, shape[1] * 0.6), sigma=20, seed=43 + ) + blob3 = make_gaussian_blob( + shape, center=(shape[0] * 0.5, shape[1] * 0.8), sigma=10, amplitude=300, seed=44 + ) + data = blob1 + blob2 
+ blob3 + + return sc.DataArray( + sc.array(dims=['y', 'x'], values=data, unit='counts'), + coords={ + 'x': sc.arange('x', 0.0, float(shape[1]), unit=None), + 'y': sc.arange('y', 0.0, float(shape[0]), unit=None), + }, + ) + + +@fixture_registry.register('dummy/data_reduction/monitor_timeseries/1') +def make_monitor_timeseries_data(npoints: int = 100) -> sc.DataArray: + """Create test timeseries data for monitor1 with exponential decay pattern.""" + # Create time points from 0 to 100 seconds + time = np.linspace(0, 100, npoints) + + # Create exponential decay pattern: starts at 1000 counts, decays to ~100 + counts = 900 * np.exp(-time / 30) + 100 + + # Add some realistic noise (Poisson-like) + rng = np.random.default_rng(42) + noise = rng.normal(0, np.sqrt(counts) * 0.05) + counts = counts + noise + + return sc.DataArray( + sc.array(dims=['time'], values=counts, unit='counts'), + coords={ + 'time': sc.array(dims=['time'], values=time, unit='s'), + }, + ) diff --git a/scripts/screenshot_fixtures/dummy/plot_configs.yaml b/scripts/screenshot_fixtures/dummy/plot_configs.yaml new file mode 100644 index 000000000..1796caf73 --- /dev/null +++ b/scripts/screenshot_fixtures/dummy/plot_configs.yaml @@ -0,0 +1,64 @@ +plot_grids: + grids: + - title: Detectors + nrows: 2 + ncols: 2 + cells: + - geometry: + row: 0 + col: 0 + row_span: 2 + col_span: 1 + config: + workflow_id: dummy/detector_data/area_panel_xy/1 + output_name: current + source_names: + - area_panel + plot_name: image + params: + layout: + combine_mode: layout + layout_columns: 2 + plot_aspect: + aspect_type: Square + ratio: 1.0 + stretch_mode: Fill width + window: + mode: latest + window_duration_seconds: 1.0 + aggregation: auto + plot_scale: + x_scale: linear + y_scale: linear + color_scale: log + - geometry: + row: 0 + col: 1 + row_span: 2 + col_span: 1 + config: + workflow_id: dummy/detector_data/panel_0_xy/1 + output_name: current + source_names: + - panel_0 + plot_name: image + params: + layout: + combine_mode: layout + layout_columns: 2 + plot_aspect: + aspect_type: Square + ratio: 1.0 + stretch_mode: Fill width + window: + mode: latest + window_duration_seconds: 1.0 + aggregation: auto + plot_scale: + x_scale: linear + y_scale: linear + color_scale: log + - title: Monitor Timeseries + nrows: 1 + ncols: 1 + cells: [] diff --git a/scripts/screenshot_fixtures/dummy/workflow_configs.yaml b/scripts/screenshot_fixtures/dummy/workflow_configs.yaml new file mode 100644 index 000000000..3829caec5 --- /dev/null +++ b/scripts/screenshot_fixtures/dummy/workflow_configs.yaml @@ -0,0 +1,44 @@ +# Workflow configs for screenshot testing. +# Uses a fixed, well-known job_number so we can inject matching data. +# +# The job_number is a deterministic UUID derived from a constant string. +# This same UUID must be used in the test data injection. 
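+#
+# Keys are WorkflowId strings (instrument/namespace/name/version). The source
+# names listed under current_job.jobs become the ResultKey source names that
+# inject_fixtures() uses when posting data.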
+ +# Panel 0 XY detector workflow +dummy/detector_data/panel_0_xy/1: + source_names: + - panel_0 + params: {} + aux_source_names: {} + current_job: + job_number: "00000000-0000-0000-0000-000000000001" + jobs: + panel_0: + params: {} + aux_source_names: {} + +# Area Panel XY detector workflow +dummy/detector_data/area_panel_xy/1: + source_names: + - area_panel + params: {} + aux_source_names: {} + current_job: + job_number: "00000000-0000-0000-0000-000000000002" + jobs: + area_panel: + params: {} + aux_source_names: {} + +# Monitor timeseries workflow +dummy/data_reduction/monitor_timeseries/1: + source_names: + - monitor1 + params: {} + aux_source_names: {} + current_job: + job_number: "00000000-0000-0000-0000-000000000003" + jobs: + monitor1: + params: {} + aux_source_names: {} diff --git a/src/ess/livedata/dashboard/dashboard.py b/src/ess/livedata/dashboard/dashboard.py index c9ec0db7b..d45a72834 100644 --- a/src/ess/livedata/dashboard/dashboard.py +++ b/src/ess/livedata/dashboard/dashboard.py @@ -13,6 +13,7 @@ from .config_store import ConfigStoreManager from .dashboard_services import DashboardServices +from .http_transport import HttpTransport from .kafka_transport import DashboardKafkaTransport from .transport import NullTransport, Transport @@ -38,11 +39,13 @@ def __init__( self._instrument = instrument self._port = port self._dev = dev + self._transport_type = transport self._exit_stack = ExitStack() self._exit_stack.__enter__() self._callback = None + self._transport = self._create_transport(transport) # Config store manager for file-backed persistent UI state (GUI dashboards) config_manager = ConfigStoreManager(instrument=instrument, store_type='file') @@ -54,7 +57,7 @@ def __init__( exit_stack=self._exit_stack, logger=self._logger, pipe_factory=streams.Pipe, - transport=self._create_transport(transport), + transport=self._transport, config_manager=config_manager, ) @@ -70,7 +73,7 @@ def _create_transport(self, transport: str) -> Transport: Parameters ---------- transport: - Transport type ('kafka' or 'none') + Transport type ('kafka', 'none', or 'http') Returns ------- @@ -83,6 +86,8 @@ def _create_transport(self, transport: str) -> Transport: ) elif transport == 'none': return NullTransport() + elif transport == 'http': + return HttpTransport(instrument=self._instrument, logger=self._logger) else: raise ValueError(f"Unknown transport type: {transport}") @@ -154,6 +159,35 @@ def create_layout(self) -> pn.template.MaterialTemplate: self.start_periodic_updates() return template + def _get_extra_patterns(self) -> list: + """ + Get extra Tornado URL patterns for the server. + + Returns patterns for the HTTP data injection endpoint when using + HTTP transport. 
+ """ + if self._transport_type != 'http': + return [] + + from tornado.web import RequestHandler + + # Capture transport reference for the handler + http_transport = self._transport + + class DataInjectionHandler(RequestHandler): + """Tornado handler for POST /api/data endpoint.""" + + def set_default_headers(self): + self.set_header('Content-Type', 'application/json') + + def post(self): + result = http_transport.handle_post_request(self.request.body) + if result.get('status') == 'error': + self.set_status(400) + self.write(result) + + return [(r'/api/data', DataInjectionHandler)] + @property def server(self): """Get the Panel server for WSGI deployment.""" @@ -163,6 +197,7 @@ def server(self): show=False, autoreload=False, dev=self._dev, + extra_patterns=self._get_extra_patterns(), ) def _start_impl(self) -> None: @@ -181,6 +216,7 @@ def run_forever(self) -> None: show=False, autoreload=True, dev=self._dev, + extra_patterns=self._get_extra_patterns(), ) except KeyboardInterrupt: self._logger.info("Keyboard interrupt received, shutting down...") diff --git a/src/ess/livedata/dashboard/http_transport.py b/src/ess/livedata/dashboard/http_transport.py new file mode 100644 index 000000000..e0517becf --- /dev/null +++ b/src/ess/livedata/dashboard/http_transport.py @@ -0,0 +1,199 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""HTTP-based transport implementation for the dashboard. + +This transport allows injecting data via HTTP POST requests instead of consuming +from Kafka. Useful for testing, development, and screenshot generation without +requiring a Kafka broker. + +The HTTP endpoint accepts da00-serialized messages, using the same format as Kafka. +""" + +import base64 +import json +import logging +import threading +from collections.abc import Sequence +from types import TracebackType + +import scipp as sc +from streaming_data_types import dataarray_da00 + +from ..core.message import Message, StreamId, StreamKind +from ..kafka.scipp_da00_compat import da00_to_scipp +from .transport import DashboardResources, NullMessageSink, Transport + + +class QueueableMessageSource: + """ + Message source that returns messages from an internal queue. + + Messages can be injected via the `queue_message` method and will be + returned by subsequent calls to `get_messages`. + """ + + def __init__(self) -> None: + self._messages: list[Message[sc.DataArray]] = [] + self._lock = threading.Lock() + + def queue_message(self, message: Message[sc.DataArray]) -> None: + """Add a message to the queue.""" + with self._lock: + self._messages.append(message) + + def queue_messages(self, messages: Sequence[Message[sc.DataArray]]) -> None: + """Add multiple messages to the queue.""" + with self._lock: + self._messages.extend(messages) + + def get_messages(self) -> Sequence[Message[sc.DataArray]]: + """Return and clear all queued messages.""" + with self._lock: + messages = self._messages + self._messages = [] + return messages + + +def deserialize_da00_to_message( + payload: bytes, timestamp_ns: int | None = None +) -> Message[sc.DataArray]: + """ + Deserialize a da00 payload to a Message with sc.DataArray value. + + Parameters + ---------- + payload: + Raw da00 bytes as received from Kafka or HTTP. + timestamp_ns: + Optional timestamp override. If None, uses timestamp from da00. + + Returns + ------- + : + Message with stream ID derived from da00 source_name and sc.DataArray value. 
+ """ + da00: dataarray_da00.da00_DataArray_t + da00 = dataarray_da00.deserialise_da00(payload) # type: ignore[reportAssignmentType] + + timestamp = timestamp_ns if timestamp_ns is not None else da00.timestamp_ns + stream_id = StreamId(kind=StreamKind.LIVEDATA_DATA, name=da00.source_name) + value = da00_to_scipp(da00.data) + + return Message(timestamp=timestamp, stream=stream_id, value=value) + + +class HttpTransport(Transport[DashboardResources]): + """ + HTTP-based transport for the dashboard. + + Provides a message source that can receive data via HTTP POST requests. + The HTTP endpoint is set up when the Panel server starts, using Panel's + REST endpoint mechanism. + + Parameters + ---------- + instrument: + Instrument name (e.g., 'dummy', 'dream', 'bifrost') + logger: + Logger instance for logging + """ + + def __init__( + self, + *, + instrument: str, + logger: logging.Logger | None = None, + ): + self._instrument = instrument + self._logger = logger or logging.getLogger(__name__) + self._message_source = QueueableMessageSource() + + def __enter__(self) -> DashboardResources: + """Set up HTTP transport and return dashboard resources.""" + self._logger.info("HttpTransport initialized for %s", self._instrument) + + return DashboardResources( + message_source=self._message_source, + command_sink=NullMessageSink(), + roi_sink=NullMessageSink(), + ) + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Clean up HTTP transport resources.""" + self._logger.info("HttpTransport cleaned up") + + def start(self) -> None: + """Start the transport (no-op for HTTP, endpoint is always available).""" + pass + + def stop(self) -> None: + """Stop the transport (no-op for HTTP).""" + pass + + @property + def message_source(self) -> QueueableMessageSource: + """Get the message source for direct injection.""" + return self._message_source + + def inject_da00(self, payload: bytes, timestamp_ns: int | None = None) -> None: + """ + Inject a da00-serialized message directly. + + Parameters + ---------- + payload: + Raw da00 bytes. + timestamp_ns: + Optional timestamp override. + """ + message = deserialize_da00_to_message(payload, timestamp_ns) + self._message_source.queue_message(message) + self._logger.debug("Injected da00 message for stream %s", message.stream) + + def inject_from_json(self, json_data: dict) -> None: + """ + Inject a message from JSON payload. + + Expected format: + { + "payload_base64": "", + "timestamp_ns": + } + + Parameters + ---------- + json_data: + JSON dict with payload_base64 and optional timestamp_ns. + """ + payload = base64.b64decode(json_data['payload_base64']) + timestamp_ns = json_data.get('timestamp_ns') + self.inject_da00(payload, timestamp_ns) + + def handle_post_request(self, body: bytes) -> dict: + """ + Handle an HTTP POST request body. + + This method is called by the Panel REST endpoint handler. + + Parameters + ---------- + body: + Raw request body (JSON). + + Returns + ------- + : + Response dict with status. 
+ """ + try: + json_data = json.loads(body) + self.inject_from_json(json_data) + return {'status': 'ok'} + except Exception as e: + self._logger.exception("Error handling POST request") + return {'status': 'error', 'message': str(e)} diff --git a/src/ess/livedata/dashboard/reduction.py b/src/ess/livedata/dashboard/reduction.py index 597b1a7da..c7ad4e1f6 100644 --- a/src/ess/livedata/dashboard/reduction.py +++ b/src/ess/livedata/dashboard/reduction.py @@ -101,9 +101,9 @@ def get_arg_parser() -> argparse.ArgumentParser: parser = Service.setup_arg_parser(description='ESSlivedata Dashboard') parser.add_argument( '--transport', - choices=['kafka', 'none'], + choices=['kafka', 'none', 'http'], default='kafka', - help='Transport backend for message handling', + help='Transport backend for message handling (http enables POST /api/data)', ) return parser diff --git a/tests/dashboard/http_transport_test.py b/tests/dashboard/http_transport_test.py new file mode 100644 index 000000000..22cd390ab --- /dev/null +++ b/tests/dashboard/http_transport_test.py @@ -0,0 +1,235 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Tests for HttpTransport.""" + +import base64 +import time +import uuid + +import numpy as np +import scipp as sc +from streaming_data_types import dataarray_da00 + +from ess.livedata.config.workflow_spec import JobId, ResultKey, WorkflowId +from ess.livedata.core.message import StreamKind +from ess.livedata.dashboard.http_transport import ( + HttpTransport, + QueueableMessageSource, + deserialize_da00_to_message, +) +from ess.livedata.kafka.scipp_da00_compat import scipp_to_da00 + + +def make_test_dataarray() -> sc.DataArray: + """Create a simple test DataArray.""" + return sc.DataArray( + sc.array(dims=['x', 'y'], values=np.arange(6).reshape(2, 3), unit='counts'), + coords={ + 'x': sc.array(dims=['x'], values=[0.0, 1.0], unit='m'), + 'y': sc.array(dims=['y'], values=[0.0, 1.0, 2.0], unit='m'), + }, + ) + + +def serialize_dataarray_to_da00(source_name: str, data: sc.DataArray) -> bytes: + """Serialize a DataArray to da00 format using the same method as KafkaSink.""" + return dataarray_da00.serialise_da00( + source_name=source_name, + timestamp_ns=int(time.time_ns()), + data=scipp_to_da00(data), + ) + + +class TestQueueableMessageSource: + def test_empty_source_returns_empty_list(self) -> None: + source = QueueableMessageSource() + assert source.get_messages() == [] + + def test_queue_message_returns_on_get(self) -> None: + source = QueueableMessageSource() + + # Create a test message using da00 deserialization + result_key = ResultKey( + workflow_id=WorkflowId( + instrument='test', namespace='ns', name='workflow', version=1 + ), + job_id=JobId(job_number=uuid.uuid4(), source_name='detector1'), + output_name='result', + ) + payload = serialize_dataarray_to_da00( + source_name=result_key.model_dump_json(), + data=make_test_dataarray(), + ) + message = deserialize_da00_to_message(payload) + source.queue_message(message) + + messages = source.get_messages() + assert len(messages) == 1 + assert messages[0].stream.kind == StreamKind.LIVEDATA_DATA + + def test_get_messages_clears_queue(self) -> None: + source = QueueableMessageSource() + + result_key = ResultKey( + workflow_id=WorkflowId( + instrument='test', namespace='ns', name='workflow', version=1 + ), + job_id=JobId(job_number=uuid.uuid4(), source_name='detector1'), + output_name='result', + ) + payload = serialize_dataarray_to_da00( + 
source_name=result_key.model_dump_json(), + data=make_test_dataarray(), + ) + message = deserialize_da00_to_message(payload) + source.queue_message(message) + + # First call returns messages + messages = source.get_messages() + assert len(messages) == 1 + + # Second call returns empty + messages = source.get_messages() + assert len(messages) == 0 + + +class TestDeserializeDa00ToMessage: + def test_deserialize_creates_valid_message(self) -> None: + result_key = ResultKey( + workflow_id=WorkflowId( + instrument='dummy', namespace='detector', name='view', version=1 + ), + job_id=JobId(job_number=uuid.uuid4(), source_name='panel_0'), + output_name='current', + ) + data = make_test_dataarray() + payload = serialize_dataarray_to_da00( + source_name=result_key.model_dump_json(), + data=data, + ) + + message = deserialize_da00_to_message(payload) + + assert message.stream.kind == StreamKind.LIVEDATA_DATA + assert message.stream.name == result_key.model_dump_json() + assert isinstance(message.value, sc.DataArray) + assert message.value.dims == data.dims + + def test_timestamp_override(self) -> None: + result_key = ResultKey( + workflow_id=WorkflowId( + instrument='test', namespace='ns', name='workflow', version=1 + ), + job_id=JobId(job_number=uuid.uuid4(), source_name='detector1'), + output_name='result', + ) + payload = serialize_dataarray_to_da00( + source_name=result_key.model_dump_json(), + data=make_test_dataarray(), + ) + + custom_timestamp = 123456789 + message = deserialize_da00_to_message(payload, timestamp_ns=custom_timestamp) + + assert message.timestamp == custom_timestamp + + +class TestHttpTransport: + def test_context_manager_returns_resources(self) -> None: + transport = HttpTransport(instrument='dummy') + + with transport as resources: + assert resources.message_source is not None + assert resources.command_sink is not None + assert resources.roi_sink is not None + + def test_inject_da00(self) -> None: + transport = HttpTransport(instrument='dummy') + + with transport as resources: + result_key = ResultKey( + workflow_id=WorkflowId( + instrument='dummy', namespace='detector', name='view', version=1 + ), + job_id=JobId(job_number=uuid.uuid4(), source_name='panel_0'), + output_name='current', + ) + payload = serialize_dataarray_to_da00( + source_name=result_key.model_dump_json(), + data=make_test_dataarray(), + ) + + transport.inject_da00(payload) + + messages = resources.message_source.get_messages() + assert len(messages) == 1 + assert messages[0].stream.kind == StreamKind.LIVEDATA_DATA + + def test_inject_from_json(self) -> None: + transport = HttpTransport(instrument='dummy') + + with transport as resources: + result_key = ResultKey( + workflow_id=WorkflowId( + instrument='dummy', namespace='detector', name='view', version=1 + ), + job_id=JobId(job_number=uuid.uuid4(), source_name='panel_0'), + output_name='current', + ) + payload = serialize_dataarray_to_da00( + source_name=result_key.model_dump_json(), + data=make_test_dataarray(), + ) + + json_data = { + 'payload_base64': base64.b64encode(payload).decode('utf-8'), + 'timestamp_ns': 999999, + } + transport.inject_from_json(json_data) + + messages = resources.message_source.get_messages() + assert len(messages) == 1 + assert messages[0].timestamp == 999999 + + def test_handle_post_request_success(self) -> None: + transport = HttpTransport(instrument='dummy') + + with transport: + result_key = ResultKey( + workflow_id=WorkflowId( + instrument='dummy', namespace='detector', name='view', version=1 + ), + 
job_id=JobId(job_number=uuid.uuid4(), source_name='panel_0'), + output_name='current', + ) + payload = serialize_dataarray_to_da00( + source_name=result_key.model_dump_json(), + data=make_test_dataarray(), + ) + + body = ( + b'{"payload_base64": "' + + base64.b64encode(payload) + + b'", "timestamp_ns": 12345}' + ) + result = transport.handle_post_request(body) + + assert result == {'status': 'ok'} + + def test_handle_post_request_invalid_json(self) -> None: + transport = HttpTransport(instrument='dummy') + + with transport: + result = transport.handle_post_request(b'not valid json') + + assert result['status'] == 'error' + assert 'message' in result + + def test_handle_post_request_invalid_payload(self) -> None: + transport = HttpTransport(instrument='dummy') + + with transport: + body = b'{"payload_base64": "aW52YWxpZA=="}' # "invalid" in base64 + result = transport.handle_post_request(body) + + assert result['status'] == 'error'
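
For a quick manual check of the new endpoint, something along these lines can be used to post a single result to a running dashboard. This is a minimal sketch, not part of the change itself: it assumes the dashboard was started with `--transport=http` on the default port 5009 using the fixture configs above, and it reuses the `panel_0_xy` workflow's fixed job_number so the data is routed to the plot grid already subscribed to it.

```python
import base64
import json
import time
import urllib.request
from uuid import UUID

import numpy as np
import scipp as sc
from streaming_data_types import dataarray_da00

from ess.livedata.config.workflow_spec import JobId, ResultKey, WorkflowId
from ess.livedata.kafka.scipp_da00_compat import scipp_to_da00

# ResultKey matching the panel_0_xy fixture workflow and its fixed job_number.
result_key = ResultKey(
    workflow_id=WorkflowId(
        instrument='dummy', namespace='detector_data', name='panel_0_xy', version=1
    ),
    job_id=JobId(
        job_number=UUID('00000000-0000-0000-0000-000000000001'),
        source_name='panel_0',
    ),
    output_name='current',
)

# Small 2D image so the 'image' plot has something to render.
values = np.random.default_rng(0).poisson(100, size=(8, 8)).astype(float)
data = sc.DataArray(
    sc.array(dims=['y', 'x'], values=values, unit='counts'),
    coords={
        'x': sc.arange('x', 0.0, 8.0, unit=None),
        'y': sc.arange('y', 0.0, 8.0, unit=None),
    },
)

# Serialize to da00 with the ResultKey JSON as source_name, exactly as the
# fixtures do, then POST it to the injection endpoint.
payload = dataarray_da00.serialise_da00(
    source_name=result_key.model_dump_json(),
    timestamp_ns=int(time.time_ns()),
    data=scipp_to_da00(data),
)
body = json.dumps({'payload_base64': base64.b64encode(payload).decode('utf-8')})
req = urllib.request.Request(
    'http://localhost:5009/api/data',
    data=body.encode('utf-8'),
    headers={'Content-Type': 'application/json'},
    method='POST',
)
with urllib.request.urlopen(req, timeout=5.0) as response:
    print(json.loads(response.read().decode('utf-8')))
```

On success the endpoint responds with `{"status": "ok"}`; a malformed payload yields HTTP 400 with `{"status": "error", "message": ...}`, mirroring `HttpTransport.handle_post_request`.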