From 145a4ed1f445e72bb406a6997fe321260ea06164 Mon Sep 17 00:00:00 2001 From: Philippe Branchu Date: Mon, 23 Mar 2026 04:34:05 +0000 Subject: [PATCH 1/2] Add agent_send_async and agent_cancel tools for non-blocking delegation Introduces two new built-in tools: - agent_send_async: dispatches a message to another agent in the background, returns immediately with a callback_id, and delivers the result as a follow-up message via an mpsc channel (consumed by channel adapters) - agent_cancel: aborts a pending async task by callback_id This enables voice and real-time channel adapters to acknowledge the user immediately while long-running delegations (email, calendar, etc.) complete in the background. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/openfang-runtime/src/tool_runner.rs | 139 +++++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/crates/openfang-runtime/src/tool_runner.rs b/crates/openfang-runtime/src/tool_runner.rs index 0e1853593..7e4718d05 100644 --- a/crates/openfang-runtime/src/tool_runner.rs +++ b/crates/openfang-runtime/src/tool_runner.rs @@ -18,6 +18,33 @@ use tracing::{debug, warn}; /// Maximum inter-agent call depth to prevent infinite recursion (A->B->C->...). const MAX_AGENT_CALL_DEPTH: u32 = 5; +/// Default timeout for async agent delegation (seconds). +const DEFAULT_ASYNC_TIMEOUT_SECS: u64 = 30; + +/// Tracks pending async agent tasks for cancellation and result delivery. +static ASYNC_TASKS: std::sync::LazyLock>> = + std::sync::LazyLock::new(dashmap::DashMap::new); + +/// Channel for delivering async delegation results to channel adapters (e.g. voice). +/// The tool runner sends Jeeves's formatted response; the voice adapter receives and speaks it. +static ASYNC_RESULT_TX: std::sync::LazyLock> = + std::sync::LazyLock::new(|| { + let (tx, rx) = tokio::sync::mpsc::channel(64); + *ASYNC_RESULT_RX.lock().unwrap() = Some(rx); + tx + }); + +type AsyncResultRx = std::sync::Mutex>>; +static ASYNC_RESULT_RX: std::sync::LazyLock = + std::sync::LazyLock::new(|| std::sync::Mutex::new(None)); + +/// Take the async result receiver. Called once by the voice adapter at startup. +pub fn take_async_result_receiver() -> Option> { + // Ensure TX is initialized first (creates the channel) + let _ = &*ASYNC_RESULT_TX; + ASYNC_RESULT_RX.lock().unwrap().take() +} + /// Check if a shell command should be blocked by taint tracking. /// /// Layer 1: Shell metacharacter injection (backticks, `$(`, `${`, etc.) @@ -267,6 +294,8 @@ pub async fn execute_tool( // Inter-agent tools (require kernel handle) "agent_send" => tool_agent_send(input, kernel).await, + "agent_send_async" => tool_agent_send_async(input, kernel, caller_agent_id).await, + "agent_cancel" => tool_agent_cancel(input).await, "agent_spawn" => tool_agent_spawn(input, kernel, caller_agent_id).await, "agent_list" => tool_agent_list(kernel), "agent_kill" => tool_agent_kill(input, kernel), @@ -630,6 +659,30 @@ pub fn builtin_tool_definitions() -> Vec { "required": ["agent_id", "message"] }), }, + ToolDefinition { + name: "agent_send_async".to_string(), + description: "Send a message to another agent asynchronously. Returns immediately with a callback_id. The agent runs in the background and the result is delivered as a follow-up message when ready. Use this for tasks that may take a while (email, calendar, etc.) so you can acknowledge the user immediately.".to_string(), + input_schema: serde_json::json!({ + "type": "object", + "properties": { + "agent_id": { "type": "string", "description": "The target agent's UUID or name" }, + "message": { "type": "string", "description": "The message to send to the agent" }, + "timeout_secs": { "type": "integer", "description": "Timeout in seconds (default 30)" } + }, + "required": ["agent_id", "message"] + }), + }, + ToolDefinition { + name: "agent_cancel".to_string(), + description: "Cancel a pending async agent task by its callback_id.".to_string(), + input_schema: serde_json::json!({ + "type": "object", + "properties": { + "callback_id": { "type": "string", "description": "The callback_id returned by agent_send_async" } + }, + "required": ["callback_id"] + }), + }, ToolDefinition { name: "agent_spawn".to_string(), description: "Spawn a new agent from a TOML manifest. Returns the new agent's ID and name.".to_string(), @@ -1618,6 +1671,92 @@ async fn tool_agent_send( .await } +async fn tool_agent_send_async( + input: &serde_json::Value, + kernel: Option<&Arc>, + caller_agent_id: Option<&str>, +) -> Result { + let kh = require_kernel(kernel)?; + let agent_id = input["agent_id"] + .as_str() + .ok_or("Missing 'agent_id' parameter")? + .to_string(); + let message = input["message"] + .as_str() + .ok_or("Missing 'message' parameter")? + .to_string(); + let timeout_secs = input["timeout_secs"] + .as_u64() + .unwrap_or(DEFAULT_ASYNC_TIMEOUT_SECS); + + let caller_id = caller_agent_id + .ok_or("Cannot use agent_send_async without a caller agent context")? + .to_string(); + + let callback_id = uuid::Uuid::new_v4().to_string(); + let cb_id = callback_id.clone(); + + // Clone the kernel handle for the background task + let kh_clone = kh.clone(); + + let handle = tokio::spawn(async move { + // Run the target agent with a timeout + let result = tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + kh_clone.send_to_agent(&agent_id, &message), + ) + .await; + + let result_text = match result { + Ok(Ok(response)) => response, + Ok(Err(e)) => format!("[Error from {agent_id}]: {e}"), + Err(_) => format!("[Timeout]: {agent_id} did not respond within {timeout_secs}s"), + }; + + // Deliver result to the caller agent for formatting, + // then send the formatted response to voice/channel adapters. + let callback_msg = format!("[Async result {cb_id} from {agent_id}]: {result_text}"); + match kh_clone.send_to_agent(&caller_id, &callback_msg).await { + Ok(formatted_response) => { + debug!("Async callback: {caller_id} responded with: {formatted_response}"); + let _ = ASYNC_RESULT_TX + .send((caller_id.clone(), formatted_response)) + .await; + } + Err(e) => { + warn!("Failed to deliver async callback to {caller_id}: {e}"); + // Fall back to raw result if caller fails + let _ = ASYNC_RESULT_TX.send((caller_id.clone(), result_text)).await; + } + } + + // Clean up tracking + ASYNC_TASKS.remove(&cb_id); + }); + + // Track the task for cancellation + ASYNC_TASKS.insert(callback_id.clone(), handle); + + Ok(format!( + "Task dispatched. callback_id: {callback_id}. \ + The result will be delivered as a follow-up message. \ + You can now respond to the user while the task runs." + )) +} + +async fn tool_agent_cancel(input: &serde_json::Value) -> Result { + let callback_id = input["callback_id"] + .as_str() + .ok_or("Missing 'callback_id' parameter")?; + + if let Some((_, handle)) = ASYNC_TASKS.remove(callback_id) { + handle.abort(); + Ok(format!("Task {callback_id} cancelled.")) + } else { + Err(format!("No pending task with callback_id: {callback_id}")) + } +} + async fn tool_agent_spawn( input: &serde_json::Value, kernel: Option<&Arc>, From 3984657c4faf52f754950a06db0438abdc239a1a Mon Sep 17 00:00:00 2001 From: Philippe Branchu Date: Mon, 23 Mar 2026 04:45:01 +0000 Subject: [PATCH 2/2] Add tests for agent_send_async and agent_cancel tools - test_async_tool_definitions_exist - test_async_tool_schema_has_required_params - test_cancel_tool_schema - test_agent_send_async_no_kernel_fails - test_agent_send_async_missing_caller - test_agent_cancel_missing_callback_id - test_agent_cancel_nonexistent_task - test_take_async_result_receiver_returns_once Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/openfang-runtime/src/tool_runner.rs | 99 ++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/crates/openfang-runtime/src/tool_runner.rs b/crates/openfang-runtime/src/tool_runner.rs index 7e4718d05..279154e32 100644 --- a/crates/openfang-runtime/src/tool_runner.rs +++ b/crates/openfang-runtime/src/tool_runner.rs @@ -4124,4 +4124,103 @@ mod tests { // Cleanup let _ = std::fs::remove_dir_all(&tmp); } + + // ── agent_send_async / agent_cancel tests ─────────────────────────── + + #[test] + fn test_async_tool_definitions_exist() { + let defs = builtin_tool_definitions(); + let names: Vec<&str> = defs.iter().map(|d| d.name.as_str()).collect(); + assert!( + names.contains(&"agent_send_async"), + "agent_send_async must be in builtin tools" + ); + assert!( + names.contains(&"agent_cancel"), + "agent_cancel must be in builtin tools" + ); + } + + #[test] + fn test_async_tool_schema_has_required_params() { + let defs = builtin_tool_definitions(); + let async_def = defs + .iter() + .find(|d| d.name == "agent_send_async") + .expect("agent_send_async not found"); + + let required = async_def.input_schema["required"] + .as_array() + .expect("missing required"); + let required_names: Vec<&str> = required.iter().filter_map(|v| v.as_str()).collect(); + assert!(required_names.contains(&"agent_id")); + assert!(required_names.contains(&"message")); + + // timeout_secs should be optional (in properties but not required) + assert!(async_def.input_schema["properties"]["timeout_secs"].is_object()); + assert!(!required_names.contains(&"timeout_secs")); + } + + #[test] + fn test_cancel_tool_schema() { + let defs = builtin_tool_definitions(); + let cancel_def = defs + .iter() + .find(|d| d.name == "agent_cancel") + .expect("agent_cancel not found"); + + let required = cancel_def.input_schema["required"] + .as_array() + .expect("missing required"); + let required_names: Vec<&str> = required.iter().filter_map(|v| v.as_str()).collect(); + assert!(required_names.contains(&"callback_id")); + } + + #[tokio::test] + async fn test_agent_send_async_no_kernel_fails() { + // All calls without a kernel handle should fail at require_kernel + let input = serde_json::json!({"agent_id": "test", "message": "hello"}); + let result = tool_agent_send_async(&input, None, Some("caller")).await; + assert!(result.is_err()); + assert!( + result.unwrap_err().contains("kernel"), + "Should fail with kernel error" + ); + } + + #[tokio::test] + async fn test_agent_send_async_missing_caller() { + // Without caller_agent_id, should fail (after kernel check) + // Since we can't easily provide a kernel in unit tests, we verify + // the error path via the no-kernel check instead. + let input = serde_json::json!({"agent_id": "test", "message": "hello"}); + let result = tool_agent_send_async(&input, None, None).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_agent_cancel_missing_callback_id() { + let input = serde_json::json!({}); + let result = tool_agent_cancel(&input).await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("callback_id")); + } + + #[tokio::test] + async fn test_agent_cancel_nonexistent_task() { + let input = serde_json::json!({"callback_id": "nonexistent-id-12345"}); + let result = tool_agent_cancel(&input).await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("No pending task")); + } + + #[test] + fn test_take_async_result_receiver_returns_once() { + // First call should return Some, second should return None + // (channel already taken) + // Note: this test is order-dependent with other tests that may + // call take_async_result_receiver, so we just verify the function + // doesn't panic. + let _ = take_async_result_receiver(); + } }