
Conversation

@zhangfengcdt
Member

This PR introduces an optional sequential index building mode for spatial joins as an alternative to the default parallel implementation.

A new use_sequential_index_build boolean flag (default: false) controls the index building strategy. The sequential implementation via build_index_sync collects partitions one-by-one without spawning async tasks, supporting execution contexts that lack full async runtime support.
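The sequential strategy can be sketched in a minimal, dependency-free form (all names here are illustrative, not SedonaDB code; the tiny executor only works because the sequential path never suspends on I/O):

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, Waker};

// Minimal executor for futures that never suspend; enough to drive the
// sequential path, which only awaits inline work.
fn run<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let mut cx = Context::from_waker(Waker::noop());
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

// Stand-in for building an index over one partition (illustrative only).
async fn build_partition_index(part: usize) -> usize {
    part * 10
}

// Sequential mode: collect partitions one by one, awaiting inline, with
// no JoinSet::spawn and no dependence on a multi-threaded runtime.
async fn collect_sequential(parts: &[usize]) -> Vec<usize> {
    let mut out = Vec::new();
    for &p in parts {
        out.push(build_partition_index(p).await);
    }
    out
}

fn main() {
    let indexes = run(collect_sequential(&[0, 1, 2]));
    println!("{:?}", indexes); // [0, 10, 20]
}
```

Because no task is ever handed to a scheduler, this shape works even when the surrounding runtime cannot (or should not) spawn worker tasks.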

Key types and functions are now publicly exported: build_index, build_index_sync, SpatialIndex, SpatialIndexBuilder, SpatialJoinBuildMetrics, and SpatialPredicate, enabling external components to programmatically build and manage spatial indexes.

This PR supports external use of the spatial join executor in other query engine projects (single-node or distributed), such as Apache DataFusion Comet.

Resolved conflicts after major refactoring:
- Moved build_index and build_index_sync to new build_index.rs module
- Updated to use new refactored module structure (index submodules)
- Fixed OnceAsync usage pattern (lock -> get_or_insert -> try_once)
- Made SpatialIndex, SpatialJoinBuildMetrics, SpatialIndexBuilder public for Comet
- Preserved build_index_sync for JNI execution contexts
- Added timing instrumentation for index building performance

The key feature preserved: using build_index_sync (sequential collection)
instead of build_index (parallel with JoinSet) to avoid deadlocks in
Comet/JNI synchronous execution contexts.
Comment on lines 36 to 38
/// Synchronous version of build_index that doesn't spawn tasks
/// Used in execution contexts without async runtime support (e.g., Spark/Comet JNI)
pub async fn build_index_sync(
Member

This newly added build_index_sync is also an async function. Why can we not directly use build_index, which is also an async function?

For the use case of Comet, I believe that the DataFusion physical plans constructed by Comet are all single partition. What problem have you encountered when using build_index?

Member Author

@Kontinuation Yes, I agree the function name is a bit confusing; I can rename it to build_index_sequential to better convey that it avoids task spawning.

The reason we need this (instead of using build_index directly) is a limitation with JoinSet::spawn() in JNI contexts: when running in a Spark/Comet JNI context, JoinSet::spawn() fails because:

  1. The tokio runtime in JNI contexts may be single-threaded or have limited threading capabilities
  2. JoinSet::spawn() requires a multi-threaded runtime to spawn new tasks

Even with single-partition DataFusion plans (which is typical for Comet), collect_all still calls JoinSet::spawn():

  // In collect_all:
  let mut join_set = JoinSet::new();
  for ... {
      join_set.spawn(async move { ... });  // <- This fails in JNI contexts
  }
  join_set.join_all().await;  // Wait for all tasks

The issue isn't about parallelism benefits (single partition means nothing to parallelize anyway), but about JoinSet::spawn() itself not working in the JNI execution environment.

When I called build_index directly in a JNI context, it hung forever. I suspect this was caused by a deadlock involving JoinSet::spawn().

In a JNI context:

  1. When JoinSet::spawn() is called, it schedules new tasks to run on the runtime's worker threads
  2. Then join_all().await blocks the current thread waiting for those spawned tasks
  3. But if the current thread IS the only worker thread (or all workers are blocked), no one can execute the spawned tasks
  4. Deadlock: the current thread waits for tasks that need the current thread to execute
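The deadlock structure above can be illustrated with a toy single-worker scheduler (names and types are illustrative, not SedonaDB or Tokio code): spawn only queues work, so if the sole worker blocks waiting for results instead of draining the queue, nothing ever runs. Draining inline, as the sequential path does, always completes:

```rust
use std::collections::VecDeque;

// Toy single-worker scheduler: spawn() only queues a task. If the one
// worker thread then blocks waiting for results, no thread is left to
// drain the queue -- the deadlock described above.
struct ToyRuntime {
    queue: VecDeque<Box<dyn FnOnce() -> i32>>,
}

impl ToyRuntime {
    fn spawn(&mut self, task: Box<dyn FnOnce() -> i32>) {
        self.queue.push_back(task);
    }

    // The sequential fix: the caller drains the queue itself instead of
    // blocking on worker threads that may not exist.
    fn run_inline(&mut self) -> Vec<i32> {
        self.queue.drain(..).map(|task| task()).collect()
    }
}

fn main() {
    let mut rt = ToyRuntime { queue: VecDeque::new() };
    for k in 0..3 {
        rt.spawn(Box::new(move || k * 2));
    }
    println!("{:?}", rt.run_inline()); // [0, 2, 4]
}
```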

It would be ideal to reuse build_index if there is a workaround, but I also think a clean build_index_sequential makes sense, because this is called from a distributed framework where parallelism is provided and dictated by the distributed scheduler itself.

Member

Comet uses a multi-threaded Tokio runtime according to jni_api.rs, and the runtime uses the default configuration if the COMET_* environment variables are not present.

The problem may not be the use of a single-threaded runtime; more likely there are too many in-flight queries and the worker pool has been exhausted. No matter what the actual problem is, a sequential index-building option is always a good thing to have.

Member

I tried to reproduce this by spawning some async tasks in the native Comet scan operator, but Comet tests involving the native scan operator finished successfully and did not hang. I also tried setting the env COMET_WORKER_THREADS=1 and still could not observe hanging. The problem could be somewhere else and not related to spawning async tasks.

impl Stream for ScanStream<'_> {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // ...

        println!("polling batches from scan: spawn an async task");
        let mut join_set = JoinSet::new();
        for k in 0..10 {
            let res = join_set.spawn(async move {
                println!("hello {}", k);
                k
            });
            println!("spawned async task: {:?}", res);
        }

        let fut = join_set.join_all();
        let pin_fut = pin!(fut);
        let res = ready!(pin_fut.poll(ctx));
        println!("async task result: {:?}", res);

        // ...
    }

    // ...
}

Member Author

I see your test places JoinSet::spawn directly inside poll_next. The hang I observed might not be caused by JoinSet::spawn itself. The difference might be:

  • Your test: Spawns directly inside poll_next using the provided Context
  • Spatial join: Uses OnceAsync for lazy initialization, which has its own coordination logic

The interaction between block_on(async { poll!(...) }), OnceAsync, and JoinSet might behave differently from direct spawning in poll_next.

But in SpatialJoinExec, build_index is called through OnceAsync, which has its own coordination logic. The execution path is:

  1. JNI calls get_runtime().block_on(async { poll!(stream.next()) })
  2. Stream's poll_next triggers OnceAsync
  3. OnceAsync calls build_index → collect_all → JoinSet::spawn
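As a rough std-only sketch of the lazy-init step in that path (using std::sync::OnceLock as a synchronous stand-in for OnceAsync; names are illustrative): the first poll triggers the build, and the build is where the spawning would happen.

```rust
use std::sync::OnceLock;

// Synchronous stand-in for OnceAsync-style lazy init: the first caller
// builds the index; later callers reuse the cached result.
static INDEX: OnceLock<Vec<u64>> = OnceLock::new();

fn get_or_build_index() -> &'static Vec<u64> {
    INDEX.get_or_init(|| {
        // In the real path, this is where build_index -> collect_all
        // -> JoinSet::spawn would run, inside the stream's poll_next.
        vec![1, 2, 3]
    })
}

fn main() {
    println!("{:?}", get_or_build_index()); // [1, 2, 3]
}
```

The extra coordination layer is what distinguishes this path from spawning directly inside poll_next.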

@zhangfengcdt zhangfengcdt marked this pull request as ready for review December 1, 2025 18:57
Member

@Kontinuation Kontinuation left a comment

build_index_sync and build_index have a lot of duplicated code. Can we add a parameter to build_index to switch between concurrent mode and sequential mode?

- rename to build_index_seq
- created a private build_index_impl function with a concurrent: bool parameter
- remove duplicated logic from the code
@zhangfengcdt
Member Author

build_index_sync and build_index have a lot of duplicated code. Can we add a parameter to build_index to switch between concurrent mode and sequential mode?

@Kontinuation I have refactored build_index_sync to (1) rename it to build_index_seq, (2) add a new flag to control concurrency, and (3) remove the duplicated logic.

Could you please take a look again? Thanks!
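The resulting shape could be sketched as follows (a dependency-free illustration of the single-implementation-plus-flag pattern; std threads stand in for Tokio's JoinSet tasks, and everything besides the build_index/build_index_seq/build_index_impl names is hypothetical):

```rust
use std::thread;

type Partition = Vec<u64>;

// Stand-in for building the index contribution of one partition.
fn build_partition(p: &Partition) -> u64 {
    p.iter().sum()
}

// Single shared implementation with a `concurrent` flag, as suggested in
// the review; std threads stand in for tokio's JoinSet tasks.
fn build_index_impl(parts: &[Partition], concurrent: bool) -> Vec<u64> {
    if concurrent {
        thread::scope(|s| {
            let handles: Vec<_> = parts
                .iter()
                .map(|p| s.spawn(move || build_partition(p)))
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        })
    } else {
        // Sequential mode: no spawning at all.
        parts.iter().map(build_partition).collect()
    }
}

fn build_index(parts: &[Partition]) -> Vec<u64> {
    build_index_impl(parts, true)
}

fn build_index_seq(parts: &[Partition]) -> Vec<u64> {
    build_index_impl(parts, false)
}

fn main() {
    let parts = vec![vec![1, 2], vec![3, 4]];
    assert_eq!(build_index(&parts), build_index_seq(&parts));
    println!("{:?}", build_index_seq(&parts)); // [3, 7]
}
```

Both public entry points share one body, so the two modes cannot drift apart, which is the point of the review request.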
