fix(cron): rewrite arm_timer() as persistent cancellable loop to fix recurring job execution #51
Deepak-negi11 wants to merge 16 commits into
…fa-org#47)
Fixes mofa-org#47
Root cause: arm_timer() spawned a fire-once Tokio task that never re-scheduled itself. All recurring cron jobs silently stopped after their first execution.
Changes:
- Convert fire-once task to persistent loop (sleep→fire→update→loop)
- Add CancellationToken (tokio-util) for graceful task cancellation
- Per-job error recording (last_status/last_error) in spawned task
- Anti-drift scheduling: anchor next_run to scheduled time, not now()
- Fix state race: completion state updated inside spawned task
- Atomic disk writes: write .tmp then rename to prevent corruption
- Persistent loop on empty queue: park on cancel token instead of break
- One-shot cleanup: delete_after_run jobs removed after execution
- Add live integration test proving recurring jobs fire ≥3 times in 7s
- Move task spawn inside the write lock guard in arm_timer() to serialize concurrent calls
- Remove the 'smuggling' child token comment as the implementation is clean
- Replace cancel_token with Arc<Mutex<CancellationToken>> for proper replacement
- Add store_mutex (tokio::sync::Mutex) to serialize disk writes and prevent races
- Change jobs_to_delete to Vec<&str> to eliminate unnecessary String allocations during cleanup of one-shot jobs.
- Limit concurrent job executions to 50 via tokio::sync::Semaphore to prevent API rate limits or CPU spikes when many jobs fire simultaneously.
- Add tracing::Instrument to spawned jobs (in both arm_timer and run_job) to attach job_id and job_name spans for enhanced observability.
…t, eliminate status() double-lock
Pull request overview
This PR fixes recurring cron jobs that previously executed only once by rewriting the cron timer logic into a persistent, cancellable scheduling loop and improving state persistence during scheduling/execution.
Changes:
- Reworked CronService::arm_timer() into a persistent loop with CancellationToken, drift correction, and per-job completion state recording.
- Added atomic cron-store persistence via .tmp write + rename and a write-serialization mutex.
- Added a live integration test for recurring execution and tightened RBAC shell command pattern checks (metachar filtering + tests).
Reviewed changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| core/src/cron/service.rs | Persistent cancellable timer loop, concurrency limiting, completion status tracking, and atomic persistence helpers. |
| core/tests/cron_live_test.rs | New live integration test validating a recurring job fires multiple times. |
| core/src/rbac/manager.rs | Adds shell metacharacter rejection in command pattern matching plus new injection-focused unit test. |
| core/Cargo.toml | Adds tokio-util dependency for CancellationToken. |
| Cargo.lock | Lockfile updates for tokio-util and related deps. |
| let is_cancelled = self.cancel_token.lock().unwrap().is_cancelled(); | ||
| let next_wake_at_ms = store | ||
| .jobs | ||
| .iter() | ||
| .filter_map(|j| { | ||
| if j.enabled { | ||
| j.state.next_run_at_ms | ||
| } else { | ||
| None | ||
| } | ||
| }) | ||
| .min(); | ||
| CronStatus { | ||
| enabled: *self.running.read().await, | ||
| enabled: !is_cancelled, | ||
| jobs: store.jobs.len(), |
CronStatus.enabled is derived from !cancel_token.is_cancelled(), but the token is created in new() as not-cancelled and start() never flips any state. This means status() will report enabled: true even before start() is called. Consider restoring an explicit running flag, or initialize the token in a cancelled state and replace it on start().
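The first option in the comment above, an explicit running flag, could look roughly like the following. This is a minimal std-only sketch: `ServiceState` and its methods are hypothetical names, and it uses an `AtomicBool` rather than the `Arc<RwLock<bool>>` the service previously held.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the part of the service that tracks lifecycle.
struct ServiceState {
    running: AtomicBool,
}

impl ServiceState {
    /// A freshly constructed service has not been started yet.
    fn new() -> Self {
        Self { running: AtomicBool::new(false) }
    }

    /// Called from start(): flips the flag so status() reports enabled.
    fn start(&self) {
        self.running.store(true, Ordering::SeqCst);
    }

    /// Called from stop(): flips the flag back.
    fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// What status() would report as `enabled`.
    fn is_enabled(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
}
```

With this shape, `status()` no longer depends on the cancellation token's initial state, so a service that was constructed but never started reports `enabled: false`.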
|
|
||
| if let Some(handle) = timer_task.take() { | ||
| handle.abort(); |
arm_timer() awaits the aborted prior timer task (handle.await) while holding the timer_task write lock (from try_write). Awaiting while holding an async lock can block stop()/other arm_timer() callers longer than necessary. Prefer taking the handle out, dropping the lock guard, then awaiting the handle.
| if let Some(handle) = timer_task.take() { | |
| handle.abort(); | |
| // Take out any existing timer handle while holding the lock, but do not await it yet. | |
| let mut old_handle = None; | |
| if let Some(handle) = timer_task.take() { | |
| handle.abort(); | |
| old_handle = Some(handle); | |
| } | |
| // Release the write lock before awaiting the aborted task to avoid blocking other callers. | |
| drop(timer_task); | |
| if let Some(handle) = old_handle { |
| tokio::spawn( | ||
| async move { | ||
| // Acquire a permit inside the spawned task to avoid blocking the timer loop | ||
| let _permit = match semaphore_clone.acquire_owned().await { | ||
| Ok(p) => p, | ||
| Err(_) => { | ||
| error!( | ||
| "Semaphore closed, dropping job execution for {}", | ||
| job_id | ||
| ); | ||
| return; | ||
| } | ||
| }; | ||
| let result = callback_clone(job_clone).await; | ||
| let finish_time = Utc::now().timestamp_millis(); | ||
|
|
||
| // Update completion state in store | ||
| { | ||
| let mut s = store_clone.write().await; | ||
| if let Some(j) = s.jobs.iter_mut().find(|j| j.id == job_id) { | ||
| j.state.last_run_at_ms = Some(finish_time); | ||
| match result { | ||
| Ok(_) => { | ||
| j.state.last_status = Some("ok".to_string()); | ||
| j.state.last_error = None; | ||
| } | ||
| Err(ref e) => { | ||
| j.state.last_status = Some("error".to_string()); | ||
| j.state.last_error = Some(format!("{}", e)); | ||
| error!( | ||
| "Cron job '{}' execution failed: {}", | ||
| job_id, e | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Persist updated completion state | ||
| Self::save_store_atomic( | ||
| &store_path_clone, | ||
| &store_clone, | ||
| &store_mutex_clone, | ||
| ) | ||
| .await; |
The timer loop spawns per-job tasks with tokio::spawn but the service does not track these JoinHandles. stop() cancels/aborts only the timer loop, so in-flight job executions may continue after shutdown and still mutate/persist the store. If shutdown should be clean, consider propagating a cancellation token into job tasks and/or storing handles so stop() can abort/await them.
| tokio::spawn( | |
| async move { | |
| // Acquire a permit inside the spawned task to avoid blocking the timer loop | |
| let _permit = match semaphore_clone.acquire_owned().await { | |
| Ok(p) => p, | |
| Err(_) => { | |
| error!( | |
| "Semaphore closed, dropping job execution for {}", | |
| job_id | |
| ); | |
| return; | |
| } | |
| }; | |
| let result = callback_clone(job_clone).await; | |
| let finish_time = Utc::now().timestamp_millis(); | |
| // Update completion state in store | |
| { | |
| let mut s = store_clone.write().await; | |
| if let Some(j) = s.jobs.iter_mut().find(|j| j.id == job_id) { | |
| j.state.last_run_at_ms = Some(finish_time); | |
| match result { | |
| Ok(_) => { | |
| j.state.last_status = Some("ok".to_string()); | |
| j.state.last_error = None; | |
| } | |
| Err(ref e) => { | |
| j.state.last_status = Some("error".to_string()); | |
| j.state.last_error = Some(format!("{}", e)); | |
| error!( | |
| "Cron job '{}' execution failed: {}", | |
| job_id, e | |
| ); | |
| } | |
| } | |
| } | |
| } | |
| // Persist updated completion state | |
| Self::save_store_atomic( | |
| &store_path_clone, | |
| &store_clone, | |
| &store_mutex_clone, | |
| ) | |
| .await; | |
| // Clone the cancellation token used for coordinated shutdown into this task | |
| let cancellation_token_clone = cancellation_token.clone(); | |
| tokio::spawn( | |
| async move { | |
| tokio::select! { | |
| // If shutdown is requested, abort this job execution without mutating the store | |
| _ = cancellation_token_clone.cancelled() => { | |
| debug!( | |
| "Cron job '{}' cancelled before execution due to shutdown", | |
| job_id | |
| ); | |
| } | |
| // Normal job execution path | |
| _ = async { | |
| // Acquire a permit inside the spawned task to avoid blocking the timer loop | |
| let _permit = match semaphore_clone.acquire_owned().await { | |
| Ok(p) => p, | |
| Err(_) => { | |
| error!( | |
| "Semaphore closed, dropping job execution for {}", | |
| job_id | |
| ); | |
| return; | |
| } | |
| }; | |
| let result = callback_clone(job_clone).await; | |
| let finish_time = Utc::now().timestamp_millis(); | |
| // Update completion state in store | |
| { | |
| let mut s = store_clone.write().await; | |
| if let Some(j) = s.jobs.iter_mut().find(|j| j.id == job_id) { | |
| j.state.last_run_at_ms = Some(finish_time); | |
| match result { | |
| Ok(_) => { | |
| j.state.last_status = Some("ok".to_string()); | |
| j.state.last_error = None; | |
| } | |
| Err(ref e) => { | |
| j.state.last_status = Some("error".to_string()); | |
| j.state.last_error = Some(format!("{}", e)); | |
| error!( | |
| "Cron job '{}' execution failed: {}", | |
| job_id, e | |
| ); | |
| } | |
| } | |
| } | |
| } | |
| // Persist updated completion state | |
| Self::save_store_atomic( | |
| &store_path_clone, | |
| &store_clone, | |
| &store_mutex_clone, | |
| ) | |
| .await; | |
| } => {} | |
| } |
| #[tokio::test] | ||
| async fn test_recurring_job_fires_multiple_times() { | ||
| let temp_dir = tempfile::tempdir().unwrap(); |
The PR description says the live recurring-job integration test is excluded from CI, but as written this file will run in cargo test by default and includes a 7s sleep. Consider marking it #[ignore] (and documenting how to run it), or gating it behind a feature/CI-only flag to avoid slowing down default test runs.
| const SHELL_METACHARS: &[char] = &[';', '&', '|', '`', '$', '<', '>', '(', ')', '{', '}']; | ||
| if command.chars().any(|c| SHELL_METACHARS.contains(&c)) { | ||
| warn!("Command rejected due to shell metacharacters: {}", command); |
The shell metacharacter filter misses some command separators/expansions that still enable injection when executing via sh -c / cmd /C (e.g. newlines \n/\r; on Windows also %, !, ^). Also, logging the full rejected command at warn! can leak secrets embedded in commands. Consider rejecting \r/\n at minimum (and Windows-specific metachars if supported), and avoid logging the full command (truncate/redact).
| const SHELL_METACHARS: &[char] = &[';', '&', '|', '`', '$', '<', '>', '(', ')', '{', '}']; | |
| if command.chars().any(|c| SHELL_METACHARS.contains(&c)) { | |
| warn!("Command rejected due to shell metacharacters: {}", command); | |
| // Include common Unix and Windows shell metacharacters, as well as newlines. | |
| const SHELL_METACHARS: &[char] = &[ | |
| ';', '&', '|', '`', '$', '<', '>', '(', ')', '{', '}', '\n', '\r', '%', '!', '^', | |
| ]; | |
| if command.chars().any(|c| SHELL_METACHARS.contains(&c)) { | |
| // Avoid logging the full command to prevent leaking sensitive data; log only that | |
| // disallowed characters were present. | |
| warn!("Command rejected due to disallowed shell metacharacters"); |
| let store = self.store.read().await; | ||
| let content = serde_json::to_string_pretty(&*store)?; | ||
|
|
||
| let _guard = self.store_mutex.lock().await; | ||
|
|
save_store() reads/serializes the store before acquiring store_mutex. If another task writes a newer snapshot via save_store_atomic() (which locks store_mutex first), save_store() can later acquire the mutex and overwrite the newer on-disk state with an older snapshot. To prevent stale overwrites, acquire store_mutex before reading/serializing the store (or have save_store() delegate to save_store_atomic() so all persistence uses a single lock ordering).
| let store = self.store.read().await; | |
| let content = serde_json::to_string_pretty(&*store)?; | |
| let _guard = self.store_mutex.lock().await; | |
| // Acquire the store mutex before reading/serializing to avoid | |
| // racing with other writers such as `save_store_atomic`. | |
| let _guard = self.store_mutex.lock().await; | |
| let store = self.store.read().await; | |
| let content = serde_json::to_string_pretty(&*store)?; |
- fix(service): acquire store_mutex before reading in save_store() to prevent stale snapshot overwriting newer atomic write
- fix(service): drop timer_task write lock before handle.await in arm_timer() to avoid blocking stop() and concurrent re-arms (2-phase: take handle -> drop guard -> await -> reacquire -> assign)
- fix(service): initialize cancel_token as pre-cancelled in new() so status() correctly reports enabled:false before start() is called
- fix(service): propagate cancel token into per-job spawned tasks via tokio::select! so in-flight jobs abort cleanly on stop(), not orphaned
- fix(service): change jobs_to_delete from Vec<&str> to Vec<String> to eliminate fragile lifetime borrows across loop iterations
- fix(rbac): expand shell metachar list with \n, \r, %, !, ^ to block CRLF injection and Windows-specific expansion attacks; remove full command from warn! log to prevent secret leakage
- test(cron_live_test): mark recurring-job live test as #[ignore] with run instructions so it does not add 7s to default cargo test runs
Pull request overview
Copilot reviewed 4 out of 5 changed files in this pull request and generated 6 comments.
| // Cancel any existing timer loop | ||
| let old_token = self.cancel_token.lock().unwrap().clone(); | ||
| old_token.cancel(); | ||
|
|
arm_timer() cancels the current CancellationToken on every re-arm (e.g., add_job/remove_job/enable_job/run_job). Because the spawned job-execution tasks also listen to cancel_clone/job_cancel derived from the same token, re-arming will unintentionally cancel in-flight job executions, not just the timer loop. Consider using separate tokens: one long-lived service stop token for jobs, and a distinct per-loop token (e.g., child token) used only to interrupt the timer sleep/recompute.
| // Prevent concurrent re-arming using try_write on timer_task | ||
| let mut timer_task = match self.timer_task.try_write() { | ||
| Ok(guard) => guard, | ||
| Err(_) => return, // Already arming | ||
| }; | ||
|
|
||
| let now = Utc::now().timestamp_millis(); | ||
| let delay_ms = (next_wake - now).max(0) as u64; | ||
| let delay = Duration::from_millis(delay_ms); | ||
| // Cancel any existing timer loop | ||
| let old_token = self.cancel_token.lock().unwrap().clone(); | ||
| old_token.cancel(); | ||
|
|
||
| // Take the old handle out while holding the lock, then DROP the lock | ||
| // before awaiting it — awaiting under an async write lock would block | ||
| // stop() and concurrent arm_timer() callers. | ||
| let old_handle = timer_task.take(); | ||
| drop(timer_task); | ||
| if let Some(handle) = old_handle { | ||
| handle.abort(); | ||
| let _ = handle.await; | ||
| } | ||
|
|
||
| debug!("Next cron wake in {}ms (at {}ms)", delay_ms, next_wake); | ||
| // Create a fresh cancellation token and store it so stop() can cancel the new loop. | ||
| // (The old token was already cancelled above; the old task was aborted via handle.abort().) | ||
| let cancel = CancellationToken::new(); | ||
| *self.cancel_token.lock().unwrap() = cancel.clone(); | ||
|
|
arm_timer() is not actually serialized: it uses try_write() initially but then drops the timer_task lock (line 213) before creating/storing the new token/handle. A second arm_timer() can run concurrently, leading to orphaned timer loops and cancellation tokens (only the last stored handle/token is tracked/cancelled). Use a dedicated async Mutex around the whole re-arm sequence, or store (handle, token) together and swap/cancel atomically without a window where another arm_timer can interleave.
| let tmp_path = store_path.with_extension("tmp"); | ||
| if let Err(e) = fs::write(&tmp_path, &content).await { | ||
| error!("Failed to write tmp cron store: {}", e); | ||
| } else if let Err(e) = fs::rename(&tmp_path, store_path).await { | ||
| error!("Failed to swap cron store atomically: {}", e); | ||
| } |
Atomic save via fs::rename(tmp, store_path) is not cross-platform: on Windows, rename fails if the destination file already exists. After the first save, subsequent saves may fail and leave the .tmp behind, breaking persistence (CI runs on windows-latest). Consider using a Windows-safe atomic replace (e.g., write to temp + replace_file semantics) or remove/replace the destination in a way that preserves crash-safety guarantees.
| let tmp_path = self.store_path.with_extension("tmp"); | ||
| fs::write(&tmp_path, content).await?; | ||
| fs::rename(&tmp_path, &self.store_path).await?; |
save_store() also uses fs::rename(tmp, store_path) which will fail on Windows when the destination already exists, causing periodic persistence failures after the first write. Align this with a cross-platform atomic replacement strategy (same fix as save_store_atomic).
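For reference, the write-then-rename pattern under discussion can be sketched with synchronous std calls. This is only an analogue of the tokio::fs code in the hunks above: `save_atomically` is a hypothetical helper, and how rename treats an existing destination on each target platform is worth confirming against the std::fs::rename documentation before relying on it.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Writes `content` to a sibling ".tmp" file, then renames it over `store_path`.
/// Readers see either the old file or the new one, never a half-written file.
fn save_atomically(store_path: &Path, content: &str) -> io::Result<()> {
    // Same temp-path convention as the PR: swap the extension for "tmp".
    let tmp_path = store_path.with_extension("tmp");
    fs::write(&tmp_path, content)?;
    // The rename is the commit point; a failure here leaves the .tmp behind.
    fs::rename(&tmp_path, store_path)
}
```

The sketch does not fsync the file or its directory, so it protects against torn writes from a crashed process rather than against power loss.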
| @@ -287,11 +318,7 @@ impl RbacManager { | |||
| } | |||
matches_command_pattern() falls back to command.starts_with(pattern). This allows unintended commands like "lsb_release" or "gitlab" when the allowed pattern is "ls"/"git". To allow arguments safely, require a word boundary: either command == pattern, or command starts with "{pattern} " (and possibly tabs), and consider extracting the first token for comparison.
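A word-boundary check along the lines suggested could be sketched as follows. The function name is hypothetical, and wildcard handling from the real matches_command_pattern() is deliberately left out.

```rust
/// Returns true when `command` is exactly `pattern`, or starts with `pattern`
/// followed by a space or tab. "ls -la" matches "ls"; "lsb_release" does not.
fn matches_prefix_on_word_boundary(command: &str, pattern: &str) -> bool {
    match command.strip_prefix(pattern) {
        // Exact match: nothing is left after the pattern.
        Some("") => true,
        // Otherwise the character right after the pattern must be whitespace.
        Some(rest) => rest.starts_with(|c: char| c == ' ' || c == '\t'),
        // The command does not start with the pattern at all.
        None => false,
    }
}
```

With this check, an allowed pattern of "git" accepts "git status" but rejects "gitlab".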
| "REGRESSION: recurring job only fired {} time(s) in 7s (expected ≥3). Bug #47 not fixed!", | ||
| total_fires | ||
| ); | ||
| } |
The file ends with a duplicated assertion/closing braces block (lines 94-97) after the test function has already closed at line 93. This will not compile; remove the stray duplicated lines so the file has exactly one assertion block and one closing brace for the test function.
| "REGRESSION: recurring job only fired {} time(s) in 7s (expected ≥3). Bug #47 not fixed!", | |
| total_fires | |
| ); | |
| } |
Pull request overview
Copilot reviewed 4 out of 5 changed files in this pull request and generated 9 comments.
| // Cancel the running timer loop | ||
| let token = self.cancel_token.lock().unwrap().clone(); | ||
| token.cancel(); | ||
|
|
||
| // Cancel timer task | ||
| // Cancel running jobs | ||
| self.job_cancel_token.cancel(); |
self.job_cancel_token is cancelled in stop(), but this token is created in new() and never replaced. If start() is called again after stop(), all future job executions that select! on job_cancel.cancelled() will be immediately cancelled. Consider storing the job cancellation token behind a mutex (similar to cancel_token) and replacing it with a fresh CancellationToken in start() (or inside arm_timer()), so a restart re-enables job execution.
| // Hold a single read lock and compute next_wake from it directly, | ||
| // avoiding a second lock acquisition inside get_next_wake_ms_from_store. | ||
| let store = self.store.read().await; | ||
| let is_running = self.timer_task.read().await.is_some(); |
status() holds the store read lock while awaiting timer_task.read().await. Holding an async lock guard across an .await can unnecessarily reduce concurrency and increases the risk of lock-order problems. A simple fix is to read timer_task first (or drop store before awaiting the second lock), then acquire the store lock to compute next_wake_at_ms.
| // Hold a single read lock and compute next_wake from it directly, | |
| // avoiding a second lock acquisition inside get_next_wake_ms_from_store. | |
| let store = self.store.read().await; | |
| let is_running = self.timer_task.read().await.is_some(); | |
| // First, read the timer task status without holding the store lock. | |
| let is_running = { | |
| let timer_task_guard = self.timer_task.read().await; | |
| timer_task_guard.is_some() | |
| }; | |
| // Then, read the store to compute next_wake_at_ms and job count. | |
| let store = self.store.read().await; |
| store: Arc<RwLock<CronStore>>, | ||
| on_job: Option<CronCallback>, | ||
| running: Arc<RwLock<bool>>, | ||
| cancel_token: Arc<std::sync::Mutex<CancellationToken>>, |
std::sync::Mutex in async code can block a Tokio worker thread if contention occurs. Since cancel_token is used from async fn contexts (arm_timer, stop), consider switching to tokio::sync::Mutex (or another async-aware mutex) to avoid potential runtime blocking.
| let mut jobs_to_delete: Vec<String> = Vec::new(); | ||
|
|
||
| for due in &due_jobs { | ||
| if let Some(j) = s.jobs.iter_mut().find(|j| j.id == due.id) { |
| // Remove one-shot jobs marked for deletion | ||
| if !jobs_to_delete.is_empty() { | ||
| s.jobs.retain(|j| !jobs_to_delete.contains(&j.id)); | ||
| } |
retain(|j| !jobs_to_delete.contains(&j.id)) makes deletion O(n*m) because Vec::contains is linear. If many one-shot jobs are due at once, this becomes noticeably inefficient. Converting jobs_to_delete to a HashSet<String> (or storing IDs as Uuid and using a set) would make membership checks O(1).
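The HashSet variant might look like this. It is a sketch with a simplified, hypothetical `Job` type that carries only an id; the real CronJob has more fields.

```rust
use std::collections::HashSet;

/// Simplified stand-in for the real job type; only the id matters here.
struct Job {
    id: String,
}

/// Drops every job whose id is in `jobs_to_delete`.
/// Each membership check is a hash lookup instead of a linear scan.
fn remove_finished_one_shots(jobs: &mut Vec<Job>, jobs_to_delete: &HashSet<String>) {
    if !jobs_to_delete.is_empty() {
        jobs.retain(|j| !jobs_to_delete.contains(&j.id));
    }
}
```

For the handful of one-shot jobs typically due in one tick the difference is small; the set only pays off when many jobs are deleted at once.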
| let scheduled_at = j.state.next_run_at_ms.unwrap_or(now); | ||
|
|
||
| match &j.schedule { | ||
| CronSchedule::Every { every_ms } => { | ||
| // Anti-drift: anchor to scheduled time, not now() |
| j.state.next_run_at_ms = | ||
| expr.as_ref().and_then(|e| parse_cron_next(e, now)); |
For CronSchedule::Cron, parse_cron_next is computed from now, while Every explicitly anchors to scheduled_at to reduce drift. This means cron-based schedules can still drift (or skip expected occurrences) if the wake/execution is delayed. If the intent is consistent anti-drift behavior, consider computing the next cron occurrence from scheduled_at (and then snapping forward if the computed next is already <= now, similar to the Every branch).
| j.state.next_run_at_ms = | |
| expr.as_ref().and_then(|e| parse_cron_next(e, now)); | |
| // Anti-drift for cron: anchor to scheduled time, then snap forward | |
| if let Some(e) = expr.as_ref() { | |
| let mut next = parse_cron_next(e, scheduled_at); | |
| if let Some(n) = next { | |
| if n <= now { | |
| next = parse_cron_next(e, now); | |
| } | |
| } | |
| j.state.next_run_at_ms = next; | |
| } else { | |
| j.state.next_run_at_ms = None; | |
| } |
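The Every branch that this suggestion mirrors is not shown in the hunk, so here is a sketch of what "anchor to the scheduled time, then snap forward" can mean for a fixed interval. `next_every_run` is a hypothetical helper, and staying on the original interval grid when behind is an assumption; the PR may snap forward differently.

```rust
/// Next fire time in milliseconds for a fixed-interval job.
/// `scheduled_at` is when the job was supposed to fire, `now` is the current time.
fn next_every_run(scheduled_at: i64, every_ms: i64, now: i64) -> i64 {
    assert!(every_ms > 0, "interval must be positive");
    // Anchor to the scheduled time so a late wake-up does not shift the schedule.
    let next = scheduled_at + every_ms;
    if next > now {
        return next;
    }
    // Behind schedule: skip the missed intervals but stay on the original grid.
    let missed = (now - scheduled_at) / every_ms;
    scheduled_at + (missed + 1) * every_ms
}
```

For a 10s job scheduled at t=1000 ms that wakes 50 ms late, this yields 11000 rather than the 11050 that `now + every_ms` would give, which is the drift being avoided.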
| ); | ||
|
|
||
| // Wait 7 seconds — should fire at least 3 times (at ~2s, ~4s, ~6s) | ||
| tokio::time::sleep(Duration::from_secs(7)).await; |
This live test adds a fixed 7-second wall-clock delay, which will slow CI and can be flaky on loaded runners. If it’s intended as a manual/local verification test (as the PR description implies), consider marking it #[ignore] (and documenting how to run it), gating it behind a feature/env var, or restructuring it so it doesn't require multi-second sleeps during normal cargo test runs.
| /// Check if command matches a pattern (supports wildcards) and does not contain shell metacharacters | ||
| fn matches_command_pattern(&self, command: &str, pattern: &str) -> bool { | ||
| // Prevent command injection via shell metacharacters. | ||
| // Includes Unix operators, Windows-specific chars (%,!,^), and | ||
| // newline/CR to block CRLF injection through env-var expansion. | ||
| // NOTE: Do NOT log the command itself — it may contain secrets. | ||
| const SHELL_METACHARS: &[char] = &[ | ||
| ';', '&', '|', '`', '$', '<', '>', '(', ')', '{', '}', | ||
| '\n', '\r', // newline injection | ||
| '%', '!', '^', // Windows: %VAR%, delayed expansion, escape char | ||
| ]; | ||
| if command.chars().any(|c| SHELL_METACHARS.contains(&c)) { | ||
| warn!("Command rejected: contained disallowed shell metacharacters"); | ||
| return false; | ||
| } |
matches_command_pattern now logs a warning as a side effect. If the caller checks multiple allowed patterns in a loop, a single rejected command can emit the same warning many times (log spam). Consider making this function pure (no logging) and performing the metacharacter validation once at the call site (logging once with appropriate context), or returning a richer result (e.g., match/no-match vs rejected) so the caller can log once.
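One way to get the "richer result" shape is sketched below. `CommandCheck`, `has_shell_metachars`, and `check_command` are hypothetical names, and exact string equality stands in for the real pattern matching.

```rust
/// Outcome of checking a command, so the caller can decide what to log.
#[derive(Debug, PartialEq)]
enum CommandCheck {
    Allowed,
    NotAllowed,
    /// Contained a disallowed shell metacharacter; no pattern was consulted.
    Rejected,
}

const SHELL_METACHARS: &[char] = &[
    ';', '&', '|', '`', '$', '<', '>', '(', ')', '{', '}', '\n', '\r', '%', '!', '^',
];

/// Pure check with no logging side effect.
fn has_shell_metachars(command: &str) -> bool {
    command.chars().any(|c| SHELL_METACHARS.contains(&c))
}

/// Validates once up front, then tries each allowed pattern.
fn check_command(command: &str, allowed_patterns: &[&str]) -> CommandCheck {
    if has_shell_metachars(command) {
        return CommandCheck::Rejected;
    }
    // Stand-in for the real matcher: exact equality only.
    if allowed_patterns.iter().any(|p| command == *p) {
        CommandCheck::Allowed
    } else {
        CommandCheck::NotAllowed
    }
}
```

The caller can then emit a single warn! when it sees `Rejected`, regardless of how many patterns were configured.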
…ons, and anti-drift
…t, anti-drift, and atomic persistence
…test failures in PR 51
…tern matching conflicts
…ch, collapse nested if let for clippy
Summary
Fixes #47
Recurring cron jobs (CronSchedule::Every and CronSchedule::Cron) were silently executing only once. The root cause was that arm_timer() spawned a fire-once Tokio task that never re-scheduled itself after the first execution. This PR fully rewrites the timer logic to fix all 8 known failure modes in the scheduling system.
Root Cause
arm_timer() used a single tokio::spawn with a one-shot sleep. This meant CronSchedule::Every and CronSchedule::Cron behaved identically to CronSchedule::Once, with no error, no warning, and no way to detect the failure from the outside.
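The difference between a fire-once task and a persistent cancellable loop can be illustrated without Tokio. The sketch below is a std-only analogue, not the PR's code: a thread stands in for the spawned task, and a channel's recv_timeout plays the role that tokio::select! over a sleep and CancellationToken::cancelled() plays in the async version. `spawn_recurring` is a hypothetical name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns a timer thread that fires every `interval` until cancelled.
/// Returns the cancel sender and the thread handle.
fn spawn_recurring(
    interval: Duration,
    fires: Arc<AtomicUsize>,
) -> (Sender<()>, JoinHandle<()>) {
    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || loop {
        // Sleep until the next fire time, waking early if cancelled.
        match cancel_rx.recv_timeout(interval) {
            // Timeout elapsed: the job is due. Fire, then go around again.
            // A fire-once task would return here instead of looping.
            Err(RecvTimeoutError::Timeout) => {
                fires.fetch_add(1, Ordering::SeqCst);
            }
            // Cancel message received or sender dropped: stop the loop.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    });
    (cancel_tx, handle)
}
```

Sending `()` on the returned sender and joining the handle stops the loop promptly, which corresponds to cancelling the token and awaiting the timer task in the service.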
Changes
- loop around sleep → fire → update cycle
- CancellationToken (tokio-util) + tokio::select! for graceful cancel
- last_status/last_error recorded in spawned task
- … Arc<RwLock<CronStore>> each iteration
- Anchor next_run to scheduled fire time, not Utc::now(); snap forward if behind
- Completion state (last_run_at_ms, last_status) updated inside spawned task, not before execution
- Write .tmp then rename to prevent corruption on crash
- Park on cancel token instead of breaking permanently
Before / After
Before: CronSchedule::Every(10s) job
After: CronSchedule::Every(10s) job
- … CancellationToken: clean shutdown with no orphaned tasks
- … .tmp + rename)
Files Changed
- core/Cargo.toml: Added tokio-util = { version = "0.7", features = ["rt"] } for CancellationToken
- core/src/cron/service.rs: Rewrote arm_timer() as persistent cancellable loop with all 8 fixes applied
- Cargo.lock: Updated lockfile
Testing
New unit tests added for:
- remove_job: verifies cancellation token is dropped and task stops
- enable_job: verifies re-arming logic on enable after disable
- compute_next_run: boundary cases for drift correction and snap-forward
Related