Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions codex-rs/core/src/conversation_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,34 @@ pub struct NewConversation {

/// [`ConversationManager`] is responsible for creating conversations and
/// maintaining them in memory.
#[derive(Clone)]
struct ConversationEntry {
conversation: Arc<CodexConversation>,
session_configured: SessionConfiguredEvent,
}

impl ConversationEntry {
fn new(
conversation: Arc<CodexConversation>,
session_configured: SessionConfiguredEvent,
) -> Self {
Self {
conversation,
session_configured,
}
}

fn to_new_conversation(&self, conversation_id: ConversationId) -> NewConversation {
NewConversation {
conversation_id,
conversation: self.conversation.clone(),
session_configured: self.session_configured.clone(),
}
}
}

pub struct ConversationManager {
conversations: Arc<RwLock<HashMap<ConversationId, Arc<CodexConversation>>>>,
conversations: Arc<RwLock<HashMap<ConversationId, ConversationEntry>>>,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
}
Expand Down Expand Up @@ -99,10 +125,11 @@ impl ConversationManager {
};

let conversation = Arc::new(CodexConversation::new(codex));
let entry = ConversationEntry::new(conversation.clone(), session_configured.clone());
self.conversations
.write()
.await
.insert(conversation_id, conversation.clone());
.insert(conversation_id, entry);

Ok(NewConversation {
conversation_id,
Expand All @@ -118,7 +145,7 @@ impl ConversationManager {
let conversations = self.conversations.read().await;
conversations
.get(&conversation_id)
.cloned()
.map(|entry| entry.conversation.clone())
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
}

Expand All @@ -129,11 +156,22 @@ impl ConversationManager {
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?;
self.finalize_spawn(codex, conversation_id).await
if let InitialHistory::Resumed(resumed) = &initial_history
&& let Some(existing) = self
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't love that we do the read() here and on that basis do the write() later, as that means there is still time between these two events for the conversation to be inserted twice?

.conversations
.read()
.await
.get(&resumed.conversation_id)
.cloned()
{
Ok(existing.to_new_conversation(resumed.conversation_id))
} else {
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?;
self.finalize_spawn(codex, conversation_id).await
}
}

/// Removes the conversation from the manager's internal map, though the
Expand All @@ -144,7 +182,11 @@ impl ConversationManager {
&self,
conversation_id: &ConversationId,
) -> Option<Arc<CodexConversation>> {
self.conversations.write().await.remove(conversation_id)
self.conversations
.write()
.await
.remove(conversation_id)
.map(|entry| entry.conversation)
}

/// Fork an existing conversation by taking messages up to the given position
Expand Down
23 changes: 22 additions & 1 deletion codex-rs/core/tests/suite/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,11 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
session_configured,
..
} = conversation_manager
.resume_conversation_from_rollout(config, session_path.clone(), auth_manager)
.resume_conversation_from_rollout(
config.clone(),
session_path.clone(),
auth_manager.clone(),
)
.await
.expect("resume conversation");

Expand All @@ -260,6 +264,23 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
let expected_initial_json = json!([]);
assert_eq!(initial_json, expected_initial_json);

let NewConversation {
conversation: codex_again,
session_configured: session_configured_again,
..
} = conversation_manager
.resume_conversation_from_rollout(
config.clone(),
session_path.clone(),
auth_manager.clone(),
)
.await
.expect("resume existing conversation");
assert!(Arc::ptr_eq(&codex, &codex_again));
let session_configured_json = serde_json::to_value(&session_configured).unwrap();
let session_configured_again_json = serde_json::to_value(&session_configured_again).unwrap();
assert_eq!(session_configured_json, session_configured_again_json);

// 2) Submit new input; the request body must include the prior item followed by the new user input.
codex
.submit(Op::UserInput {
Expand Down
Loading