Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/openfang-api/static/js/pages/comms.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ function commsPage() {
stateBadgeClass(state) {
switch(state) {
case 'Running': return 'badge badge-success';
case 'Suspended': return 'badge badge-warning';
case 'Idle': case 'Suspended': return 'badge badge-warning';
case 'Terminated': case 'Crashed': return 'badge badge-danger';
default: return 'badge badge-dim';
}
Expand Down
2 changes: 1 addition & 1 deletion crates/openfang-cli/src/tui/screens/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ fn draw_task_modal(f: &mut Frame, area: Rect, state: &CommsState) {
fn state_color(state: &str) -> Style {
match state {
"Running" => Style::default().fg(theme::GREEN),
"Suspended" => Style::default().fg(theme::YELLOW),
"Suspended" | "Idle" => Style::default().fg(theme::YELLOW),
"Terminated" | "Crashed" => Style::default().fg(theme::RED),
_ => theme::dim_style(),
}
Expand Down
4 changes: 3 additions & 1 deletion crates/openfang-cli/src/tui/theme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ pub fn state_badge(state: &str) -> (&'static str, Style) {
let lower = state.to_lowercase();
if lower.contains("run") {
("[RUN]", badge_running())
} else if lower.contains("creat") || lower.contains("new") || lower.contains("idle") {
} else if lower.contains("idle") {
("[IDL]", badge_suspended())
} else if lower.contains("creat") || lower.contains("new") {
("[NEW]", badge_created())
} else if lower.contains("sus") || lower.contains("paus") {
("[SUS]", badge_suspended())
Expand Down
5 changes: 5 additions & 0 deletions crates/openfang-kernel/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ impl BackgroundExecutor {
pub fn active_count(&self) -> usize {
self.tasks.len()
}

/// Check if an agent has an active background task loop.
pub fn has_task(&self, agent_id: AgentId) -> bool {
self.tasks.contains_key(&agent_id)
}
}

/// Parse a proactive condition string into a `TriggerPattern`.
Expand Down
14 changes: 14 additions & 0 deletions crates/openfang-kernel/src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,20 @@ impl CronScheduler {
self.jobs.len()
}

/// Check if an agent has any enabled cron jobs due within `window_secs` from now.
///
/// Used by the heartbeat monitor to distinguish idle agents (no upcoming work)
/// from agents that should be alive (cron firing soon).
pub fn has_due_jobs_soon(&self, agent_id: AgentId, window_secs: i64) -> bool {
let deadline = Utc::now() + Duration::seconds(window_secs);
self.jobs.iter().any(|entry| {
let meta = entry.value();
meta.job.agent_id == agent_id
&& meta.job.enabled
&& meta.job.next_run.map(|t| t <= deadline).unwrap_or(false)
})
}

/// Return jobs whose `next_run` is at or before `now` and are enabled.
///
/// **Important**: This also pre-advances each due job's `next_run` to the
Expand Down
22 changes: 17 additions & 5 deletions crates/openfang-kernel/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl Default for RecoveryTracker {
}
}

/// Check all running and crashed agents and return their heartbeat status.
/// Check all running, crashed, and idle agents and return their heartbeat status.
///
/// This is a pure function — it doesn't start a background task.
/// The caller (kernel) can run this periodically or in a background task.
Expand All @@ -135,9 +135,10 @@ pub fn check_agents(registry: &AgentRegistry, config: &HeartbeatConfig) -> Vec<H
let mut statuses = Vec::new();

for entry_ref in registry.list() {
// Check Running agents (for unresponsiveness) and Crashed agents (for recovery)
// Check Running agents (for unresponsiveness), Crashed (for recovery),
// and Idle (so kernel can wake them if work arrives)
match entry_ref.state {
AgentState::Running | AgentState::Crashed => {}
AgentState::Running | AgentState::Crashed | AgentState::Idle => {}
_ => continue,
}

Expand All @@ -151,8 +152,13 @@ pub fn check_agents(registry: &AgentRegistry, config: &HeartbeatConfig) -> Vec<H
.map(|a| a.heartbeat_interval_secs * UNRESPONSIVE_MULTIPLIER)
.unwrap_or(config.default_timeout_secs) as i64;

// Crashed agents are always considered unresponsive
let unresponsive = entry_ref.state == AgentState::Crashed || inactive_secs > timeout_secs;
// Idle agents are never considered unresponsive — they're expected to be inactive.
// Crashed agents are always considered unresponsive.
let unresponsive = match entry_ref.state {
AgentState::Idle => false,
AgentState::Crashed => true,
_ => inactive_secs > timeout_secs,
};

if unresponsive && entry_ref.state == AgentState::Running {
warn!(
Expand All @@ -167,6 +173,12 @@ pub fn check_agents(registry: &AgentRegistry, config: &HeartbeatConfig) -> Vec<H
inactive_secs,
"Agent is crashed — eligible for recovery"
);
} else if entry_ref.state == AgentState::Idle {
debug!(
agent = %entry_ref.name,
inactive_secs,
"Agent is idle — waiting for work"
);
} else {
debug!(
agent = %entry_ref.name,
Expand Down
87 changes: 69 additions & 18 deletions crates/openfang-kernel/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,14 @@ impl OpenFangKernel {
sender_id: Option<String>,
sender_name: Option<String>,
) -> KernelResult<AgentLoopResult> {
// Wake idle agents on message arrival
if let Some(entry) = self.registry.get(agent_id) {
if entry.state == AgentState::Idle {
info!(agent = %entry.name, "Waking idle agent — message received");
let _ = self.registry.set_state(agent_id, AgentState::Running);
}
}

// Acquire per-agent lock to serialize concurrent messages for the same agent.
// This prevents session corruption when multiple messages arrive in quick
// succession (e.g. rapid voice messages via Telegram). Messages for different
Expand Down Expand Up @@ -1634,6 +1642,14 @@ impl OpenFangKernel {
tokio::sync::mpsc::Receiver<StreamEvent>,
tokio::task::JoinHandle<KernelResult<AgentLoopResult>>,
)> {
// Wake idle agents on message arrival
if let Some(entry) = self.registry.get(agent_id) {
if entry.state == AgentState::Idle {
info!(agent = %entry.name, "Waking idle agent — streaming message received");
let _ = self.registry.set_state(agent_id, AgentState::Running);
}
}

// Enforce quota before spawning the streaming task
self.scheduler
.check_quota(agent_id)
Expand Down Expand Up @@ -4059,6 +4075,14 @@ impl OpenFangKernel {
let agent_id = job.agent_id;
let job_name = job.name.clone();

// Wake idle agents for cron execution
if let Some(entry) = kernel.registry.get(agent_id) {
if entry.state == AgentState::Idle {
info!(agent = %entry.name, job = %job_name, "Waking idle agent — cron job due");
let _ = kernel.registry.set_state(agent_id, AgentState::Running);
}
}

match &job.action {
openfang_types::scheduler::CronAction::SystemEvent { text } => {
tracing::debug!(job = %job_name, "Cron: firing system event");
Expand Down Expand Up @@ -4464,25 +4488,52 @@ impl OpenFangKernel {

// --- Unresponsive Running agent ---
if status.unresponsive && status.state == AgentState::Running {
// Mark as Crashed so next cycle triggers recovery
let _ = kernel
.registry
.set_state(status.agent_id, AgentState::Crashed);
warn!(
agent = %status.name,
inactive_secs = status.inactive_secs,
"Unresponsive Running agent marked as Crashed for recovery"
);
// Smart idle detection: check if the agent has pending work
// before marking it as crashed.
let has_pending_cron = kernel.cron_scheduler
.has_due_jobs_soon(status.agent_id, config.default_timeout_secs as i64);
let has_active_bg_task = kernel.background.has_task(status.agent_id);
let has_active_schedule = kernel.registry.get(status.agent_id)
.map(|e| !matches!(e.manifest.schedule, ScheduleMode::Reactive))
.unwrap_or(false);

if has_pending_cron || has_active_bg_task || has_active_schedule {
// Agent SHOULD be alive but isn't responding — real crash
let _ = kernel
.registry
.set_state(status.agent_id, AgentState::Crashed);
warn!(
agent = %status.name,
inactive_secs = status.inactive_secs,
has_pending_cron,
has_active_bg_task,
has_active_schedule,
"Unresponsive Running agent marked as Crashed for recovery"
);

let event = Event::new(
status.agent_id,
EventTarget::System,
EventPayload::System(SystemEvent::HealthCheckFailed {
agent_id: status.agent_id,
unresponsive_secs: status.inactive_secs as u64,
}),
);
kernel.event_bus.publish(event).await;
let event = Event::new(
status.agent_id,
EventTarget::System,
EventPayload::System(SystemEvent::HealthCheckFailed {
agent_id: status.agent_id,
unresponsive_secs: status.inactive_secs as u64,
}),
);
kernel.event_bus.publish(event).await;
} else {
// Agent has nothing to do — mark idle, don't recover
let _ = kernel
.registry
.set_state(status.agent_id, AgentState::Idle);
info!(
agent = %status.name,
inactive_secs = status.inactive_secs,
"Agent has no pending work — marked Idle"
);
// Clear any prior recovery failures so the agent
// starts fresh when it wakes up
recovery_tracker.reset(status.agent_id);
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/openfang-types/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ pub enum AgentState {
Terminated,
/// Agent crashed and is awaiting recovery.
Crashed,
/// Agent is idle — no pending work, will be woken on demand.
Idle,
}

/// Permission-based operational mode for an agent.
Expand Down
Loading