Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions crates/openfang-runtime/src/tool_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,33 @@ use tracing::{debug, warn};
/// Maximum inter-agent call depth to prevent infinite recursion (A->B->C->...).
const MAX_AGENT_CALL_DEPTH: u32 = 5;

/// Default timeout for async agent delegation (seconds).
const DEFAULT_ASYNC_TIMEOUT_SECS: u64 = 30;

/// Tracks pending async agent tasks for cancellation and result delivery.
static ASYNC_TASKS: std::sync::LazyLock<dashmap::DashMap<String, tokio::task::JoinHandle<()>>> =
std::sync::LazyLock::new(dashmap::DashMap::new);

/// Channel for delivering async delegation results to channel adapters (e.g. voice).
/// The tool runner sends Jeeves's formatted response; the voice adapter receives and speaks it.
static ASYNC_RESULT_TX: std::sync::LazyLock<tokio::sync::mpsc::Sender<(String, String)>> =
std::sync::LazyLock::new(|| {
let (tx, rx) = tokio::sync::mpsc::channel(64);
*ASYNC_RESULT_RX.lock().unwrap() = Some(rx);
tx
});

type AsyncResultRx = std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<(String, String)>>>;
static ASYNC_RESULT_RX: std::sync::LazyLock<AsyncResultRx> =
std::sync::LazyLock::new(|| std::sync::Mutex::new(None));

/// Take the async result receiver. Called once by the voice adapter at startup.
pub fn take_async_result_receiver() -> Option<tokio::sync::mpsc::Receiver<(String, String)>> {
// Ensure TX is initialized first (creates the channel)
let _ = &*ASYNC_RESULT_TX;
ASYNC_RESULT_RX.lock().unwrap().take()
}

/// Check if a shell command should be blocked by taint tracking.
///
/// Layer 1: Shell metacharacter injection (backticks, `$(`, `${`, etc.)
Expand Down Expand Up @@ -267,6 +294,8 @@ pub async fn execute_tool(

// Inter-agent tools (require kernel handle)
"agent_send" => tool_agent_send(input, kernel).await,
"agent_send_async" => tool_agent_send_async(input, kernel, caller_agent_id).await,
"agent_cancel" => tool_agent_cancel(input).await,
"agent_spawn" => tool_agent_spawn(input, kernel, caller_agent_id).await,
"agent_list" => tool_agent_list(kernel),
"agent_kill" => tool_agent_kill(input, kernel),
Expand Down Expand Up @@ -630,6 +659,30 @@ pub fn builtin_tool_definitions() -> Vec<ToolDefinition> {
"required": ["agent_id", "message"]
}),
},
ToolDefinition {
name: "agent_send_async".to_string(),
description: "Send a message to another agent asynchronously. Returns immediately with a callback_id. The agent runs in the background and the result is delivered as a follow-up message when ready. Use this for tasks that may take a while (email, calendar, etc.) so you can acknowledge the user immediately.".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"agent_id": { "type": "string", "description": "The target agent's UUID or name" },
"message": { "type": "string", "description": "The message to send to the agent" },
"timeout_secs": { "type": "integer", "description": "Timeout in seconds (default 30)" }
},
"required": ["agent_id", "message"]
}),
},
ToolDefinition {
name: "agent_cancel".to_string(),
description: "Cancel a pending async agent task by its callback_id.".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"callback_id": { "type": "string", "description": "The callback_id returned by agent_send_async" }
},
"required": ["callback_id"]
}),
},
ToolDefinition {
name: "agent_spawn".to_string(),
description: "Spawn a new agent from a TOML manifest. Returns the new agent's ID and name.".to_string(),
Expand Down Expand Up @@ -1618,6 +1671,92 @@ async fn tool_agent_send(
.await
}

async fn tool_agent_send_async(
input: &serde_json::Value,
kernel: Option<&Arc<dyn KernelHandle>>,
caller_agent_id: Option<&str>,
) -> Result<String, String> {
let kh = require_kernel(kernel)?;
let agent_id = input["agent_id"]
.as_str()
.ok_or("Missing 'agent_id' parameter")?
.to_string();
let message = input["message"]
.as_str()
.ok_or("Missing 'message' parameter")?
.to_string();
let timeout_secs = input["timeout_secs"]
.as_u64()
.unwrap_or(DEFAULT_ASYNC_TIMEOUT_SECS);

let caller_id = caller_agent_id
.ok_or("Cannot use agent_send_async without a caller agent context")?
.to_string();

let callback_id = uuid::Uuid::new_v4().to_string();
let cb_id = callback_id.clone();

// Clone the kernel handle for the background task
let kh_clone = kh.clone();

let handle = tokio::spawn(async move {
// Run the target agent with a timeout
let result = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
kh_clone.send_to_agent(&agent_id, &message),
)
.await;

let result_text = match result {
Ok(Ok(response)) => response,
Ok(Err(e)) => format!("[Error from {agent_id}]: {e}"),
Err(_) => format!("[Timeout]: {agent_id} did not respond within {timeout_secs}s"),
};

// Deliver result to the caller agent for formatting,
// then send the formatted response to voice/channel adapters.
let callback_msg = format!("[Async result {cb_id} from {agent_id}]: {result_text}");
match kh_clone.send_to_agent(&caller_id, &callback_msg).await {
Ok(formatted_response) => {
debug!("Async callback: {caller_id} responded with: {formatted_response}");
let _ = ASYNC_RESULT_TX
.send((caller_id.clone(), formatted_response))
.await;
}
Err(e) => {
warn!("Failed to deliver async callback to {caller_id}: {e}");
// Fall back to raw result if caller fails
let _ = ASYNC_RESULT_TX.send((caller_id.clone(), result_text)).await;
}
}

// Clean up tracking
ASYNC_TASKS.remove(&cb_id);
});

// Track the task for cancellation
ASYNC_TASKS.insert(callback_id.clone(), handle);

Ok(format!(
"Task dispatched. callback_id: {callback_id}. \
The result will be delivered as a follow-up message. \
You can now respond to the user while the task runs."
))
}

async fn tool_agent_cancel(input: &serde_json::Value) -> Result<String, String> {
let callback_id = input["callback_id"]
.as_str()
.ok_or("Missing 'callback_id' parameter")?;

if let Some((_, handle)) = ASYNC_TASKS.remove(callback_id) {
handle.abort();
Ok(format!("Task {callback_id} cancelled."))
} else {
Err(format!("No pending task with callback_id: {callback_id}"))
}
}

async fn tool_agent_spawn(
input: &serde_json::Value,
kernel: Option<&Arc<dyn KernelHandle>>,
Expand Down Expand Up @@ -3985,4 +4124,103 @@ mod tests {
// Cleanup
let _ = std::fs::remove_dir_all(&tmp);
}

// ── agent_send_async / agent_cancel tests ───────────────────────────

#[test]
fn test_async_tool_definitions_exist() {
let defs = builtin_tool_definitions();
let names: Vec<&str> = defs.iter().map(|d| d.name.as_str()).collect();
assert!(
names.contains(&"agent_send_async"),
"agent_send_async must be in builtin tools"
);
assert!(
names.contains(&"agent_cancel"),
"agent_cancel must be in builtin tools"
);
}

#[test]
fn test_async_tool_schema_has_required_params() {
let defs = builtin_tool_definitions();
let async_def = defs
.iter()
.find(|d| d.name == "agent_send_async")
.expect("agent_send_async not found");

let required = async_def.input_schema["required"]
.as_array()
.expect("missing required");
let required_names: Vec<&str> = required.iter().filter_map(|v| v.as_str()).collect();
assert!(required_names.contains(&"agent_id"));
assert!(required_names.contains(&"message"));

// timeout_secs should be optional (in properties but not required)
assert!(async_def.input_schema["properties"]["timeout_secs"].is_object());
assert!(!required_names.contains(&"timeout_secs"));
}

#[test]
fn test_cancel_tool_schema() {
let defs = builtin_tool_definitions();
let cancel_def = defs
.iter()
.find(|d| d.name == "agent_cancel")
.expect("agent_cancel not found");

let required = cancel_def.input_schema["required"]
.as_array()
.expect("missing required");
let required_names: Vec<&str> = required.iter().filter_map(|v| v.as_str()).collect();
assert!(required_names.contains(&"callback_id"));
}

#[tokio::test]
async fn test_agent_send_async_no_kernel_fails() {
// All calls without a kernel handle should fail at require_kernel
let input = serde_json::json!({"agent_id": "test", "message": "hello"});
let result = tool_agent_send_async(&input, None, Some("caller")).await;
assert!(result.is_err());
assert!(
result.unwrap_err().contains("kernel"),
"Should fail with kernel error"
);
}

#[tokio::test]
async fn test_agent_send_async_missing_caller() {
// Without caller_agent_id, should fail (after kernel check)
// Since we can't easily provide a kernel in unit tests, we verify
// the error path via the no-kernel check instead.
let input = serde_json::json!({"agent_id": "test", "message": "hello"});
let result = tool_agent_send_async(&input, None, None).await;
assert!(result.is_err());
}

#[tokio::test]
async fn test_agent_cancel_missing_callback_id() {
let input = serde_json::json!({});
let result = tool_agent_cancel(&input).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("callback_id"));
}

#[tokio::test]
async fn test_agent_cancel_nonexistent_task() {
let input = serde_json::json!({"callback_id": "nonexistent-id-12345"});
let result = tool_agent_cancel(&input).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("No pending task"));
}

#[test]
fn test_take_async_result_receiver_returns_once() {
// First call should return Some, second should return None
// (channel already taken)
// Note: this test is order-dependent with other tests that may
// call take_async_result_receiver, so we just verify the function
// doesn't panic.
let _ = take_async_result_receiver();
}
}