1 change: 1 addition & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default.

24 changes: 24 additions & 0 deletions codex-rs/app-server-protocol/src/protocol.rs
@@ -124,6 +124,13 @@ client_request_definitions! {
response: GetAccountRateLimitsResponse,
},

#[serde(rename = "feedback/upload")]
#[ts(rename = "feedback/upload")]
UploadFeedback {
params: UploadFeedbackParams,
response: UploadFeedbackResponse,
},

#[serde(rename = "account/read")]
#[ts(rename = "account/read")]
GetAccount {
@@ -378,6 +385,23 @@ pub struct ListModelsResponse {
pub next_cursor: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct UploadFeedbackParams {
pub classification: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub conversation_id: Option<ConversationId>,
Collaborator: thread_id?

Collaborator (Author): We still call it ConversationId tho

pub include_logs: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct UploadFeedbackResponse {
pub thread_id: String,
}
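As a rough sketch of the wire shape these two new structs produce, assuming `UploadFeedbackParams` is used directly from `codex-app-server-protocol` (the classification and reason strings below are made-up example values, not anything from this PR):

```rust
use codex_app_server_protocol::UploadFeedbackParams;

fn main() {
    // Hypothetical example values; only the field names and shape come from the struct above.
    let params = UploadFeedbackParams {
        classification: "bug".to_string(),
        reason: Some("agent looped on the same patch".to_string()),
        // None values are dropped from the JSON via skip_serializing_if.
        conversation_id: None,
        include_logs: false,
    };
    // rename_all = "camelCase" turns include_logs into includeLogs:
    // {"classification":"bug","reason":"agent looped on the same patch","includeLogs":false}
    println!("{}", serde_json::to_string(&params).expect("params serialize"));
}
```

The matching `UploadFeedbackResponse` serializes the same way, carrying only the `threadId` of the uploaded feedback.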

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type")]
#[ts(tag = "type")]
1 change: 1 addition & 0 deletions codex-rs/app-server/Cargo.toml
@@ -24,6 +24,7 @@ codex-file-search = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-feedback = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true, features = ["derive"] }
80 changes: 80 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
@@ -52,6 +52,8 @@ use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SessionConfiguredNotification;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse;
use codex_app_server_protocol::UploadFeedbackParams;
use codex_app_server_protocol::UploadFeedbackResponse;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserSavedConfig;
use codex_backend_client::Client as BackendClient;
@@ -85,6 +87,7 @@ use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
use codex_login::ShutdownHandle;
use codex_login::run_login_server;
@@ -136,6 +139,7 @@ pub(crate) struct CodexMessageProcessor {
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
feedback: CodexFeedback,
}

impl CodexMessageProcessor {
@@ -145,6 +149,7 @@
outgoing: Arc<OutgoingMessageSender>,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
feedback: CodexFeedback,
) -> Self {
Self {
auth_manager,
@@ -156,6 +161,7 @@
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}

@@ -275,6 +281,9 @@
} => {
self.get_account_rate_limits(request_id).await;
}
ClientRequest::UploadFeedback { request_id, params } => {
self.upload_feedback(request_id, params).await;
}
}
}

@@ -1418,6 +1427,77 @@ impl CodexMessageProcessor {
let response = FuzzyFileSearchResponse { files: results };
self.outgoing.send_response(request_id, response).await;
}

async fn upload_feedback(&self, request_id: RequestId, params: UploadFeedbackParams) {
let UploadFeedbackParams {
classification,
reason,
conversation_id,
include_logs,
} = params;

let snapshot = self.feedback.snapshot(conversation_id);
let thread_id = snapshot.thread_id.clone();

let validated_rollout_path = if include_logs {
match conversation_id {
Some(conv_id) => self.resolve_rollout_path(conv_id).await,
None => None,
}
} else {
None
};

let upload_result = tokio::task::spawn_blocking(move || {
let rollout_path_ref = validated_rollout_path.as_deref();
snapshot.upload_feedback(
&classification,
reason.as_deref(),
include_logs,
rollout_path_ref,
)
})
.await;

let upload_result = match upload_result {
Ok(result) => result,
Err(join_err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {join_err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

match upload_result {
Ok(()) => {
let response = UploadFeedbackResponse { thread_id };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}

async fn resolve_rollout_path(&self, conversation_id: ConversationId) -> Option<PathBuf> {
match self
.conversation_manager
.get_conversation(conversation_id)
.await
{
Ok(conv) => Some(conv.rollout_path()),
Collaborator: Didn't we have an Op to get rollout path? Let's stick to it even though I don't like it.

Collaborator: Op::GetPath

Collaborator (Author): We cannot receive the events here because we are doing it in a loop.

Collaborator: Can we use this new method in other callsites?

Collaborator (Author): Different PR?

Collaborator: All callers of codex.submit(Op::GetPath) can be find-replaced to codex.rollout_path().

Err(_) => None,
}
}
}

async fn apply_bespoke_event_handling(
13 changes: 13 additions & 0 deletions codex-rs/app-server/src/lib.rs
@@ -12,16 +12,19 @@ use crate::message_processor::MessageProcessor;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::JSONRPCMessage;
use codex_feedback::CodexFeedback;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::sync::mpsc;
use tracing::Level;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

@@ -82,6 +85,8 @@ pub async fn run_main(
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;

let feedback = CodexFeedback::new();

let otel =
codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")).map_err(|e| {
std::io::Error::new(
@@ -96,8 +101,15 @@
.with_writer(std::io::stderr)
.with_filter(EnvFilter::from_default_env());

let feedback_layer = tracing_subscriber::fmt::layer()
.with_writer(feedback.make_writer())
.with_ansi(false)
.with_target(false)
.with_filter(Targets::new().with_default(Level::TRACE));

let _ = tracing_subscriber::registry()
.with(stderr_fmt)
.with(feedback_layer)
.with(otel.as_ref().map(|provider| {
OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
@@ -112,6 +124,7 @@
outgoing_message_sender,
codex_linux_sandbox_exe,
std::sync::Arc::new(config),
feedback.clone(),
);
async move {
while let Some(msg) = incoming_rx.recv().await {
3 changes: 3 additions & 0 deletions codex-rs/app-server/src/message_processor.rs
@@ -17,6 +17,7 @@ use codex_core::ConversationManager;
use codex_core::config::Config;
use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;

@@ -33,6 +34,7 @@ impl MessageProcessor {
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
feedback: CodexFeedback,
) -> Self {
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared(config.codex_home.clone(), false);
@@ -46,6 +48,7 @@
outgoing.clone(),
codex_linux_sandbox_exe,
config,
feedback,
);

Self {
10 changes: 10 additions & 0 deletions codex-rs/app-server/tests/common/mcp_process.rs
@@ -30,6 +30,7 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::UploadFeedbackParams;

use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
@@ -242,6 +243,15 @@ impl McpProcess {
self.send_request("account/rateLimits/read", None).await
}

/// Send a `feedback/upload` JSON-RPC request.
pub async fn send_upload_feedback_request(
&mut self,
params: UploadFeedbackParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("feedback/upload", params).await
}

/// Send a `userInfo` JSON-RPC request.
pub async fn send_user_info_request(&mut self) -> anyhow::Result<i64> {
self.send_request("userInfo", None).await
48 changes: 20 additions & 28 deletions codex-rs/core/src/codex.rs
@@ -20,7 +20,6 @@ use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
@@ -598,6 +597,19 @@ impl Session {
self.tx_event.clone()
}

/// Ensure all rollout writes are durably flushed.
pub(crate) async fn flush_rollout(&self) {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder: {e}");
}
}

fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -612,6 +624,8 @@
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context);
self.record_conversation_items(&turn_context, &items).await;
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
@@ -628,6 +642,8 @@
if persist && !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
}
}
}
@@ -1401,33 +1417,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
sess.send_event_raw(event).await;
break;
}
Op::GetPath => {
let sub_id = sub.id.clone();
// Flush rollout writes before returning the path so readers observe a consistent file.
let (path, rec_opt) = {
let guard = sess.services.rollout.lock().await;
match guard.as_ref() {
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
None => {
error!("rollout recorder not found");
continue;
}
}
};
if let Some(rec) = rec_opt
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder before GetHistory: {e}");
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationPath(ConversationPathResponseEvent {
conversation_id: sess.conversation_id,
path,
}),
};
sess.send_event_raw(event).await;
}

Op::Review { review_request } => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
@@ -2231,6 +2221,8 @@ pub(crate) async fn exit_review_mode(
}],
)
.await;
// Make the recorded review note visible immediately for readers.
session.flush_rollout().await;
}

fn mcp_init_error_display(