Skip to content

[core] fix Async.foreach and derived methods#1514

Merged
fwbrasil merged 9 commits intomainfrom
foreach-improvements
Apr 12, 2026
Merged

[core] fix Async.foreach and derived methods#1514
fwbrasil merged 9 commits intomainfrom
foreach-improvements

Conversation

@fwbrasil
Copy link
Copy Markdown
Collaborator

@fwbrasil fwbrasil commented Apr 8, 2026

Problem

The bounded concurrency in Async.foreach* and derived methods had two issues:

  1. Index bug: f(idx + idx2, v) used the group index instead of the item offset, producing wrong indices for all groups after the first (e.g. 0,1,2,3,1,2,3,4,2,3 instead of 0..9). See tests.

  2. Static batching: items were split into equal-sized groups with sequential execution within each group. Workers that finished fast batches sat idle while slow batches continued, wasting concurrency.

Solution

Replace implementation with a new Fiber.internal.foreachIndexedBounded: spawns min(size, concurrency) worker fibers that pull items dynamically using a shared AtomicInt counter.

The bounded path in Async.foreachIndexed had two bugs:

1. Index bug: `f(idx + idx2, v)` used the group index instead of
   the item offset, producing wrong indices for all groups after
   the first (e.g. 0,1,2,3,1,2,3,4,2,3 instead of 0..9).

2. Static batching: items were split into equal-sized groups with
   sequential execution within each group. Workers that finished
   fast batches sat idle while slow batches continued, wasting
   concurrency.

Replace with work-stealing via foreachIndexedBounded: spawns
min(size, concurrency) worker fibers that pull items dynamically
from a shared AtomicInt counter. No idle workers, correct indices,
same IOPromise/IOTask machinery as foreachIndexed.
Copy link
Copy Markdown
Collaborator

@hearnadam hearnadam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did I not notice this?

Fiber.internal.foreachIndexed(Chunk.from(iterable.grouped(groupSize)))((idx, group) =>
Kyo.foreachIndexed(Chunk.from(group))((idx2, v) => isolate.isolate(state, f(idx + idx2, v)))
).map(_.use(r => Kyo.foreach(r.flattenChunk)(isolate.restore)))
val items = Chunk.from(iterable).toIndexed
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val items = Chunk.from(iterable).toIndexed
val items = Chunk.Indexed.from(iterable)

val size = items.size
if size == 0 then Fiber.succeed(Chunk.empty)
else
val numWorkers = Math.min(size, concurrency)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is 1, should we dispatch to Kyo.foreach?

@ahoy-jon
Copy link
Copy Markdown
Collaborator

How did I not notice this?

Me neither

@fwbrasil fwbrasil merged commit 12878f0 into main Apr 12, 2026
@fwbrasil fwbrasil deleted the foreach-improvements branch April 12, 2026 21:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants