Skip to content

Commit 6af83d8

Browse files
[codex][app-server] introduce codex/event/raw_item events (#5578)
1 parent e2e1b65 commit 6af83d8

File tree

13 files changed

+300
-28
lines changed

13 files changed

+300
-28
lines changed

codex-rs/app-server-protocol/src/protocol.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,8 @@ pub struct SendUserMessageResponse {}
717717
#[serde(rename_all = "camelCase")]
718718
pub struct AddConversationListenerParams {
719719
pub conversation_id: ConversationId,
720+
#[serde(default)]
721+
pub experimental_raw_events: bool,
720722
}
721723

722724
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]

codex-rs/app-server/src/codex_message_processor.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,10 @@ impl CodexMessageProcessor {
12561256
request_id: RequestId,
12571257
params: AddConversationListenerParams,
12581258
) {
1259-
let AddConversationListenerParams { conversation_id } = params;
1259+
let AddConversationListenerParams {
1260+
conversation_id,
1261+
experimental_raw_events,
1262+
} = params;
12601263
let Ok(conversation) = self
12611264
.conversation_manager
12621265
.get_conversation(conversation_id)
@@ -1293,6 +1296,11 @@ impl CodexMessageProcessor {
12931296
}
12941297
};
12951298

1299+
if let EventMsg::RawResponseItem(_) = &event.msg
1300+
&& !experimental_raw_events {
1301+
continue;
1302+
}
1303+
12961304
// For now, we send a notification for every event,
12971305
// JSON-serializing the `Event` as-is, but these should
12981306
// be migrated to be variants of `ServerNotification`

codex-rs/app-server/tests/suite/codex_message_processor_flow.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ async fn test_codex_jsonrpc_conversation_flow() {
103103

104104
// 2) addConversationListener
105105
let add_listener_id = mcp
106-
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
106+
.send_add_conversation_listener_request(AddConversationListenerParams {
107+
conversation_id,
108+
experimental_raw_events: false,
109+
})
107110
.await
108111
.expect("send addConversationListener");
109112
let add_listener_resp: JSONRPCResponse = timeout(
@@ -252,7 +255,10 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
252255

253256
// 2) addConversationListener
254257
let add_listener_id = mcp
255-
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
258+
.send_add_conversation_listener_request(AddConversationListenerParams {
259+
conversation_id,
260+
experimental_raw_events: false,
261+
})
256262
.await
257263
.expect("send addConversationListener");
258264
let _: AddConversationSubscriptionResponse =
@@ -459,7 +465,10 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
459465
.expect("deserialize newConversation response");
460466

461467
let add_listener_id = mcp
462-
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
468+
.send_add_conversation_listener_request(AddConversationListenerParams {
469+
conversation_id,
470+
experimental_raw_events: false,
471+
})
463472
.await
464473
.expect("send addConversationListener");
465474
timeout(

codex-rs/app-server/tests/suite/create_conversation.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ async fn test_conversation_create_and_send_message_ok() {
6767

6868
// Add a listener so we receive notifications for this conversation (not strictly required for this test).
6969
let add_listener_id = mcp
70-
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
70+
.send_add_conversation_listener_request(AddConversationListenerParams {
71+
conversation_id,
72+
experimental_raw_events: false,
73+
})
7174
.await
7275
.expect("send addConversationListener");
7376
let _sub: AddConversationSubscriptionResponse =

codex-rs/app-server/tests/suite/interrupt.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
8888

8989
// 2) addConversationListener
9090
let add_listener_id = mcp
91-
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
91+
.send_add_conversation_listener_request(AddConversationListenerParams {
92+
conversation_id,
93+
experimental_raw_events: false,
94+
})
9295
.await?;
9396
let _add_listener_resp: JSONRPCResponse = timeout(
9497
DEFAULT_READ_TIMEOUT,

codex-rs/app-server/tests/suite/send_message.rs

Lines changed: 210 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use codex_app_server_protocol::RequestId;
1515
use codex_app_server_protocol::SendUserMessageParams;
1616
use codex_app_server_protocol::SendUserMessageResponse;
1717
use codex_protocol::ConversationId;
18+
use codex_protocol::models::ContentItem;
19+
use codex_protocol::models::ResponseItem;
1820
use pretty_assertions::assert_eq;
1921
use tempfile::TempDir;
2022
use tokio::time::timeout;
@@ -62,7 +64,10 @@ async fn test_send_message_success() {
6264

6365
// 2) addConversationListener
6466
let add_listener_id = mcp
65-
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
67+
.send_add_conversation_listener_request(AddConversationListenerParams {
68+
conversation_id,
69+
experimental_raw_events: false,
70+
})
6671
.await
6772
.expect("send addConversationListener");
6873
let add_listener_resp: JSONRPCResponse = timeout(
@@ -124,6 +129,105 @@ async fn send_message(message: &str, conversation_id: ConversationId, mcp: &mut
124129
.expect("should have conversationId"),
125130
&serde_json::Value::String(conversation_id.to_string())
126131
);
132+
133+
let raw_attempt = tokio::time::timeout(
134+
std::time::Duration::from_millis(200),
135+
mcp.read_stream_until_notification_message("codex/event/raw_response_item"),
136+
)
137+
.await;
138+
assert!(
139+
raw_attempt.is_err(),
140+
"unexpected raw item notification when not opted in"
141+
);
142+
}
143+
144+
#[tokio::test]
145+
async fn test_send_message_raw_notifications_opt_in() {
146+
let responses = vec![
147+
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
148+
];
149+
let server = create_mock_chat_completions_server(responses).await;
150+
151+
let codex_home = TempDir::new().expect("create temp dir");
152+
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml");
153+
154+
let mut mcp = McpProcess::new(codex_home.path())
155+
.await
156+
.expect("spawn mcp process");
157+
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
158+
.await
159+
.expect("init timed out")
160+
.expect("init failed");
161+
162+
let new_conv_id = mcp
163+
.send_new_conversation_request(NewConversationParams::default())
164+
.await
165+
.expect("send newConversation");
166+
let new_conv_resp: JSONRPCResponse = timeout(
167+
DEFAULT_READ_TIMEOUT,
168+
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
169+
)
170+
.await
171+
.expect("newConversation timeout")
172+
.expect("newConversation resp");
173+
let NewConversationResponse {
174+
conversation_id, ..
175+
} = to_response::<_>(new_conv_resp).expect("deserialize newConversation response");
176+
177+
let add_listener_id = mcp
178+
.send_add_conversation_listener_request(AddConversationListenerParams {
179+
conversation_id,
180+
experimental_raw_events: true,
181+
})
182+
.await
183+
.expect("send addConversationListener");
184+
let add_listener_resp: JSONRPCResponse = timeout(
185+
DEFAULT_READ_TIMEOUT,
186+
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
187+
)
188+
.await
189+
.expect("addConversationListener timeout")
190+
.expect("addConversationListener resp");
191+
let AddConversationSubscriptionResponse { subscription_id: _ } =
192+
to_response::<_>(add_listener_resp).expect("deserialize addConversationListener response");
193+
194+
let send_id = mcp
195+
.send_send_user_message_request(SendUserMessageParams {
196+
conversation_id,
197+
items: vec![InputItem::Text {
198+
text: "Hello".to_string(),
199+
}],
200+
})
201+
.await
202+
.expect("send sendUserMessage");
203+
204+
let instructions = read_raw_response_item(&mut mcp, conversation_id).await;
205+
assert_instructions_message(&instructions);
206+
207+
let environment = read_raw_response_item(&mut mcp, conversation_id).await;
208+
assert_environment_message(&environment);
209+
210+
let response: JSONRPCResponse = timeout(
211+
DEFAULT_READ_TIMEOUT,
212+
mcp.read_stream_until_response_message(RequestId::Integer(send_id)),
213+
)
214+
.await
215+
.expect("sendUserMessage response timeout")
216+
.expect("sendUserMessage response error");
217+
let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(response)
218+
.expect("deserialize sendUserMessage response");
219+
220+
let user_message = read_raw_response_item(&mut mcp, conversation_id).await;
221+
assert_user_message(&user_message, "Hello");
222+
223+
let assistant_message = read_raw_response_item(&mut mcp, conversation_id).await;
224+
assert_assistant_message(&assistant_message, "Done");
225+
226+
let _ = tokio::time::timeout(
227+
std::time::Duration::from_millis(250),
228+
mcp.read_stream_until_notification_message("codex/event/task_complete"),
229+
)
230+
.await;
127231
}
128232

129233
#[tokio::test]
@@ -184,3 +288,108 @@ stream_max_retries = 0
184288
),
185289
)
186290
}
291+
292+
#[expect(clippy::expect_used)]
293+
async fn read_raw_response_item(
294+
mcp: &mut McpProcess,
295+
conversation_id: ConversationId,
296+
) -> ResponseItem {
297+
let raw_notification: JSONRPCNotification = timeout(
298+
DEFAULT_READ_TIMEOUT,
299+
mcp.read_stream_until_notification_message("codex/event/raw_response_item"),
300+
)
301+
.await
302+
.expect("codex/event/raw_response_item notification timeout")
303+
.expect("codex/event/raw_response_item notification resp");
304+
305+
let serde_json::Value::Object(params) = raw_notification
306+
.params
307+
.expect("codex/event/raw_response_item should have params")
308+
else {
309+
panic!("codex/event/raw_response_item should have params");
310+
};
311+
312+
let conversation_id_value = params
313+
.get("conversationId")
314+
.and_then(|value| value.as_str())
315+
.expect("raw response item should include conversationId");
316+
317+
assert_eq!(
318+
conversation_id_value,
319+
conversation_id.to_string(),
320+
"raw response item conversation mismatch"
321+
);
322+
323+
let msg_value = params
324+
.get("msg")
325+
.cloned()
326+
.expect("raw response item should include msg payload");
327+
328+
serde_json::from_value(msg_value).expect("deserialize raw response item")
329+
}
330+
331+
fn assert_instructions_message(item: &ResponseItem) {
332+
match item {
333+
ResponseItem::Message { role, content, .. } => {
334+
assert_eq!(role, "user");
335+
let texts = content_texts(content);
336+
assert!(
337+
texts
338+
.iter()
339+
.any(|text| text.contains("<user_instructions>")),
340+
"expected instructions message, got {texts:?}"
341+
);
342+
}
343+
other => panic!("expected instructions message, got {other:?}"),
344+
}
345+
}
346+
347+
fn assert_environment_message(item: &ResponseItem) {
348+
match item {
349+
ResponseItem::Message { role, content, .. } => {
350+
assert_eq!(role, "user");
351+
let texts = content_texts(content);
352+
assert!(
353+
texts
354+
.iter()
355+
.any(|text| text.contains("<environment_context>")),
356+
"expected environment context message, got {texts:?}"
357+
);
358+
}
359+
other => panic!("expected environment message, got {other:?}"),
360+
}
361+
}
362+
363+
fn assert_user_message(item: &ResponseItem, expected_text: &str) {
364+
match item {
365+
ResponseItem::Message { role, content, .. } => {
366+
assert_eq!(role, "user");
367+
let texts = content_texts(content);
368+
assert_eq!(texts, vec![expected_text]);
369+
}
370+
other => panic!("expected user message, got {other:?}"),
371+
}
372+
}
373+
374+
fn assert_assistant_message(item: &ResponseItem, expected_text: &str) {
375+
match item {
376+
ResponseItem::Message { role, content, .. } => {
377+
assert_eq!(role, "assistant");
378+
let texts = content_texts(content);
379+
assert_eq!(texts, vec![expected_text]);
380+
}
381+
other => panic!("expected assistant message, got {other:?}"),
382+
}
383+
}
384+
385+
fn content_texts(content: &[ContentItem]) -> Vec<&str> {
386+
content
387+
.iter()
388+
.filter_map(|item| match item {
389+
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
390+
Some(text.as_str())
391+
}
392+
_ => None,
393+
})
394+
.collect()
395+
}

0 commit comments

Comments
 (0)