feat: increase granularity of scenario events, removing exponential convo size growth and enabling streaming support #267
0xdeafcafe wants to merge 9 commits into main
Replace xksuid with @langwatch/ksuid for ID generation, using the new prefix-aware generate() API. Bump ai, @ag-ui/core, @ai-sdk/openai, and @openai/agents to latest versions.
Add TEXT_MESSAGE_START, TEXT_MESSAGE_END, TEXT_MESSAGE_CONTENT, TOOL_CALL_START, TOOL_CALL_ARGS, and TOOL_CALL_END event types with their Zod schemas and TypeScript types. These replace the coarse-grained MESSAGE_SNAPSHOT for per-message lifecycle tracking.
Introduce AgentStreamPart discriminated union and optional stream() method on AgentAdapter. Add createStreamLLMInvoker factory wrapping ai SDK's streamText for streaming LLM responses.
Extract buildLLMParams helper and add stream() method that yields AgentStreamPart events from the ai SDK's streamText response, mapping text-delta, tool-input-start/delta, and tool-call parts.
Split event processing into two paths: lifecycle events (RUN_STARTED, TEXT_MESSAGE_START/END, RUN_FINISHED) are grouped by scenarioRunId for sequential ordering, while streaming events (TEXT_MESSAGE_CONTENT, TOOL_CALL_*) are fire-and-forget with no ordering guarantees. Update EventReporter to process TEXT_MESSAGE_END content instead of MESSAGE_SNAPSHOT messages array.
Emit TEXT_MESSAGE_START/END (and streaming CONTENT/TOOL_CALL_* deltas) instead of full MESSAGE_SNAPSHOT events. Add handleStreamingAgent for agents that implement stream(). Support pre-generated scenarioRunId via __UNSAFE__scenarioRunId config. Track message ordering with messageCounter and pass messageId through state.
Replace MESSAGE_SNAPSHOT handling with TEXT_MESSAGE_END events in the vitest reporter. Add stream() stubs to test agent implementations.
The subscription to state.events$ for MESSAGE_SNAPSHOT was removed but the finally block still referenced it, causing a TS2304 error.
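The two processing paths described in the event-bus commit above can be sketched without RxJS. This is a minimal illustration under stated assumptions, not the PR's implementation: `EventDispatcher`, `dispatch`, and `drain` are hypothetical names, the event shape is reduced to two fields, and the fire-and-forget set is inferred from the commit description.

```typescript
type ScenarioEvent = { type: string; scenarioRunId: string };

// Assumed membership, inferred from the commit description (streaming events only).
const FIRE_AND_FORGET = new Set<string>([
  "SCENARIO_TEXT_MESSAGE_CONTENT",
  "SCENARIO_TOOL_CALL_START",
  "SCENARIO_TOOL_CALL_ARGS",
  "SCENARIO_TOOL_CALL_END",
]);

// Hypothetical dispatcher: not the EventBus class from the PR.
class EventDispatcher {
  // One promise chain per run keeps lifecycle events in emission order.
  private chains = new Map<string, Promise<void>>();
  private post: (event: ScenarioEvent) => Promise<void>;

  constructor(post: (event: ScenarioEvent) => Promise<void>) {
    this.post = post;
  }

  dispatch(event: ScenarioEvent): void {
    if (FIRE_AND_FORGET.has(event.type)) {
      // Best effort: failures are swallowed and ordering is not guaranteed.
      void this.post(event).catch(() => {});
      return;
    }
    const previous = this.chains.get(event.scenarioRunId) ?? Promise.resolve();
    // A failed POST is swallowed so it does not block later events of the same run.
    const next = previous.then(() => this.post(event)).catch(() => {});
    this.chains.set(event.scenarioRunId, next);
  }

  // Resolves once every queued lifecycle event has been attempted.
  async drain(): Promise<void> {
    await Promise.all(Array.from(this.chains.values()));
  }
}
```

Chaining per `scenarioRunId` means a slow POST for one run delays only that run's lifecycle events, while streaming deltas never wait on the chain at all.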
Automated low-risk assessment
This PR was evaluated against the repository's Low-Risk Pull Requests procedure and does not qualify as low risk.
This PR requires a manual review before merging.
@rogeriochaves converting to draft, as I don't want to risk it getting merged before the backend is deployed and tested with a bit of load.
Pull request overview
This PR refactors scenario event emission to be more granular (start/content/end + tool-call events) to avoid exponential conversation growth, and introduces first-class agent streaming support. It also updates the Vitest integration/reporting and migrates ID generation to @langwatch/ksuid.
Changes:
- Replace SCENARIO_MESSAGE_SNAPSHOT-style broadcasting with granular TEXT_MESSAGE_* + TOOL_CALL_* scenario events and update the Vitest reporter accordingly.
- Add agent streaming support (AgentAdapter.stream() + execution-side handling) to emit deltas/events while assembling final messages into state.
- Migrate ID generation to @langwatch/ksuid and bump related JavaScript dependencies.
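To make the streaming change above concrete, here is a rough sketch of a discriminated union for stream parts and an adapter with an optional `stream()` method. The real `AgentStreamPart` and `AgentAdapter` definitions are not shown on this page, so every field name below, along with `StreamingAgent`, `collectText`, and `respond`, is an assumption made for illustration.

```typescript
// Hypothetical shape: the actual AgentStreamPart union may differ.
type AgentStreamPart =
  | { type: "text-delta"; delta: string }
  | { type: "tool-call-start"; toolCallId: string; toolName: string }
  | { type: "tool-call-args"; toolCallId: string; argsDelta: string }
  | { type: "tool-call-end"; toolCallId: string };

// Hypothetical, reduced stand-in for AgentAdapter.
interface StreamingAgent {
  call(input: string): Promise<string>;
  // Optional: agents without stream() keep using call().
  stream?(input: string): AsyncIterable<AgentStreamPart>;
}

// Forward each part as it arrives while assembling the final text.
async function collectText(
  parts: AsyncIterable<AgentStreamPart>,
  onPart: (part: AgentStreamPart) => void = () => {},
): Promise<string> {
  let text = "";
  for await (const part of parts) {
    onPart(part);
    // The discriminant narrows the union, so `delta` is only read on text parts.
    if (part.type === "text-delta") text += part.delta;
  }
  return text;
}

// Use stream() when the agent provides it, otherwise fall back to call().
async function respond(agent: StreamingAgent, input: string): Promise<string> {
  return agent.stream ? collectText(agent.stream(input)) : agent.call(input);
}

const echoAgent: StreamingAgent = {
  call: async (input: string) => `echo: ${input}`,
  async *stream(input: string): AsyncIterable<AgentStreamPart> {
    yield { type: "text-delta", delta: "echo: " };
    yield { type: "text-delta", delta: input };
  },
};
```

Keeping `stream()` optional lets existing agents continue to work unchanged, which matches the "optional stream() method" wording in the commit description.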
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| javascript/src/utils/ids.ts | Switches ID generation to @langwatch/ksuid and updates batch-run ID generation behavior. |
| javascript/src/utils/convert-model-message-to-agui-events.ts | Adds a new utility for converting ModelMessage objects into granular scenario events. |
| javascript/src/runner/tests/run.test.ts | Updates test agents to satisfy the new stream() method shape. |
| javascript/src/integrations/vitest/setup-global.ts | Updates Vitest global setup to use @langwatch/ksuid and sets ksuid environment. |
| javascript/src/integrations/vitest/reporter.ts | Updates Vitest reporter to build chat logs from SCENARIO_TEXT_MESSAGE_END events instead of snapshots. |
| javascript/src/execution/scenario-execution.ts | Core refactor: emits granular message/tool-call events, supports optional streaming via agent.stream(), adds message indexing, and supports pre-generated run IDs. |
| javascript/src/execution/scenario-execution-state.ts | Extends message-added state events to include messageId and the full message payload; supports passing a caller-provided messageId. |
| javascript/src/events/schema.ts | Adds new event types and Zod schemas for granular message/tool-call events. |
| javascript/src/events/event-reporter.ts | Updates API event processing to focus on TEXT_MESSAGE_END content serialization behavior. |
| javascript/src/events/event-bus.ts | Splits processing into ordered (lifecycle) vs fire-and-forget (streaming) posting, grouped per scenarioRunId. |
| javascript/src/domain/scenarios/index.ts | Adds internal __UNSAFE__scenarioRunId override for platform-provided run IDs. |
| javascript/src/domain/agents/types/agent-stream.types.ts | Introduces AgentStreamPart discriminated union for streaming payloads. |
| javascript/src/domain/agents/index.ts | Exposes AgentStreamPart and adds an overridable stream() default implementation to AgentAdapter. |
| javascript/src/agents/user-simulator-agent.ts | Adds streaming support to the user simulator via streamText/fullStream mapping into AgentStreamPart. |
| javascript/src/agents/types.ts | Adds types for streaming invocation params/results (InvokeStreamLLMParams, StreamLLMResult). |
| javascript/src/agents/llm-invoker.factory.ts | Adds a streaming invoker factory (createStreamLLMInvoker) that wraps streamText. |
| javascript/pnpm-lock.yaml | Locks updated for dependency bumps and new packages. |
| javascript/package.json | Bumps SDK dependencies to newer versions. |
| javascript/examples/vitest/package.json | Adds @langwatch/ksuid to the Vitest example package dependencies. |
Files not reviewed (1)
- javascript/pnpm-lock.yaml: Language not supported
Comments suppressed due to low confidence (1)
javascript/package.json:65
- The code now imports @langwatch/ksuid (e.g., src/utils/ids.ts, integrations/vitest/setup-global.ts), but this package is not listed in this package's dependencies. Conversely, xksuid remains listed but appears unused after the refactor. Add @langwatch/ksuid to dependencies (and update the lockfile), and remove xksuid if it's no longer needed to avoid runtime/module-resolution failures under pnpm's strict node_modules layout.
"dependencies": {
"@opentelemetry/sdk-node": "0.212.0",
"@ag-ui/core": "^0.0.47",
"@ai-sdk/openai": "^3.0.41",
"@openai/agents": "^0.3.9",
"ai": "^6.0.116",
"chalk": "^5.6.2",
"langwatch": "0.16.1",
"open": "11.0.0",
"rxjs": "^7.8.2",
"xksuid": "^0.0.4",
"zod": "^3.25.76"
// Emit START → END for each message, then addMessage + broadcast
for (const message of messages) {
  this.state.addMessage({
    ...message,
  const messageId = generateMessageId();
  const messageIndex = this.messageCounter++;
  const aguiMessages = convertModelMessagesToAguiMessages([
    { ...message, id: messageId },
  ]);
  const aguiMessage = aguiMessages[0];

  this.emitTextMessageStart({
    scenarioRunId,
    messageId,
    role: message.role,
    messageIndex,
  });
  this.emitTextMessageEnd({
    scenarioRunId,
    messageId,
    role: message.role,
    content:
      aguiMessage && typeof aguiMessage.content === "string"
        ? aguiMessage.content
        : typeof message.content === "string"
          ? message.content
          : JSON.stringify(message.content),
    message: aguiMessage ?? {},
    traceId,
    messageIndex,
  });

  this.state.addMessage({ ...message, traceId }, messageId);
  this.broadcastMessage(message, idx);
messageCounter/TEXT_MESSAGE_* emission is now done manually per message in this block. There are still other code paths in ScenarioExecution that call this.state.addMessage(...) without supplying a messageId and without emitting TEXT_MESSAGE_START/END, which can lead to missing events and inconsistent messageIndex ordering. Consider centralizing “emit + addMessage” into a single helper and ensuring all message insertions go through it.
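One way to read the suggestion above is a single entry point that owns the counter, the event emission, and the state insertion. The sketch below is hypothetical: `MessageRecorder` and `record` are illustrative names, the message and event shapes are simplified, and the ID format is a stand-in for `generateMessageId()`.

```typescript
type Role = "user" | "assistant" | "system" | "tool";
type Message = { role: Role; content: string };
type EmittedEvent = {
  type: "TEXT_MESSAGE_START" | "TEXT_MESSAGE_END";
  messageId: string;
  messageIndex: number;
  role: Role;
  content?: string;
};

// Hypothetical helper: not part of ScenarioExecution as shown in the diff.
class MessageRecorder {
  private messageCounter = 0;
  readonly events: EmittedEvent[] = [];
  readonly messages: Array<Message & { id: string }> = [];

  // Every insertion emits START/END and takes its index from the same
  // counter, so no code path can add a message while skipping either step.
  record(message: Message): string {
    const messageIndex = this.messageCounter++;
    const messageId = `msg_${messageIndex}`; // stand-in for generateMessageId()
    const { role, content } = message;
    this.events.push({ type: "TEXT_MESSAGE_START", messageId, messageIndex, role });
    this.events.push({ type: "TEXT_MESSAGE_END", messageId, messageIndex, role, content });
    this.messages.push({ ...message, id: messageId });
    return messageId;
  }
}
```

With this shape, any remaining direct `addMessage` call becomes easy to spot in review, because the helper is the only place that touches the counter.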
// Fire-and-forget path: streaming events posted concurrently, best-effort
this.events$
  .pipe(
    // Post events and get results
    concatMap(async (event: ScenarioEvent) => {
      this.logger.debug(`[${event.type}] Processing event`, { event });
      const result = await this.eventReporter.postEvent(event);
      return { event, result };
    }),

    // Handle watch messages reactively
    tap(async ({ event, result }) => {
      if (event.type === ScenarioEventType.RUN_STARTED && result.setUrl) {
        await this.eventAlertMessageLogger.handleWatchMessage({
          scenarioSetId: event.scenarioSetId,
          scenarioRunId: event.scenarioRunId,
          setUrl: result.setUrl,
        });
      }
    }),
    filter((event) => FIRE_AND_FORGET_EVENTS.has(event.type)),
    mergeMap((event) =>
      this.eventReporter.postEvent(event).catch((err) => {
        this.logger.debug(`[${event.type}] Fire-and-forget POST failed (non-fatal)`, err);
      }),
    ),
  )
The fire-and-forget path uses mergeMap with unbounded concurrency. During streaming, a large number of TEXT_MESSAGE_CONTENT / TOOL_CALL_* events could trigger an unlimited number of concurrent fetch() calls, increasing memory/FD pressure and potentially throttling/overwhelming the endpoint. Consider adding a reasonable concurrency limit to mergeMap (or buffering/throttling deltas) so event posting remains stable under high-volume streams.
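In RxJS, `mergeMap` accepts a `concurrent` number as its second argument, which is probably the smallest change that addresses this comment. To show the behaviour such a limit gives without depending on RxJS, here is a dependency-free sketch; `createLimiter` is a hypothetical helper, and no specific limit value is suggested by the PR.

```typescript
// Runs at most `limit` tasks at once; the rest wait in a FIFO queue.
function createLimiter(limit: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  const release = (): void => {
    const next = waiting.shift();
    if (next) {
      // Hand the slot directly to the next waiter; `active` stays unchanged.
      next();
    } else {
      active--;
    }
  };

  return async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= limit) {
      // Wait until a finishing task hands over its slot.
      await new Promise<void>((resolve) => {
        waiting.push(resolve);
      });
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      release();
    }
  };
}
```

Handing the slot over inside `release` (rather than decrementing and letting the waiter re-increment) avoids a window in which a newly arriving task could push the count above the limit.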
import { generate, parse } from "@langwatch/ksuid";
import crypto from "node:crypto";
import process from "node:process";
This module has migrated from xksuid to @langwatch/ksuid, but the exported helper safeParseXKsuid (and its docstring) still references xksuid. To avoid confusion for SDK consumers, update the naming/docs to reflect KSUID (or add a deprecated alias while keeping backward compatibility).
// Scenario event type enum
export enum ScenarioEventType {
  RUN_STARTED = "SCENARIO_RUN_STARTED",
  RUN_FINISHED = "SCENARIO_RUN_FINISHED",
  MESSAGE_SNAPSHOT = "SCENARIO_MESSAGE_SNAPSHOT",
  TEXT_MESSAGE_START = "SCENARIO_TEXT_MESSAGE_START",
  TEXT_MESSAGE_END = "SCENARIO_TEXT_MESSAGE_END",
  TEXT_MESSAGE_CONTENT = "SCENARIO_TEXT_MESSAGE_CONTENT",
  TOOL_CALL_START = "SCENARIO_TOOL_CALL_START",
  TOOL_CALL_ARGS = "SCENARIO_TOOL_CALL_ARGS",
  TOOL_CALL_END = "SCENARIO_TOOL_CALL_END",
}
This file header states it is synced from the backend source of truth and “DO NOT EDIT MANUALLY”. Since this PR changes the event enum/schemas here, ensure the corresponding backend schema/enums have been updated and that this file was regenerated via the normal sync process; otherwise these changes risk being overwritten or diverging from server validation.
/**
 * Handles a streaming agent response: emits START, CONTENT deltas, and END events,
 * then assembles the final message and adds it to state.
 */
private async handleStreamingAgent({
  stream,
  idx,
  roleString,
  scenarioRunId,
  agentSpan,
  startTime,
}: {
  stream: AsyncIterable<import("../domain").AgentStreamPart>;
  idx: number;
  roleString: "user" | "assistant";
  scenarioRunId: string;
  agentSpan: { setOutput: (type: string, data: unknown) => void; setMetrics: (m: Record<string, number>) => void; spanContext: () => { traceId: { toString: () => string } } };
  startTime: number;
}): Promise<void> {
Streaming support is introduced here (START/CONTENT/TOOL_CALL_*/END emission plus final message assembly), but there don’t appear to be unit tests exercising the streaming path (e.g., an agent with stream() yielding text/tool-call parts and assertions over emitted events + final state). Adding coverage would help prevent regressions in ordering, message assembly, and event posting semantics.
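The kind of coverage asked for above could look roughly like the following. This is only a sketch: `runStreaming` is a simplified stand-in for `handleStreamingAgent`, the part and event shapes are assumptions, and the plain checks would be `expect` assertions in the project's Vitest suite.

```typescript
type StreamPart =
  | { type: "text-delta"; delta: string }
  | { type: "tool-call"; toolCallId: string; toolName: string; args: string };

type Emitted = { type: string; messageId: string; content?: string };

// Simplified stand-in for handleStreamingAgent: START, events per part, END.
async function runStreaming(
  stream: AsyncIterable<StreamPart>,
  messageId: string,
  emit: (event: Emitted) => void,
): Promise<string> {
  emit({ type: "TEXT_MESSAGE_START", messageId });
  let text = "";
  for await (const part of stream) {
    if (part.type === "text-delta") {
      text += part.delta;
      emit({ type: "TEXT_MESSAGE_CONTENT", messageId, content: part.delta });
    } else {
      emit({ type: "TOOL_CALL_START", messageId });
      emit({ type: "TOOL_CALL_ARGS", messageId, content: part.args });
      emit({ type: "TOOL_CALL_END", messageId });
    }
  }
  emit({ type: "TEXT_MESSAGE_END", messageId, content: text });
  return text;
}

// Stub agent stream used as the test fixture.
async function* stubStream(): AsyncIterable<StreamPart> {
  yield { type: "text-delta", delta: "Hel" };
  yield { type: "text-delta", delta: "lo" };
  yield { type: "tool-call", toolCallId: "t1", toolName: "lookup", args: "{}" };
}

// Collects what a unit test would assert on: event order and assembled text.
async function observeStreamingPath(): Promise<{ types: string[]; text: string }> {
  const events: Emitted[] = [];
  const text = await runStreaming(stubStream(), "m1", (event) => {
    events.push(event);
  });
  return { types: events.map((event) => event.type), text };
}
```

A real test would additionally assert on the final state (the assembled message and its `messageId`) and on which events went through the ordered versus fire-and-forget posting paths.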
events.push({
  ...common,
  type: ScenarioEventType.TEXT_MESSAGE_END,
  messageId: message.id,
In the assistant content-array branch, the emitted TEXT_MESSAGE_END event is missing required fields (at least role, and likely content/traceId depending on intended contract). This produces events that don’t conform to scenarioTextMessageEndSchema and will likely be rejected/ignored downstream. Include the required fields on TEXT_MESSAGE_END (and consider emitting a single START + multiple CONTENT + single END per messageId rather than repeating START/END per part).
messageId: message.id,
messageId: message.id,
role: "assistant",
content: part.text,
traceId: message.traceId,
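The alternative raised in the comment, a single START, one CONTENT per part, and a single END per messageId, might look like this. It is a sketch under assumptions: the event field names (`delta`, `content`, `traceId`) and the `toTextEvents` helper are hypothetical and are not checked against scenarioTextMessageEndSchema.

```typescript
type TextPart = { type: "text"; text: string };
type AssistantMessage = { id: string; traceId: string; content: TextPart[] };

// Assumed event shapes; the real schemas live in events/schema.ts.
type TextEvent =
  | { type: "TEXT_MESSAGE_START"; messageId: string; role: "assistant" }
  | { type: "TEXT_MESSAGE_CONTENT"; messageId: string; delta: string }
  | {
      type: "TEXT_MESSAGE_END";
      messageId: string;
      role: "assistant";
      content: string;
      traceId: string;
    };

// One START, one CONTENT per text part, one END carrying the joined text.
function toTextEvents(message: AssistantMessage): TextEvent[] {
  const events: TextEvent[] = [
    { type: "TEXT_MESSAGE_START", messageId: message.id, role: "assistant" },
  ];
  for (const part of message.content) {
    events.push({ type: "TEXT_MESSAGE_CONTENT", messageId: message.id, delta: part.text });
  }
  events.push({
    type: "TEXT_MESSAGE_END",
    messageId: message.id,
    role: "assistant",
    content: message.content.map((part) => part.text).join(""),
    traceId: message.traceId,
  });
  return events;
}
```

Compared with repeating START/END per part, this keeps exactly one END event per messageId, so a consumer that builds chat logs from END events sees each message once.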
  console.log(`${roleLabel}:\n\n${indent(parsedJson)}\n`);
  continue;
} else roleLabel = chalk.yellow(role);
let roleLabel = role;
No description provided.