Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/server/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,53 @@ impl<C> Server<C> {
chrono::Utc::now().timestamp_millis()
}

/// Test-only rendezvous used by the concurrent `initialize_jobs` dedup test.
///
/// The hook is configured once per process from the environment variable
/// `TORC_TEST_INITIALIZE_JOBS_BARRIER`. If it parses as an integer N >= 2, the first N
/// callers wait on a shared barrier so that they all leave together; every later caller
/// passes straight through. Callers invoke this after their INSERT/dedup decision and
/// before returning from task creation, so the winning caller's task is still `queued`
/// while the other callers run their INSERT, which makes the dedup path deterministic
/// instead of dependent on timing.
///
/// If the variable is unset, unparsable, or less than 2, this function does nothing.
/// Each wait is capped at 30 seconds so a wrong N cannot block a caller indefinitely;
/// a timeout is deliberately ignored.
async fn test_initialize_jobs_rendezvous() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::OnceLock;
    use std::time::Duration;

    /// Process-wide state for the hook.
    struct Gate {
        barrier: tokio::sync::Barrier,
        participants: usize,
        arrivals: AtomicUsize,
    }

    // `None` is cached as well, so the environment is consulted only on the first call.
    static GATE: OnceLock<Option<Gate>> = OnceLock::new();

    let configured = GATE.get_or_init(|| {
        let raw = std::env::var("TORC_TEST_INITIALIZE_JOBS_BARRIER").ok()?;
        let participants = raw.parse::<usize>().ok()?;
        (participants >= 2).then(|| Gate {
            barrier: tokio::sync::Barrier::new(participants),
            participants,
            arrivals: AtomicUsize::new(0),
        })
    });

    let Some(gate) = configured else {
        return;
    };

    // Every call takes a ticket, but only the first N tickets wait. `tokio::sync::Barrier`
    // is reusable, so without this cut-off each later group of N calls would block again
    // and a lone straggler would sit out the whole timeout.
    let ticket = gate.arrivals.fetch_add(1, Ordering::SeqCst);
    if ticket >= gate.participants {
        return;
    }

    // Best effort: whether the barrier released or the timeout fired, the caller proceeds.
    let _ = tokio::time::timeout(Duration::from_secs(30), gate.barrier.wait()).await;
}
Comment on lines +424 to +457

/// Load the single active async task for a workflow, if any.
pub(super) async fn get_active_task(
&self,
Expand Down Expand Up @@ -506,6 +553,10 @@ impl<C> Server<C> {

match insert_result {
Ok(result) => {
// Test-only rendezvous: hold here before returning so the task stays
// `queued` (its worker is not spawned until this function returns) until any
// concurrent caller has run its own INSERT/dedup. No-op in normal operation.
Self::test_initialize_jobs_rendezvous().await;
return Ok(TaskCreation::Created(models::TaskModel::new(
result.last_insert_rowid(),
workflow_id,
Expand Down Expand Up @@ -557,6 +608,9 @@ impl<C> Server<C> {
});
}

// Test-only rendezvous: matches the wait on the `Created` path so both
// concurrent callers leave this function together. No-op in normal operation.
Self::test_initialize_jobs_rendezvous().await;
return Ok(TaskCreation::Existing(existing));
}
Err(e) => {
Expand Down
35 changes: 31 additions & 4 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,17 @@ pub fn run_torc_standalone_ok(
}

/// Start a `torc-server` process for the given database without any extra environment
/// variables. Delegates to [`start_process_with_env`] with an empty list.
fn start_process(db_url: &str, db_file: NamedTempFile) -> ServerProcess {
    let no_extra_env: &[(&str, &str)] = &[];
    start_process_with_env(db_url, db_file, no_extra_env)
}

/// Like [`start_process`] but injects additional environment variables into the spawned
/// `torc-server` process. Used by tests that need to enable server-side test hooks (e.g. the
/// `TORC_TEST_INITIALIZE_JOBS_BARRIER` rendezvous for the concurrent-dedup test).
fn start_process_with_env(
db_url: &str,
db_file: NamedTempFile,
extra_env: &[(&str, &str)],
) -> ServerProcess {
println!("Setting up database with url: {}", db_url);
let status = Command::new("sqlx")
.arg("--no-dotenv")
Expand Down Expand Up @@ -264,7 +275,8 @@ fn start_process(db_url: &str, db_file: NamedTempFile) -> ServerProcess {
"Starting server on port {} (attempt {}/{})",
port, attempt, SERVER_START_ATTEMPTS
);
let mut child = Command::new(get_exe_path("./target/debug/torc-server"))
let mut command = Command::new(get_exe_path("./target/debug/torc-server"));
command
.arg("run")
.arg("--port")
.arg(port.to_string())
Expand All @@ -273,9 +285,11 @@ fn start_process(db_url: &str, db_file: NamedTempFile) -> ServerProcess {
.env("DATABASE_URL", db_url)
.env("RUST_LOG", "info")
.stdout(std::process::Stdio::inherit())
.stderr(std::process::Stdio::inherit())
.spawn()
.expect("failed to start server process");
.stderr(std::process::Stdio::inherit());
for (key, value) in extra_env {
command.env(key, value);
}
let mut child = command.spawn().expect("failed to start server process");

let pid = child.id();

Expand Down Expand Up @@ -332,6 +346,19 @@ pub fn start_server() -> ServerProcess {
process
}

/// Spawn a private `torc-server` with additional environment variables set.
///
/// Each call creates a new temporary file, builds a `sqlite:` URL pointing at it, and
/// hands both to [`start_process_with_env`] together with `extra_env`. In contrast to the
/// shared [`start_server`] fixture, nothing is cached between calls, so a test can switch
/// on a server-side hook through the environment without touching servers used by other
/// tests.
pub fn start_server_with_env(extra_env: &[(&str, &str)]) -> ServerProcess {
    // The logger may already have been installed by another test; that error is ignored.
    let _ = env_logger::try_init();

    let temp_db = NamedTempFile::new().expect("Failed to create temporary file");
    let db_path = temp_db.path().display().to_string();
    let db_url = format!("sqlite:{}", db_path);

    start_process_with_env(&db_url, temp_db, extra_env)
}

pub fn create_test_workflow(config: &Configuration, workflow_name: &str) -> models::WorkflowModel {
let user = "test_user".to_string();
let workflow = models::WorkflowModel::new(workflow_name.to_string(), user);
Expand Down
17 changes: 12 additions & 5 deletions tests/test_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,22 @@ fn test_initialize_jobs_async_creates_task_and_emits_sse(start_server: &ServerPr
}
}

#[rstest]
#[test]
#[serial]
fn test_initialize_jobs_async_concurrent_requests_return_same_task(start_server: &ServerProcess) {
let server = start_server;
fn test_initialize_jobs_async_concurrent_requests_return_same_task() {
// Dedicated server with the server-side rendezvous hook enabled: the first two concurrent
// `initialize_jobs` task-creation calls block inside the server until both have run their
// INSERT/dedup, then proceed together. This makes the dedup deterministic -- the second
// request's INSERT is guaranteed to see the first task still active -- instead of relying on
// initialization being slow enough for the requests to overlap by chance.
let server = common::start_server_with_env(&[("TORC_TEST_INITIALIZE_JOBS_BARRIER", "2")]);
let server = &server;
let workflow = common::create_test_workflow(&server.config, "tasks-test-idempotent-workflow");
let workflow_id = workflow.id.unwrap();

// Create enough jobs to make initialization take long enough for a concurrent request to race.
for i in 0..50 {
// A few jobs so initialization has real work to do and succeeds; the count no longer affects
// determinism (the server-side barrier guarantees the requests overlap).
for i in 0..5 {
let job_name = format!("job_{i}");
let _job = common::create_test_job(&server.config, workflow_id, &job_name);
}
Expand Down
Loading