From dde002b92c9cb3b1d2016c399b0133899910d645 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 13:43:19 -0700 Subject: [PATCH 01/12] Add feedback upload request handling --- codex-rs/Cargo.lock | 1 + codex-rs/app-server-protocol/src/protocol.rs | 26 ++++ codex-rs/app-server/Cargo.toml | 1 + .../app-server/src/codex_message_processor.rs | 138 ++++++++++++++++++ codex-rs/app-server/src/lib.rs | 13 ++ codex-rs/app-server/src/message_processor.rs | 3 + .../app-server/tests/common/mcp_process.rs | 10 ++ codex-rs/app-server/tests/suite/feedback.rs | 133 +++++++++++++++++ codex-rs/app-server/tests/suite/mod.rs | 1 + 9 files changed, 326 insertions(+) create mode 100644 codex-rs/app-server/tests/suite/feedback.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index a2e49fc506c..5025c82e7ad 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -843,6 +843,7 @@ dependencies = [ "codex-backend-client", "codex-common", "codex-core", + "codex-feedback", "codex-file-search", "codex-login", "codex-protocol", diff --git a/codex-rs/app-server-protocol/src/protocol.rs b/codex-rs/app-server-protocol/src/protocol.rs index 71a4d77f8d0..3bb1467ad8b 100644 --- a/codex-rs/app-server-protocol/src/protocol.rs +++ b/codex-rs/app-server-protocol/src/protocol.rs @@ -124,6 +124,13 @@ client_request_definitions! { response: GetAccountRateLimitsResponse, }, + #[serde(rename = "feedback/upload")] + #[ts(rename = "feedback/upload")] + UploadFeedback { + params: UploadFeedbackParams, + response: UploadFeedbackResponse, + }, + #[serde(rename = "account/read")] #[ts(rename = "account/read")] GetAccount { @@ -378,6 +385,25 @@ pub struct ListModelsResponse { pub next_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +pub struct UploadFeedbackParams { + pub classification: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub conversation_id: Option, + pub include_logs: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub rollout_path: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +pub struct UploadFeedbackResponse { + pub thread_id: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(tag = "type")] #[ts(tag = "type")] diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index c1efb2ef93b..d693e7bb7f8 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -24,6 +24,7 @@ codex-file-search = { workspace = true } codex-login = { workspace = true } codex-protocol = { workspace = true } codex-app-server-protocol = { workspace = true } +codex-feedback = { workspace = true } codex-utils-json-to-toml = { workspace = true } chrono = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index b2b242fb297..bfe20bf3902 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -52,6 +52,8 @@ use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::SessionConfiguredNotification; use codex_app_server_protocol::SetDefaultModelParams; use codex_app_server_protocol::SetDefaultModelResponse; +use codex_app_server_protocol::UploadFeedbackParams; +use codex_app_server_protocol::UploadFeedbackResponse; use codex_app_server_protocol::UserInfoResponse; use codex_app_server_protocol::UserSavedConfig; use codex_backend_client::Client as BackendClient; @@ -85,6 +87,7 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; +use codex_feedback::CodexFeedback; use codex_login::ServerOptions as LoginServerOptions; use codex_login::ShutdownHandle; use codex_login::run_login_server; @@ -103,6 +106,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use tokio::fs; use tokio::select; use tokio::sync::Mutex; use tokio::sync::oneshot; @@ -136,6 +140,7 @@ pub(crate) struct CodexMessageProcessor { // Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives. pending_interrupts: Arc>>>, pending_fuzzy_searches: Arc>>>, + feedback: CodexFeedback, } impl CodexMessageProcessor { @@ -145,6 +150,7 @@ impl CodexMessageProcessor { outgoing: Arc, codex_linux_sandbox_exe: Option, config: Arc, + feedback: CodexFeedback, ) -> Self { Self { auth_manager, @@ -156,6 +162,7 @@ impl CodexMessageProcessor { active_login: Arc::new(Mutex::new(None)), pending_interrupts: Arc::new(Mutex::new(HashMap::new())), pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), + feedback, } } @@ -275,6 +282,9 @@ impl CodexMessageProcessor { } => { self.get_account_rate_limits(request_id).await; } + ClientRequest::UploadFeedback { request_id, params } => { + self.upload_feedback(request_id, params).await; + } } } @@ -1418,6 +1428,134 @@ impl CodexMessageProcessor { let response = FuzzyFileSearchResponse { files: results }; self.outgoing.send_response(request_id, response).await; } + + async fn upload_feedback(&self, request_id: RequestId, params: UploadFeedbackParams) { + let UploadFeedbackParams { + classification, + reason, + conversation_id, + include_logs, + rollout_path, + } = params; + + let snapshot = self.feedback.snapshot(conversation_id.clone()); + let thread_id = snapshot.thread_id.clone(); + + let validated_rollout_path = if include_logs { + if let Some(path) = rollout_path { + match self + .validate_rollout_path(path, conversation_id.clone()) + .await + { + Ok(validated) => Some(validated), + Err(err) => { + self.outgoing.send_error(request_id, err).await; + return; + } + } + } else { + None + } + } else { + None + }; + + let cli_version = env!("CARGO_PKG_VERSION").to_string(); + let upload_result = tokio::task::spawn_blocking(move || { + let rollout_path_ref = validated_rollout_path + .as_ref() + .map(std::path::PathBuf::as_path); + snapshot.upload_feedback( + &classification, + reason.as_deref(), + cli_version.as_str(), + include_logs, + rollout_path_ref, + ) + }) + .await; + + let upload_result = match upload_result { + Ok(result) => result, + Err(join_err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to upload feedback: {join_err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match upload_result { + Ok(()) => { + let response = UploadFeedbackResponse { thread_id }; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to upload feedback: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + + async fn validate_rollout_path( + &self, + path: PathBuf, + conversation_id: Option, + ) -> Result { + let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR); + let canonical = match fs::canonicalize(&path).await { + Ok(c) => c, + Err(err) => { + return Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("failed to resolve rollout path `{}`: {err}", path.display()), + data: None, + }); + } + }; + + if !canonical.starts_with(&rollout_folder) { + return Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "rollout path `{}` must be in sessions directory", + path.display() + ), + data: None, + }); + } + + if let Some(conversation_id) = conversation_id { + let expected_suffix = format!("{conversation_id}.jsonl"); + let Some(file_name) = canonical.file_name().and_then(|name| name.to_str()) else { + return Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("rollout path `{}` missing file name", path.display()), + data: None, + }); + }; + + if !file_name.ends_with(expected_suffix.as_str()) { + return Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "rollout path `{}` must end with `{expected_suffix}`", + path.display() + ), + data: None, + }); + } + } + + Ok(canonical) + } } async fn apply_bespoke_event_handling( diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index c4102f82627..6ef986919f3 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -12,16 +12,19 @@ use crate::message_processor::MessageProcessor; use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::OutgoingMessageSender; use codex_app_server_protocol::JSONRPCMessage; +use codex_feedback::CodexFeedback; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::io::{self}; use tokio::sync::mpsc; +use tracing::Level; use tracing::debug; use tracing::error; use tracing::info; use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; +use tracing_subscriber::filter::Targets; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -82,6 +85,8 @@ pub async fn run_main( std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}")) })?; + let feedback = CodexFeedback::new(); + let otel = codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")).map_err(|e| { std::io::Error::new( @@ -96,8 +101,15 @@ pub async fn run_main( .with_writer(std::io::stderr) .with_filter(EnvFilter::from_default_env()); + let feedback_layer = tracing_subscriber::fmt::layer() + .with_writer(feedback.make_writer()) + .with_ansi(false) + .with_target(false) + .with_filter(Targets::new().with_default(Level::TRACE)); + let _ = tracing_subscriber::registry() .with(stderr_fmt) + .with(feedback_layer) .with(otel.as_ref().map(|provider| { OpenTelemetryTracingBridge::new(&provider.logger).with_filter( tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter), @@ -112,6 +124,7 @@ pub async fn run_main( outgoing_message_sender, codex_linux_sandbox_exe, std::sync::Arc::new(config), + feedback.clone(), ); async move { while let Some(msg) = incoming_rx.recv().await { diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 15086c19e18..a2c192cf2aa 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -17,6 +17,7 @@ use codex_core::ConversationManager; use codex_core::config::Config; use codex_core::default_client::USER_AGENT_SUFFIX; use codex_core::default_client::get_codex_user_agent; +use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; use std::sync::Arc; @@ -33,6 +34,7 @@ impl MessageProcessor { outgoing: OutgoingMessageSender, codex_linux_sandbox_exe: Option, config: Arc, + feedback: CodexFeedback, ) -> Self { let outgoing = Arc::new(outgoing); let auth_manager = AuthManager::shared(config.codex_home.clone(), false); @@ -46,6 +48,7 @@ impl MessageProcessor { outgoing.clone(), codex_linux_sandbox_exe, config, + feedback, ); Self { diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index f6d35b8d968..90c7645d584 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -30,6 +30,7 @@ use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserTurnParams; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SetDefaultModelParams; +use codex_app_server_protocol::UploadFeedbackParams; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCMessage; @@ -242,6 +243,15 @@ impl McpProcess { self.send_request("account/rateLimits/read", None).await } + /// Send a `feedback/upload` JSON-RPC request. + pub async fn send_upload_feedback_request( + &mut self, + params: UploadFeedbackParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("feedback/upload", params).await + } + /// Send a `userInfo` JSON-RPC request. pub async fn send_user_info_request(&mut self) -> anyhow::Result { self.send_request("userInfo", None).await diff --git a/codex-rs/app-server/tests/suite/feedback.rs b/codex-rs/app-server/tests/suite/feedback.rs new file mode 100644 index 00000000000..1ec37a415f2 --- /dev/null +++ b/codex-rs/app-server/tests/suite/feedback.rs @@ -0,0 +1,133 @@ +use std::path::Path; +use std::time::Duration; + +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_final_assistant_message_sse_response; +use app_test_support::create_mock_chat_completions_server; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::NewConversationParams; +use codex_app_server_protocol::NewConversationResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::UploadFeedbackParams; +use codex_app_server_protocol::UploadFeedbackResponse; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); +const INVALID_REQUEST_ERROR_CODE: i64 = -32600; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn upload_feedback_succeeds() -> Result<()> { + let responses = vec![create_final_assistant_message_sse_response("Done")?]; + let server = create_mock_chat_completions_server(responses).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let new_id = mcp + .send_new_conversation_request(NewConversationParams::default()) + .await?; + let new_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(new_id)), + ) + .await??; + let NewConversationResponse { + conversation_id, + rollout_path, + .. + } = to_response::(new_response)?; + + let upload_id = mcp + .send_upload_feedback_request(UploadFeedbackParams { + classification: "bug".to_string(), + reason: Some("it broke".to_string()), + conversation_id: Some(conversation_id), + include_logs: true, + rollout_path: Some(rollout_path), + }) + .await?; + + let upload_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(upload_id)), + ) + .await??; + let UploadFeedbackResponse { thread_id } = + to_response::(upload_response)?; + + assert!( + !thread_id.is_empty(), + "thread id should be returned by upload feedback" + ); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn upload_feedback_rejects_invalid_rollout_path() -> Result<()> { + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), "http://127.0.0.1")?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let invalid_dir = codex_home.path().join("invalid"); + std::fs::create_dir_all(&invalid_dir)?; + let invalid_path = invalid_dir.join("fake.jsonl"); + std::fs::write(&invalid_path, "[]")?; + + let request_id = mcp + .send_upload_feedback_request(UploadFeedbackParams { + classification: "bug".to_string(), + reason: None, + conversation_id: None, + include_logs: true, + rollout_path: Some(invalid_path.clone()), + }) + .await?; + + let error: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + + assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE); + assert_eq!( + error.error.message, + format!( + "rollout path `{}` must be in sessions directory", + invalid_path.display() + ) + ); + Ok(()) +} + +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "danger-full-access" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "chat" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/suite/mod.rs b/codex-rs/app-server/tests/suite/mod.rs index c8763cd0c95..091261f7c10 100644 --- a/codex-rs/app-server/tests/suite/mod.rs +++ b/codex-rs/app-server/tests/suite/mod.rs @@ -3,6 +3,7 @@ mod auth; mod codex_message_processor_flow; mod config; mod create_conversation; +mod feedback; mod fuzzy_file_search; mod interrupt; mod list_resume; From e541aa1d43e9af6a50ca27a620252bee64106840 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 14:12:52 -0700 Subject: [PATCH 02/12] changes --- codex-rs/app-server-protocol/src/protocol.rs | 2 - .../app-server/src/codex_message_processor.rs | 82 +++---------------- codex-rs/app-server/tests/suite/feedback.rs | 51 +----------- codex-rs/core/src/codex_conversation.rs | 10 ++- codex-rs/core/src/conversation_manager.rs | 5 +- 5 files changed, 28 insertions(+), 122 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol.rs b/codex-rs/app-server-protocol/src/protocol.rs index 3bb1467ad8b..2c95719420a 100644 --- a/codex-rs/app-server-protocol/src/protocol.rs +++ b/codex-rs/app-server-protocol/src/protocol.rs @@ -394,8 +394,6 @@ pub struct UploadFeedbackParams { #[serde(skip_serializing_if = "Option::is_none")] pub conversation_id: Option, pub include_logs: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub rollout_path: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index bfe20bf3902..aa584ec67a0 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -106,7 +106,6 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; -use tokio::fs; use tokio::select; use tokio::sync::Mutex; use tokio::sync::oneshot; @@ -1435,26 +1434,15 @@ impl CodexMessageProcessor { reason, conversation_id, include_logs, - rollout_path, } = params; - let snapshot = self.feedback.snapshot(conversation_id.clone()); + let snapshot = self.feedback.snapshot(conversation_id); let thread_id = snapshot.thread_id.clone(); let validated_rollout_path = if include_logs { - if let Some(path) = rollout_path { - match self - .validate_rollout_path(path, conversation_id.clone()) - .await - { - Ok(validated) => Some(validated), - Err(err) => { - self.outgoing.send_error(request_id, err).await; - return; - } - } - } else { - None + match conversation_id { + Some(conv_id) => self.resolve_rollout_path(conv_id).await, + None => None, } } else { None @@ -1462,9 +1450,7 @@ impl CodexMessageProcessor { let cli_version = env!("CARGO_PKG_VERSION").to_string(); let upload_result = tokio::task::spawn_blocking(move || { - let rollout_path_ref = validated_rollout_path - .as_ref() - .map(std::path::PathBuf::as_path); + let rollout_path_ref = validated_rollout_path.as_deref(); snapshot.upload_feedback( &classification, reason.as_deref(), @@ -1504,57 +1490,15 @@ impl CodexMessageProcessor { } } - async fn validate_rollout_path( - &self, - path: PathBuf, - conversation_id: Option, - ) -> Result { - let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR); - let canonical = match fs::canonicalize(&path).await { - Ok(c) => c, - Err(err) => { - return Err(JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("failed to resolve rollout path `{}`: {err}", path.display()), - data: None, - }); - } - }; - - if !canonical.starts_with(&rollout_folder) { - return Err(JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!( - "rollout path `{}` must be in sessions directory", - path.display() - ), - data: None, - }); - } - - if let Some(conversation_id) = conversation_id { - let expected_suffix = format!("{conversation_id}.jsonl"); - let Some(file_name) = canonical.file_name().and_then(|name| name.to_str()) else { - return Err(JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("rollout path `{}` missing file name", path.display()), - data: None, - }); - }; - - if !file_name.ends_with(expected_suffix.as_str()) { - return Err(JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!( - "rollout path `{}` must end with `{expected_suffix}`", - path.display() - ), - data: None, - }); - } + async fn resolve_rollout_path(&self, conversation_id: ConversationId) -> Option { + match self + .conversation_manager + .get_conversation(conversation_id) + .await + { + Ok(conv) => Some(conv.rollout_path()), + Err(_) => None, } - - Ok(canonical) } } diff --git a/codex-rs/app-server/tests/suite/feedback.rs b/codex-rs/app-server/tests/suite/feedback.rs index 1ec37a415f2..764daf9082c 100644 --- a/codex-rs/app-server/tests/suite/feedback.rs +++ b/codex-rs/app-server/tests/suite/feedback.rs @@ -3,10 +3,7 @@ use std::time::Duration; use anyhow::Result; use app_test_support::McpProcess; -use app_test_support::create_final_assistant_message_sse_response; -use app_test_support::create_mock_chat_completions_server; use app_test_support::to_response; -use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationResponse; @@ -21,8 +18,7 @@ const INVALID_REQUEST_ERROR_CODE: i64 = -32600; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn upload_feedback_succeeds() -> Result<()> { - let responses = vec![create_final_assistant_message_sse_response("Done")?]; - let server = create_mock_chat_completions_server(responses).await; + let server = create_mock_chat_completions_server(vec![]).await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; @@ -39,9 +35,7 @@ async fn upload_feedback_succeeds() -> Result<()> { ) .await??; let NewConversationResponse { - conversation_id, - rollout_path, - .. + conversation_id, .. } = to_response::(new_response)?; let upload_id = mcp @@ -50,7 +44,6 @@ async fn upload_feedback_succeeds() -> Result<()> { reason: Some("it broke".to_string()), conversation_id: Some(conversation_id), include_logs: true, - rollout_path: Some(rollout_path), }) .await?; @@ -69,45 +62,7 @@ async fn upload_feedback_succeeds() -> Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn upload_feedback_rejects_invalid_rollout_path() -> Result<()> { - let codex_home = TempDir::new()?; - create_config_toml(codex_home.path(), "http://127.0.0.1")?; - - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; - - let invalid_dir = codex_home.path().join("invalid"); - std::fs::create_dir_all(&invalid_dir)?; - let invalid_path = invalid_dir.join("fake.jsonl"); - std::fs::write(&invalid_path, "[]")?; - - let request_id = mcp - .send_upload_feedback_request(UploadFeedbackParams { - classification: "bug".to_string(), - reason: None, - conversation_id: None, - include_logs: true, - rollout_path: Some(invalid_path.clone()), - }) - .await?; - - let error: JSONRPCError = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_error_message(RequestId::Integer(request_id)), - ) - .await??; - - assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE); - assert_eq!( - error.error.message, - format!( - "rollout path `{}` must be in sessions directory", - invalid_path.display() - ) - ); - Ok(()) -} +// Removed invalid rollout path test: rollout path is now resolved server-side. fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); diff --git a/codex-rs/core/src/codex_conversation.rs b/codex-rs/core/src/codex_conversation.rs index d3b00046fc0..0ccbf24e6c2 100644 --- a/codex-rs/core/src/codex_conversation.rs +++ b/codex-rs/core/src/codex_conversation.rs @@ -3,16 +3,18 @@ use crate::error::Result as CodexResult; use crate::protocol::Event; use crate::protocol::Op; use crate::protocol::Submission; +use std::path::PathBuf; pub struct CodexConversation { codex: Codex, + rollout_path: PathBuf, } /// Conduit for the bidirectional stream of messages that compose a conversation /// in Codex. impl CodexConversation { - pub(crate) fn new(codex: Codex) -> Self { - Self { codex } + pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self { + Self { codex, rollout_path } } pub async fn submit(&self, op: Op) -> CodexResult { @@ -27,4 +29,8 @@ impl CodexConversation { pub async fn next_event(&self) -> CodexResult { self.codex.next_event().await } + + pub fn rollout_path(&self) -> PathBuf { + self.rollout_path.clone() + } } diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index a30038f2ebe..b911526f68e 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -98,7 +98,10 @@ impl ConversationManager { } }; - let conversation = Arc::new(CodexConversation::new(codex)); + let conversation = Arc::new(CodexConversation::new( + codex, + session_configured.rollout_path.clone(), + )); self.conversations .write() .await From 9a6bb5b4a8ea07545449b6c18d79b15480224bf3 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 14:14:54 -0700 Subject: [PATCH 03/12] changes --- codex-rs/core/src/codex_conversation.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/codex_conversation.rs b/codex-rs/core/src/codex_conversation.rs index 0ccbf24e6c2..5bb9c97c5b9 100644 --- a/codex-rs/core/src/codex_conversation.rs +++ b/codex-rs/core/src/codex_conversation.rs @@ -14,7 +14,10 @@ pub struct CodexConversation { /// in Codex. impl CodexConversation { pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self { - Self { codex, rollout_path } + Self { + codex, + rollout_path, + } } pub async fn submit(&self, op: Op) -> CodexResult { From 4e5927ac999277ed6e6961e50ac3236fefa5c56a Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 14:18:42 -0700 Subject: [PATCH 04/12] imports --- codex-rs/app-server/tests/suite/feedback.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/app-server/tests/suite/feedback.rs b/codex-rs/app-server/tests/suite/feedback.rs index 764daf9082c..feca01717f1 100644 --- a/codex-rs/app-server/tests/suite/feedback.rs +++ b/codex-rs/app-server/tests/suite/feedback.rs @@ -3,6 +3,7 @@ use std::time::Duration; use anyhow::Result; use app_test_support::McpProcess; +use app_test_support::create_mock_chat_completions_server; use app_test_support::to_response; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::NewConversationParams; From 3349b769e4647087504ad8f6a79e4b842fd4857f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 14:20:58 -0700 Subject: [PATCH 05/12] imports --- codex-rs/app-server/tests/suite/feedback.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/codex-rs/app-server/tests/suite/feedback.rs b/codex-rs/app-server/tests/suite/feedback.rs index feca01717f1..1b0bdb36e32 100644 --- a/codex-rs/app-server/tests/suite/feedback.rs +++ b/codex-rs/app-server/tests/suite/feedback.rs @@ -14,8 +14,7 @@ use codex_app_server_protocol::UploadFeedbackResponse; use tempfile::TempDir; use tokio::time::timeout; -const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); -const INVALID_REQUEST_ERROR_CODE: i64 = -32600; +const DEFAULT_READ_TIMEOUT: Duration = Duration::from_millis(500); #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn upload_feedback_succeeds() -> Result<()> { From ec1781538eb182005b375e6e07285b0ea035b955 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 14:21:28 -0700 Subject: [PATCH 06/12] imports --- codex-rs/app-server/tests/suite/feedback.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/codex-rs/app-server/tests/suite/feedback.rs b/codex-rs/app-server/tests/suite/feedback.rs index 1b0bdb36e32..f34a9555902 100644 --- a/codex-rs/app-server/tests/suite/feedback.rs +++ b/codex-rs/app-server/tests/suite/feedback.rs @@ -62,8 +62,6 @@ async fn upload_feedback_succeeds() -> Result<()> { Ok(()) } -// Removed invalid rollout path test: rollout path is now resolved server-side. - fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( From 74d62b662dcaec13c4da20fbcaad291744221c40 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 14:50:44 -0700 Subject: [PATCH 07/12] remove test --- codex-rs/app-server/tests/suite/feedback.rs | 86 --------------------- 1 file changed, 86 deletions(-) delete mode 100644 codex-rs/app-server/tests/suite/feedback.rs diff --git a/codex-rs/app-server/tests/suite/feedback.rs b/codex-rs/app-server/tests/suite/feedback.rs deleted file mode 100644 index f34a9555902..00000000000 --- a/codex-rs/app-server/tests/suite/feedback.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::path::Path; -use std::time::Duration; - -use anyhow::Result; -use app_test_support::McpProcess; -use app_test_support::create_mock_chat_completions_server; -use app_test_support::to_response; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::NewConversationParams; -use codex_app_server_protocol::NewConversationResponse; -use codex_app_server_protocol::RequestId; -use codex_app_server_protocol::UploadFeedbackParams; -use codex_app_server_protocol::UploadFeedbackResponse; -use tempfile::TempDir; -use tokio::time::timeout; - -const DEFAULT_READ_TIMEOUT: Duration = Duration::from_millis(500); - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn upload_feedback_succeeds() -> Result<()> { - let server = create_mock_chat_completions_server(vec![]).await; - - let codex_home = TempDir::new()?; - create_config_toml(codex_home.path(), &server.uri())?; - - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; - - let new_id = mcp - .send_new_conversation_request(NewConversationParams::default()) - .await?; - let new_response: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(new_id)), - ) - .await??; - let NewConversationResponse { - conversation_id, .. - } = to_response::(new_response)?; - - let upload_id = mcp - .send_upload_feedback_request(UploadFeedbackParams { - classification: "bug".to_string(), - reason: Some("it broke".to_string()), - conversation_id: Some(conversation_id), - include_logs: true, - }) - .await?; - - let upload_response: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(upload_id)), - ) - .await??; - let UploadFeedbackResponse { thread_id } = - to_response::(upload_response)?; - - assert!( - !thread_id.is_empty(), - "thread id should be returned by upload feedback" - ); - Ok(()) -} - -fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { - let config_toml = codex_home.join("config.toml"); - std::fs::write( - config_toml, - format!( - r#" -model = "mock-model" -approval_policy = "never" -sandbox_mode = "danger-full-access" - -model_provider = "mock_provider" - -[model_providers.mock_provider] -name = "Mock provider for test" -base_url = "{server_uri}/v1" -wire_api = "chat" -request_max_retries = 0 -stream_max_retries = 0 -"# - ), - ) -} From 5c9700aa9bec033993cae8032254a88017cbae4f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 25 Oct 2025 15:52:51 -0700 Subject: [PATCH 08/12] feedback --- codex-rs/app-server/src/codex_message_processor.rs | 2 -- codex-rs/app-server/tests/suite/mod.rs | 1 - codex-rs/feedback/src/lib.rs | 2 +- codex-rs/tui/src/bottom_pane/feedback_view.rs | 2 -- 4 files changed, 1 insertion(+), 6 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index aa584ec67a0..59dd0f63e5e 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1448,13 +1448,11 @@ impl CodexMessageProcessor { None }; - let cli_version = env!("CARGO_PKG_VERSION").to_string(); let upload_result = tokio::task::spawn_blocking(move || { let rollout_path_ref = validated_rollout_path.as_deref(); snapshot.upload_feedback( &classification, reason.as_deref(), - cli_version.as_str(), include_logs, rollout_path_ref, ) diff --git a/codex-rs/app-server/tests/suite/mod.rs b/codex-rs/app-server/tests/suite/mod.rs index 091261f7c10..c8763cd0c95 100644 --- a/codex-rs/app-server/tests/suite/mod.rs +++ b/codex-rs/app-server/tests/suite/mod.rs @@ -3,7 +3,6 @@ mod auth; mod codex_message_processor_flow; mod config; mod create_conversation; -mod feedback; mod fuzzy_file_search; mod interrupt; mod list_resume; diff --git a/codex-rs/feedback/src/lib.rs b/codex-rs/feedback/src/lib.rs index b1adb3db3f1..e1ccc3aac9b 100644 --- a/codex-rs/feedback/src/lib.rs +++ b/codex-rs/feedback/src/lib.rs @@ -172,7 +172,6 @@ impl CodexLogSnapshot { &self, classification: &str, reason: Option<&str>, - cli_version: &str, include_logs: bool, rollout_path: Option<&std::path::Path>, ) -> Result<()> { @@ -198,6 +197,7 @@ impl CodexLogSnapshot { ..Default::default() }); + let cli_version = env!("CARGO_PKG_VERSION"); let mut tags = BTreeMap::from([ (String::from("thread_id"), self.thread_id.to_string()), (String::from("classification"), classification.to_string()), diff --git a/codex-rs/tui/src/bottom_pane/feedback_view.rs b/codex-rs/tui/src/bottom_pane/feedback_view.rs index 234dbbd253e..0d42c0bebcd 100644 --- a/codex-rs/tui/src/bottom_pane/feedback_view.rs +++ b/codex-rs/tui/src/bottom_pane/feedback_view.rs @@ -73,13 +73,11 @@ impl FeedbackNoteView { let rollout_path_ref = self.rollout_path.as_deref(); let classification = feedback_classification(self.category); - let cli_version = crate::version::CODEX_CLI_VERSION; let mut thread_id = self.snapshot.thread_id.clone(); let result = self.snapshot.upload_feedback( classification, reason_opt, - cli_version, self.include_logs, if self.include_logs { rollout_path_ref From 259536e99aeb5c381350df667e0fcf468e6202a3 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sun, 26 Oct 2025 17:59:11 -0700 Subject: [PATCH 09/12] feedback --- codex-rs/core/src/codex.rs | 29 +------------------ codex-rs/core/src/rollout/policy.rs | 1 - .../core/tests/suite/compact_resume_fork.rs | 29 +++++-------------- .../core/tests/suite/fork_conversation.rs | 29 ++----------------- codex-rs/core/tests/suite/review.rs | 17 ++--------- .../src/event_processor_with_human_output.rs | 1 - codex-rs/mcp-server/src/codex_tool_runner.rs | 1 - codex-rs/protocol/src/protocol.rs | 6 ---- codex-rs/tui/src/app_backtrack.rs | 13 +++++++-- codex-rs/tui/src/chatwidget.rs | 8 ++--- 10 files changed, 27 insertions(+), 107 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d89ce5f8868..d27f081703a 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -20,7 +20,6 @@ use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; use codex_protocol::ConversationId; use codex_protocol::items::TurnItem; -use codex_protocol::protocol::ConversationPathResponseEvent; use codex_protocol::protocol::ExitedReviewModeEvent; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; @@ -1401,33 +1400,7 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv sess.send_event_raw(event).await; break; } - Op::GetPath => { - let sub_id = sub.id.clone(); - // Flush rollout writes before returning the path so readers observe a consistent file. - let (path, rec_opt) = { - let guard = sess.services.rollout.lock().await; - match guard.as_ref() { - Some(rec) => (rec.get_rollout_path(), Some(rec.clone())), - None => { - error!("rollout recorder not found"); - continue; - } - } - }; - if let Some(rec) = rec_opt - && let Err(e) = rec.flush().await - { - warn!("failed to flush rollout recorder before GetHistory: {e}"); - } - let event = Event { - id: sub_id.clone(), - msg: EventMsg::ConversationPath(ConversationPathResponseEvent { - conversation_id: sess.conversation_id, - path, - }), - }; - sess.send_event_raw(event).await; - } + Op::Review { review_request } => { let turn_context = sess .new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default()) diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index ea2954fac52..64688de9a55 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -72,7 +72,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::PlanUpdate(_) | EventMsg::ShutdownComplete | EventMsg::ViewImageToolCall(_) - | EventMsg::ConversationPath(_) | EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) => false, } diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 4261a305103..cea03ee751e 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -18,7 +18,6 @@ use codex_core::built_in_model_providers; use codex_core::codex::compact::SUMMARIZATION_PROMPT; use codex_core::config::Config; use codex_core::config::OPENAI_DEFAULT_MODEL; -use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; @@ -61,7 +60,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() { user_turn(&base, "hello world").await; compact_conversation(&base).await; user_turn(&base, "AFTER_COMPACT").await; - let base_path = fetch_conversation_path(&base, "base conversation").await; + let base_path = fetch_conversation_path(&base).await; assert!( base_path.exists(), "compact+resume test expects base path {base_path:?} to exist", @@ -69,7 +68,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() { let resumed = resume_conversation(&manager, &config, base_path).await; user_turn(&resumed, "AFTER_RESUME").await; - let resumed_path = fetch_conversation_path(&resumed, "resumed conversation").await; + let resumed_path = fetch_conversation_path(&resumed).await; assert!( resumed_path.exists(), "compact+resume test expects resumed path {resumed_path:?} to exist", @@ -518,7 +517,7 @@ async fn compact_resume_after_second_compaction_preserves_history() { user_turn(&base, "hello world").await; compact_conversation(&base).await; user_turn(&base, "AFTER_COMPACT").await; - let base_path = fetch_conversation_path(&base, "base conversation").await; + let base_path = fetch_conversation_path(&base).await; assert!( base_path.exists(), "second compact test expects base path {base_path:?} to exist", @@ -526,7 +525,7 @@ async fn compact_resume_after_second_compaction_preserves_history() { let resumed = resume_conversation(&manager, &config, base_path).await; user_turn(&resumed, "AFTER_RESUME").await; - let resumed_path = fetch_conversation_path(&resumed, "resumed conversation").await; + let resumed_path = fetch_conversation_path(&resumed).await; assert!( resumed_path.exists(), "second compact test expects resumed path {resumed_path:?} to exist", @@ -537,7 +536,7 @@ async fn compact_resume_after_second_compaction_preserves_history() { compact_conversation(&forked).await; user_turn(&forked, "AFTER_COMPACT_2").await; - let forked_path = fetch_conversation_path(&forked, "forked conversation").await; + let forked_path = fetch_conversation_path(&forked).await; assert!( forked_path.exists(), "second compact test expects forked path {forked_path:?} to exist", @@ -792,22 +791,8 @@ async fn compact_conversation(conversation: &Arc) { wait_for_event(conversation, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; } -async fn fetch_conversation_path( - conversation: &Arc, - context: &str, -) -> std::path::PathBuf { - conversation - .submit(Op::GetPath) - .await - .expect("request conversation path"); - match wait_for_event(conversation, |ev| { - matches!(ev, EventMsg::ConversationPath(_)) - }) - .await - { - EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path, - _ => panic!("expected ConversationPath event for {context}"), - } +async fn fetch_conversation_path(conversation: &Arc) -> std::path::PathBuf { + conversation.rollout_path() } async fn resume_conversation( diff --git a/codex-rs/core/tests/suite/fork_conversation.rs b/codex-rs/core/tests/suite/fork_conversation.rs index da2ff8c3f6d..75b37ae7ef2 100644 --- a/codex-rs/core/tests/suite/fork_conversation.rs +++ b/codex-rs/core/tests/suite/fork_conversation.rs @@ -4,7 +4,6 @@ use codex_core::ModelProviderInfo; use codex_core::NewConversation; use codex_core::built_in_model_providers; use codex_core::parse_turn_item; -use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; @@ -79,13 +78,7 @@ async fn fork_conversation_twice_drops_to_first_message() { } // Request history from the base conversation to obtain rollout path. - codex.submit(Op::GetPath).await.unwrap(); - let base_history = - wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await; - let base_path = match &base_history { - EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(), - _ => panic!("expected ConversationHistory event"), - }; + let base_path = codex.rollout_path(); // GetHistory flushes before returning the path; no wait needed. @@ -140,15 +133,7 @@ async fn fork_conversation_twice_drops_to_first_message() { .await .expect("fork 1"); - codex_fork1.submit(Op::GetPath).await.unwrap(); - let fork1_history = wait_for_event(&codex_fork1, |ev| { - matches!(ev, EventMsg::ConversationPath(_)) - }) - .await; - let fork1_path = match &fork1_history { - EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(), - _ => panic!("expected ConversationHistory event after first fork"), - }; + let fork1_path = codex_fork1.rollout_path(); // GetHistory on fork1 flushed; the file is ready. let fork1_items = read_items(&fork1_path); @@ -166,15 +151,7 @@ async fn fork_conversation_twice_drops_to_first_message() { .await .expect("fork 2"); - codex_fork2.submit(Op::GetPath).await.unwrap(); - let fork2_history = wait_for_event(&codex_fork2, |ev| { - matches!(ev, EventMsg::ConversationPath(_)) - }) - .await; - let fork2_path = match &fork2_history { - EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(), - _ => panic!("expected ConversationHistory event after second fork"), - }; + let fork2_path = codex_fork2.rollout_path(); // GetHistory on fork2 flushed; the file is ready. let fork1_items = read_items(&fork1_path); let fork1_user_inputs = find_user_input_positions(&fork1_items); diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index 422acd922f7..5577de7caa0 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -7,7 +7,6 @@ use codex_core::REVIEW_PROMPT; use codex_core::ResponseItem; use codex_core::built_in_model_providers; use codex_core::config::Config; -use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG; use codex_core::protocol::EventMsg; use codex_core::protocol::ExitedReviewModeEvent; @@ -120,13 +119,7 @@ async fn review_op_emits_lifecycle_and_review_output() { // Also verify that a user message with the header and a formatted finding // was recorded back in the parent session's rollout. - codex.submit(Op::GetPath).await.unwrap(); - let history_event = - wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await; - let path = match history_event { - EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path, - other => panic!("expected ConversationPath event, got {other:?}"), - }; + let path = codex.rollout_path(); let text = std::fs::read_to_string(&path).expect("read rollout file"); let mut saw_header = false; @@ -482,13 +475,7 @@ async fn review_input_isolated_from_parent_history() { assert_eq!(instructions, REVIEW_PROMPT); // Also verify that a user interruption note was recorded in the rollout. - codex.submit(Op::GetPath).await.unwrap(); - let history_event = - wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await; - let path = match history_event { - EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path, - other => panic!("expected ConversationPath event, got {other:?}"), - }; + let path = codex.rollout_path(); let text = std::fs::read_to_string(&path).expect("read rollout file"); let mut saw_interruption_message = false; for line in text.lines() { diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index b07cd16d4bc..bd37a438fb1 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -510,7 +510,6 @@ impl EventProcessor for EventProcessorWithHumanOutput { } }, EventMsg::ShutdownComplete => return CodexStatus::Shutdown, - EventMsg::ConversationPath(_) => {} EventMsg::UserMessage(_) => {} EventMsg::EnteredReviewMode(_) => {} EventMsg::ExitedReviewMode(_) => {} diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index a6af754d21d..9f826f4e860 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -281,7 +281,6 @@ async fn run_codex_tool_session_inner( | EventMsg::GetHistoryEntryResponse(_) | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) - | EventMsg::ConversationPath(_) | EventMsg::UserMessage(_) | EventMsg::ShutdownComplete | EventMsg::ViewImageToolCall(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 2334561e0e7..3cf16bb840e 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -166,10 +166,6 @@ pub enum Op { /// Request a single history entry identified by `log_id` + `offset`. GetHistoryEntryRequest { offset: usize, log_id: u64 }, - /// Request the full in-memory conversation transcript for the current session. - /// Reply is delivered via `EventMsg::ConversationHistory`. - GetPath, - /// Request the list of MCP tools available across all configured servers. /// Reply is delivered via `EventMsg::McpListToolsResponse`. ListMcpTools, @@ -519,8 +515,6 @@ pub enum EventMsg { /// Notification that the agent is shutting down. ShutdownComplete, - ConversationPath(ConversationPathResponseEvent), - /// Entered review mode. EnteredReviewMode(ReviewRequest), diff --git a/codex-rs/tui/src/app_backtrack.rs b/codex-rs/tui/src/app_backtrack.rs index b3e948ecb38..e2ded3d3ef5 100644 --- a/codex-rs/tui/src/app_backtrack.rs +++ b/codex-rs/tui/src/app_backtrack.rs @@ -103,9 +103,16 @@ impl App { nth_user_message: usize, ) { self.backtrack.pending = Some((base_id, nth_user_message, prefill)); - self.app_event_tx.send(crate::app_event::AppEvent::CodexOp( - codex_core::protocol::Op::GetPath, - )); + if let Some(path) = self.chat_widget.rollout_path() { + let ev = ConversationPathResponseEvent { + conversation_id: base_id, + path, + }; + self.app_event_tx + .send(crate::app_event::AppEvent::ConversationHistory(ev)); + } else { + tracing::error!("rollout path unavailable; cannot backtrack"); + } } /// Open transcript overlay (enters alternate screen and shows full transcript). diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index fdebe158261..64dc89d8841 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1516,10 +1516,6 @@ impl ChatWidget { self.on_user_message_event(ev); } } - EventMsg::ConversationPath(ev) => { - self.app_event_tx - .send(crate::app_event::AppEvent::ConversationHistory(ev)); - } EventMsg::EnteredReviewMode(review_request) => { self.on_entered_review_mode(review_request) } @@ -2250,6 +2246,10 @@ impl ChatWidget { self.conversation_id } + pub(crate) fn rollout_path(&self) -> Option { + self.current_rollout_path.clone() + } + /// Return a reference to the widget's current config (includes any /// runtime overrides applied via TUI, e.g., model or approval policy). pub(crate) fn config_ref(&self) -> &Config { From 1a1a2257009c9512b7e8bdfb97763635ef59cd52 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sun, 26 Oct 2025 18:03:51 -0700 Subject: [PATCH 10/12] feedback --- codex-rs/core/src/rollout/recorder.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 95f5d479b8d..93b614f9225 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -267,10 +267,6 @@ impl RolloutRecorder { })) } - pub(crate) fn get_rollout_path(&self) -> PathBuf { - self.rollout_path.clone() - } - pub async fn shutdown(&self) -> std::io::Result<()> { let (tx_done, rx_done) = oneshot::channel(); match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await { From 07dba9a5a30254defc27d553a85fb90d86b468d2 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sun, 26 Oct 2025 20:32:57 -0700 Subject: [PATCH 11/12] tests --- codex-rs/core/src/codex.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d27f081703a..9d76f186168 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -597,6 +597,18 @@ impl Session { self.tx_event.clone() } + /// Ensure all rollout writes are durably flushed. + pub(crate) async fn flush_rollout(&self) { + let recorder = { + let guard = self.services.rollout.lock().await; + guard.clone() + }; + if let Some(rec) = recorder + && let Err(e) = rec.flush().await { + warn!("failed to flush rollout recorder: {e}"); + } + } + fn next_internal_sub_id(&self) -> String { let id = self .next_internal_sub_id @@ -611,6 +623,8 @@ impl Session { // Build and record initial items (user instructions + environment context) let items = self.build_initial_context(&turn_context); self.record_conversation_items(&turn_context, &items).await; + // Ensure initial items are visible to immediate readers (e.g., tests, forks). + self.flush_rollout().await; } InitialHistory::Resumed(_) | InitialHistory::Forked(_) => { let rollout_items = conversation_history.get_rollout_items(); @@ -627,6 +641,8 @@ impl Session { if persist && !rollout_items.is_empty() { self.persist_rollout_items(&rollout_items).await; } + // Flush after seeding history and any persisted rollout copy. + self.flush_rollout().await; } } } @@ -2204,6 +2220,8 @@ pub(crate) async fn exit_review_mode( }], ) .await; + // Make the recorded review note visible immediately for readers. + session.flush_rollout().await; } fn mcp_init_error_display( From 77e4647fa07da2e55a37e33568880d3e6a915fd1 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sun, 26 Oct 2025 20:35:00 -0700 Subject: [PATCH 12/12] tests --- codex-rs/core/src/codex.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 9d76f186168..a7134c9c9c6 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -604,9 +604,10 @@ impl Session { guard.clone() }; if let Some(rec) = recorder - && let Err(e) = rec.flush().await { - warn!("failed to flush rollout recorder: {e}"); - } + && let Err(e) = rec.flush().await + { + warn!("failed to flush rollout recorder: {e}"); + } } fn next_internal_sub_id(&self) -> String {