Skip to content

Commit e92c4f6

Browse files
authored
feat: async ghost commit (#5618)
1 parent 15fa228 commit e92c4f6

File tree

13 files changed

+289
-11
lines changed

13 files changed

+289
-11
lines changed

codex-rs/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ codex-apply-patch = { workspace = true }
2424
codex-file-search = { workspace = true }
2525
codex-otel = { workspace = true, features = ["otel"] }
2626
codex-protocol = { workspace = true }
27+
codex-git-tooling = { workspace = true }
2728
codex-rmcp-client = { workspace = true }
2829
codex-async-utils = { workspace = true }
2930
codex-utils-string = { workspace = true }
3031
codex-utils-pty = { workspace = true }
32+
codex-utils-readiness = { workspace = true }
3133
codex-utils-tokenizer = { workspace = true }
3234
dirs = { workspace = true }
3335
dunce = { workspace = true }

codex-rs/core/src/chat_completions.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub(crate) async fn stream_chat_completions(
7676
ResponseItem::CustomToolCall { .. } => {}
7777
ResponseItem::CustomToolCallOutput { .. } => {}
7878
ResponseItem::WebSearchCall { .. } => {}
79+
ResponseItem::GhostSnapshot { .. } => {}
7980
}
8081
}
8182

@@ -270,6 +271,10 @@ pub(crate) async fn stream_chat_completions(
270271
"content": output,
271272
}));
272273
}
274+
ResponseItem::GhostSnapshot { .. } => {
275+
// Ghost snapshots annotate history but are not sent to the model.
276+
continue;
277+
}
273278
ResponseItem::Reasoning { .. }
274279
| ResponseItem::WebSearchCall { .. }
275280
| ResponseItem::Other => {

codex-rs/core/src/codex.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,11 @@ use crate::state::SessionServices;
104104
use crate::state::SessionState;
105105
use crate::state::TaskKind;
106106
use crate::tasks::CompactTask;
107+
use crate::tasks::GhostSnapshotTask;
107108
use crate::tasks::RegularTask;
108109
use crate::tasks::ReviewTask;
110+
use crate::tasks::SessionTask;
111+
use crate::tasks::SessionTaskContext;
109112
use crate::tools::ToolRouter;
110113
use crate::tools::context::SharedTurnDiffTracker;
111114
use crate::tools::parallel::ToolCallRuntime;
@@ -128,6 +131,8 @@ use codex_protocol::models::ResponseInputItem;
128131
use codex_protocol::models::ResponseItem;
129132
use codex_protocol::protocol::InitialHistory;
130133
use codex_protocol::user_input::UserInput;
134+
use codex_utils_readiness::Readiness;
135+
use codex_utils_readiness::ReadinessFlag;
131136

132137
pub mod compact;
133138
use self::compact::build_compacted_history;
@@ -178,6 +183,7 @@ impl Codex {
178183
sandbox_policy: config.sandbox_policy.clone(),
179184
cwd: config.cwd.clone(),
180185
original_config_do_not_use: Arc::clone(&config),
186+
features: config.features.clone(),
181187
};
182188

183189
// Generate a unique ID for the lifetime of this Codex session.
@@ -271,6 +277,7 @@ pub(crate) struct TurnContext {
271277
pub(crate) is_review_mode: bool,
272278
pub(crate) final_output_json_schema: Option<Value>,
273279
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
280+
pub(crate) tool_call_gate: Arc<ReadinessFlag>,
274281
}
275282

276283
impl TurnContext {
@@ -312,6 +319,9 @@ pub(crate) struct SessionConfiguration {
312319
/// operate deterministically.
313320
cwd: PathBuf,
314321

322+
/// Set of feature flags for this session
323+
features: Features,
324+
315325
// TODO(pakrym): Remove config from here
316326
original_config_do_not_use: Arc<Config>,
317327
}
@@ -406,6 +416,7 @@ impl Session {
406416
is_review_mode: false,
407417
final_output_json_schema: None,
408418
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
419+
tool_call_gate: Arc::new(ReadinessFlag::new()),
409420
}
410421
}
411422

@@ -1096,6 +1107,43 @@ impl Session {
10961107
self.send_event(turn_context, event).await;
10971108
}
10981109

1110+
/// Starts a ghost snapshot task for this turn when the feature is enabled.
///
/// Returns without doing anything when the turn is in review mode or when
/// `Feature::GhostCommit` is not enabled in the session configuration.
/// Otherwise it subscribes to the turn's `tool_call_gate`, hands the
/// resulting token to a new `GhostSnapshotTask`, and runs that task with a
/// fresh `SessionTaskContext`, an empty input list, and the supplied
/// `cancellation_token`. A failed subscription is logged with `warn!` and
/// the snapshot is skipped.
async fn maybe_start_ghost_snapshot(
1111+
self: &Arc<Self>,
1112+
turn_context: Arc<TurnContext>,
1113+
cancellation_token: CancellationToken,
1114+
) {
1115+
// Skip review-mode turns and sessions without the GhostCommit feature.
if turn_context.is_review_mode
1116+
|| !self
1117+
.state
1118+
.lock()
1119+
.await
1120+
.session_configuration
1121+
.features
1122+
.enabled(Feature::GhostCommit)
1123+
{
1124+
return;
1125+
}
1126+
1127+
// The token obtained here is later passed to `mark_ready` on the same
// gate by the snapshot task.
let token = match turn_context.tool_call_gate.subscribe().await {
1128+
Ok(token) => token,
1129+
Err(err) => {
1130+
warn!("failed to subscribe to ghost snapshot readiness: {err}");
1131+
return;
1132+
}
1133+
};
1134+
1135+
info!("spawning ghost snapshot task");
1136+
let task = GhostSnapshotTask::new(token);
1137+
// The task takes no user input, hence the empty Vec.
Arc::new(task)
1138+
.run(
1139+
Arc::new(SessionTaskContext::new(self.clone())),
1140+
turn_context.clone(),
1141+
Vec::new(),
1142+
cancellation_token,
1143+
)
1144+
.await;
1145+
}
1146+
10991147
/// Returns the input if there was no task running to inject into
11001148
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
11011149
let mut active = self.active_turn.lock().await;
@@ -1508,6 +1556,7 @@ async fn spawn_review_thread(
15081556
is_review_mode: true,
15091557
final_output_json_schema: None,
15101558
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
1559+
tool_call_gate: Arc::new(ReadinessFlag::new()),
15111560
};
15121561

15131562
// Seed the child task with the review prompt as the initial user message.
@@ -1571,6 +1620,8 @@ pub(crate) async fn run_task(
15711620
.await;
15721621
}
15731622

1623+
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
1624+
.await;
15741625
let mut last_agent_message: Option<String> = None;
15751626
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
15761627
// many turns, from the perspective of the user, it is a single turn.
@@ -1763,6 +1814,13 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
17631814
}
17641815
}
17651816

1817+
/// Returns `input` with every `ResponseItem::GhostSnapshot` entry removed.
///
/// All other items are kept in their original order. The result is used as
/// the `input` of the `Prompt` built in `run_turn`.
fn filter_model_visible_history(input: Vec<ResponseItem>) -> Vec<ResponseItem> {
1818+
input
1819+
.into_iter()
1820+
.filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }))
1821+
.collect()
1822+
}
1823+
17661824
async fn run_turn(
17671825
sess: Arc<Session>,
17681826
turn_context: Arc<TurnContext>,
@@ -1783,7 +1841,7 @@ async fn run_turn(
17831841
.supports_parallel_tool_calls;
17841842
let parallel_tool_calls = model_supports_parallel;
17851843
let prompt = Prompt {
1786-
input,
1844+
input: filter_model_visible_history(input),
17871845
tools: router.specs(),
17881846
parallel_tool_calls,
17891847
base_instructions_override: turn_context.base_instructions.clone(),
@@ -2278,6 +2336,8 @@ fn is_mcp_client_startup_timeout_error(error: &anyhow::Error) -> bool {
22782336
|| error_message.contains("timed out handshaking with MCP server")
22792337
}
22802338

2339+
use crate::features::Feature;
2340+
use crate::features::Features;
22812341
#[cfg(test)]
22822342
pub(crate) use tests::make_session_and_context;
22832343

@@ -2594,6 +2654,7 @@ mod tests {
25942654
sandbox_policy: config.sandbox_policy.clone(),
25952655
cwd: config.cwd.clone(),
25962656
original_config_do_not_use: Arc::clone(&config),
2657+
features: Features::default(),
25972658
};
25982659

25992660
let state = SessionState::new(session_configuration.clone());
@@ -2662,6 +2723,7 @@ mod tests {
26622723
sandbox_policy: config.sandbox_policy.clone(),
26632724
cwd: config.cwd.clone(),
26642725
original_config_do_not_use: Arc::clone(&config),
2726+
features: Features::default(),
26652727
};
26662728

26672729
let state = SessionState::new(session_configuration.clone());

codex-rs/core/src/conversation_history.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
22
use codex_protocol::models::ResponseItem;
33
use codex_protocol::protocol::TokenUsage;
44
use codex_protocol::protocol::TokenUsageInfo;
5+
use std::ops::Deref;
56
use tracing::error;
67

78
/// Transcript of conversation history
@@ -40,7 +41,9 @@ impl ConversationHistory {
4041
I::Item: std::ops::Deref<Target = ResponseItem>,
4142
{
4243
for item in items {
43-
if !is_api_message(&item) {
44+
let item_ref = item.deref();
45+
let is_ghost_snapshot = matches!(item_ref, ResponseItem::GhostSnapshot { .. });
46+
if !is_api_message(item_ref) && !is_ghost_snapshot {
4447
continue;
4548
}
4649

@@ -165,6 +168,7 @@ impl ConversationHistory {
165168
| ResponseItem::WebSearchCall { .. }
166169
| ResponseItem::FunctionCallOutput { .. }
167170
| ResponseItem::CustomToolCallOutput { .. }
171+
| ResponseItem::GhostSnapshot { .. }
168172
| ResponseItem::Other
169173
| ResponseItem::Message { .. } => {
170174
// nothing to do for these variants
@@ -231,6 +235,7 @@ impl ConversationHistory {
231235
| ResponseItem::LocalShellCall { .. }
232236
| ResponseItem::Reasoning { .. }
233237
| ResponseItem::WebSearchCall { .. }
238+
| ResponseItem::GhostSnapshot { .. }
234239
| ResponseItem::Other
235240
| ResponseItem::Message { .. } => {
236241
// nothing to do for these variants
@@ -355,6 +360,7 @@ fn is_api_message(message: &ResponseItem) -> bool {
355360
| ResponseItem::LocalShellCall { .. }
356361
| ResponseItem::Reasoning { .. }
357362
| ResponseItem::WebSearchCall { .. } => true,
363+
ResponseItem::GhostSnapshot { .. } => false,
358364
ResponseItem::Other => false,
359365
}
360366
}

codex-rs/core/src/features.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub enum Feature {
4141
WebSearchRequest,
4242
/// Enable the model-based risk assessments for sandboxed commands.
4343
SandboxCommandAssessment,
44+
/// Create a ghost commit at each turn.
45+
GhostCommit,
4446
}
4547

4648
impl Feature {
@@ -248,4 +250,10 @@ pub const FEATURES: &[FeatureSpec] = &[
248250
stage: Stage::Experimental,
249251
default_enabled: false,
250252
},
253+
FeatureSpec {
254+
id: Feature::GhostCommit,
255+
key: "ghost_commit",
256+
stage: Stage::Experimental,
257+
default_enabled: false,
258+
},
251259
];

codex-rs/core/src/rollout/policy.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool {
2626
| ResponseItem::FunctionCallOutput { .. }
2727
| ResponseItem::CustomToolCall { .. }
2828
| ResponseItem::CustomToolCallOutput { .. }
29-
| ResponseItem::WebSearchCall { .. } => true,
29+
| ResponseItem::WebSearchCall { .. }
30+
| ResponseItem::GhostSnapshot { .. } => true,
3031
ResponseItem::Other => false,
3132
}
3233
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use crate::codex::TurnContext;
2+
use crate::state::TaskKind;
3+
use crate::tasks::SessionTask;
4+
use crate::tasks::SessionTaskContext;
5+
use async_trait::async_trait;
6+
use codex_git_tooling::CreateGhostCommitOptions;
7+
use codex_git_tooling::GitToolingError;
8+
use codex_git_tooling::create_ghost_commit;
9+
use codex_protocol::models::ResponseItem;
10+
use codex_protocol::user_input::UserInput;
11+
use codex_utils_readiness::Readiness;
12+
use codex_utils_readiness::Token;
13+
use std::borrow::ToOwned;
14+
use std::sync::Arc;
15+
use tokio_util::sync::CancellationToken;
16+
use tracing::info;
17+
use tracing::warn;
18+
19+
/// Session task that captures a ghost commit of the turn's working directory
/// and records it in the conversation as a `ResponseItem::GhostSnapshot`.
pub(crate) struct GhostSnapshotTask {
20+
// Readiness token passed to `tool_call_gate.mark_ready` once the task is done.
token: Token,
21+
}
22+
23+
#[async_trait]
24+
impl SessionTask for GhostSnapshotTask {
25+
/// Reports this task as `TaskKind::Regular`.
fn kind(&self) -> TaskKind {
26+
TaskKind::Regular
27+
}
28+
29+
/// Spawns a Tokio task that captures a ghost commit for `ctx.cwd`, then
/// returns `None` immediately; the spawned task's join handle is discarded.
///
/// Inside the spawned task the capture is raced against
/// `cancellation_token`. On success a `ResponseItem::GhostSnapshot` is
/// recorded in the conversation; on failure a background event with a
/// human-readable message is sent. On every path, including cancellation,
/// `ctx.tool_call_gate` is marked ready with this task's token afterwards.
///
/// `_input` is ignored.
async fn run(
30+
self: Arc<Self>,
31+
session: Arc<SessionTaskContext>,
32+
ctx: Arc<TurnContext>,
33+
_input: Vec<UserInput>,
34+
cancellation_token: CancellationToken,
35+
) -> Option<String> {
36+
tokio::task::spawn(async move {
37+
let token = self.token;
38+
let ctx_for_task = Arc::clone(&ctx);
39+
// `cancelled` is true only when the cancellation branch wins the race.
let cancelled = tokio::select! {
40+
_ = cancellation_token.cancelled() => true,
41+
_ = async {
42+
let repo_path = ctx_for_task.cwd.clone();
43+
// Required to run in a dedicated blocking pool.
// `create_ghost_commit` is called synchronously inside the closure, so it
// is handed to `spawn_blocking` rather than run on the async task itself.
match tokio::task::spawn_blocking(move || {
44+
let options = CreateGhostCommitOptions::new(&repo_path);
45+
create_ghost_commit(&options)
46+
})
47+
.await
48+
{
49+
// Blocking task completed and the ghost commit was created.
Ok(Ok(ghost_commit)) => {
50+
info!("ghost snapshot blocking task finished");
51+
session
52+
.session
53+
.record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot {
54+
commit_id: ghost_commit.id().to_string(),
55+
parent: ghost_commit.parent().map(ToOwned::to_owned),
56+
}])
57+
.await;
58+
info!("ghost commit captured: {}", ghost_commit.id());
59+
}
60+
// Blocking task completed but `create_ghost_commit` returned an error.
Ok(Err(err)) => {
61+
warn!(
62+
sub_id = ctx_for_task.sub_id.as_str(),
63+
"failed to capture ghost snapshot: {err}"
64+
);
65+
// A missing Git repository gets a dedicated message; any other error
// is reported with its display text.
let message = match err {
66+
GitToolingError::NotAGitRepository { .. } => {
67+
"Snapshots disabled: current directory is not a Git repository."
68+
.to_string()
69+
}
70+
_ => format!("Snapshots disabled after ghost snapshot error: {err}."),
71+
};
72+
session
73+
.session
74+
.notify_background_event(&ctx_for_task, message)
75+
.await;
76+
}
77+
// Awaiting the `spawn_blocking` handle itself failed; this is logged
// and reported to the user as a panic.
Err(err) => {
78+
warn!(
79+
sub_id = ctx_for_task.sub_id.as_str(),
80+
"ghost snapshot task panicked: {err}"
81+
);
82+
let message =
83+
format!("Snapshots disabled after ghost snapshot panic: {err}.");
84+
session
85+
.session
86+
.notify_background_event(&ctx_for_task, message)
87+
.await;
88+
}
89+
}
90+
} => false,
91+
};
92+
93+
94+
if cancelled {
95+
info!("ghost snapshot task cancelled");
96+
}
97+
98+
// Mark the gate ready regardless of how the race above ended.
match ctx.tool_call_gate.mark_ready(token).await {
99+
Ok(true) => info!("ghost snapshot gate marked ready"),
100+
Ok(false) => warn!("ghost snapshot gate already ready"),
101+
Err(err) => warn!("failed to mark ghost snapshot ready: {err}"),
102+
}
103+
});
104+
None
105+
}
106+
}
107+
108+
impl GhostSnapshotTask {
109+
/// Creates a task that holds `token`, which `run` later passes to
/// `tool_call_gate.mark_ready`.
pub(crate) fn new(token: Token) -> Self {
110+
Self { token }
111+
}
112+
}

0 commit comments

Comments
 (0)