Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/client/commands/recover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,17 +827,25 @@ pub fn reset_failed_jobs(
return Ok(0);
}

let job_count = job_ids.len();

apis::workflows_api::reset_workflow_status(config, workflow_id, None)
.map_err(|e| format!("Failed to reset workflow status: {}", e))?;
info!(" Reset workflow status for workflow {}", workflow_id);

apis::workflows_api::reset_job_status(config, workflow_id, Some(true))
// NOTE: do not reset workflow status here. `reset_workflow_status` bumps
// run_id, and every caller of this function follows it with
// `reinitialize_workflow`, which already resets workflow status (and bumps
// run_id) exactly once. Resetting here too bumps run_id twice per recovery,
// leaving a gap (e.g. a recovered job jumping from run 1 to run 3). Reset
Comment thread
daniel-thom marked this conversation as resolved.
// only the failed jobs so the run_id bump stays single.
//
// `reset_job_status` resets all retryable failed jobs workflow-wide (the
// `job_ids` argument here only gates the no-op early return above), so use
// the server-reported `updated_count` for an accurate count, not job_ids.len().
let response = apis::workflows_api::reset_job_status(config, workflow_id, Some(true))
.map_err(|e| format!("Failed to reset failed job status: {}", e))?;
info!(" Reset failed job status for workflow {}", workflow_id);
let reset_count = response.updated_count.max(0) as usize;
info!(
" Reset {} failed job(s) for workflow {}",
reset_count, workflow_id
);

Ok(job_count)
Ok(reset_count)
}

/// Reinitialize the workflow (set up dependencies and fire on_workflow_start actions)
Expand Down
44 changes: 44 additions & 0 deletions tests/test_workflow_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,50 @@ fn test_reinitialize_workflow_real(start_server: &ServerProcess) {
assert_eq!(workflow_after.run_id.unwrap_or(0), original_run_id + 1);
}

#[rstest]
fn test_recover_reset_sequence_bumps_run_id_once(start_server: &ServerProcess) {
let config = start_server.config.clone();
let (manager, workflow) =
create_test_workflow_manager(config.clone(), "test_recover_run_id_once");
let workflow_id = workflow.id.unwrap();

// Initialize and run a job so the workflow has an established run.
let (job_id, _run_id) = execute_workflow_with_job(
&config,
&manager,
workflow_id,
"test_job",
"echo 'test'",
None,
)
.expect("execute workflow with job");

let before = apis::workflows_api::get_workflow(&config, workflow_id)
.expect("get workflow")
.run_id
.unwrap_or(0);

// The recover reset sequence: reset the failed jobs, then reinitialize.
// Together these must advance run_id by exactly 1 -- reset_failed_jobs no
// longer resets workflow status, so the single bump comes from
// reinitialize_workflow. Before the fix this bumped twice and skipped a
// run_id.
torc::client::commands::recover::reset_failed_jobs(&config, workflow_id, &[job_id])
.expect("reset_failed_jobs");
torc::client::commands::recover::reinitialize_workflow(&config, workflow_id)
.expect("reinitialize_workflow");

let after = apis::workflows_api::get_workflow(&config, workflow_id)
.expect("get workflow after")
.run_id
.unwrap_or(0);
assert_eq!(
after,
before + 1,
"recover reset sequence must bump run_id exactly once"
);
}

#[rstest]
fn test_get_run_id(start_server: &ServerProcess) {
let config = start_server.config.clone();
Expand Down
Loading