Skip to content

Commit 3ba09d2

Browse files
thomhurstclaude
andcommitted
feat: Add ProcessInParallel extension methods for IAsyncEnumerable<T>
- Add ProcessInParallel() methods that return IEnumerable<T> when awaited - Support both simple parallel processing and with transformations - Include overloads for max concurrency and thread pool scheduling - Add comprehensive unit tests for all new extension methods - Include example demonstrating usage and performance benefits - Achieve ~20x speedup for parallel vs sequential processing 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 5b9bbd4 commit 3ba09d2

File tree

4 files changed

+510
-0
lines changed

4 files changed

+510
-0
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#if NET6_0_OR_GREATER
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Runtime.CompilerServices;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using EnumerableAsyncProcessor.Extensions;
9+
10+
namespace EnumerableAsyncProcessor.Example;
11+
12+
public static class ProcessInParallelExample
13+
{
14+
public static async Task RunExample()
15+
{
16+
Console.WriteLine("ProcessInParallel Extension Examples");
17+
Console.WriteLine("====================================\n");
18+
19+
// Example 1: Simple parallel processing without transformation
20+
Console.WriteLine("Example 1: Simple parallel processing");
21+
var asyncEnumerable1 = GenerateAsyncEnumerable(5);
22+
var results1 = await asyncEnumerable1.ProcessInParallel();
23+
Console.WriteLine($"Results: {string.Join(", ", results1)}");
24+
25+
// Example 2: Parallel processing with transformation
26+
Console.WriteLine("\nExample 2: Parallel processing with transformation");
27+
var asyncEnumerable2 = GenerateAsyncEnumerable(5);
28+
var results2 = await asyncEnumerable2.ProcessInParallel(
29+
async item =>
30+
{
31+
await Task.Delay(100); // Simulate async work
32+
return item * 2;
33+
});
34+
Console.WriteLine($"Transformed results: {string.Join(", ", results2)}");
35+
36+
// Example 3: Parallel processing with max concurrency
37+
Console.WriteLine("\nExample 3: Parallel processing with max concurrency (3)");
38+
var asyncEnumerable3 = GenerateAsyncEnumerable(10);
39+
var results3 = await asyncEnumerable3.ProcessInParallel(
40+
async item =>
41+
{
42+
Console.WriteLine($"Processing item {item}");
43+
await Task.Delay(100);
44+
return $"Item-{item}";
45+
},
46+
maxConcurrency: 3);
47+
Console.WriteLine($"Results count: {results3.Count()}");
48+
49+
// Example 4: Performance comparison
50+
Console.WriteLine("\nExample 4: Performance comparison");
51+
var itemCount = 20;
52+
var asyncEnumerable4 = GenerateAsyncEnumerable(itemCount);
53+
54+
var start = DateTime.Now;
55+
var sequentialResults = new List<int>();
56+
await foreach (var item in asyncEnumerable4)
57+
{
58+
await Task.Delay(50); // Simulate work
59+
sequentialResults.Add(item * 2);
60+
}
61+
var sequentialTime = DateTime.Now - start;
62+
63+
var asyncEnumerable5 = GenerateAsyncEnumerable(itemCount);
64+
start = DateTime.Now;
65+
var parallelResults = await asyncEnumerable5.ProcessInParallel(
66+
async item =>
67+
{
68+
await Task.Delay(50); // Simulate work
69+
return item * 2;
70+
});
71+
var parallelTime = DateTime.Now - start;
72+
73+
Console.WriteLine($"Sequential processing time: {sequentialTime.TotalMilliseconds:F0}ms");
74+
Console.WriteLine($"Parallel processing time: {parallelTime.TotalMilliseconds:F0}ms");
75+
Console.WriteLine($"Speedup: {sequentialTime.TotalMilliseconds / parallelTime.TotalMilliseconds:F1}x");
76+
}
77+
78+
private static async IAsyncEnumerable<int> GenerateAsyncEnumerable(
79+
int count,
80+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
81+
{
82+
for (int i = 1; i <= count; i++)
83+
{
84+
await Task.Yield();
85+
cancellationToken.ThrowIfCancellationRequested();
86+
yield return i;
87+
}
88+
}
89+
}
90+
#endif

EnumerableAsyncProcessor.Example/Program.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,8 @@ Task<HttpResponseMessage> PingAsync()
7070
// Run IAsyncEnumerable examples
7171
Console.WriteLine("\n\n=== Running IAsyncEnumerable Examples ===\n");
7272
await AsyncEnumerableExample.RunExamples();
73+
74+
// Run ProcessInParallel examples
75+
Console.WriteLine("\n\n=== Running ProcessInParallel Extension Examples ===\n");
76+
await ProcessInParallelExample.RunExample();
7377
#endif
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
#if NET6_0_OR_GREATER
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Diagnostics;
5+
using System.Linq;
6+
using System.Runtime.CompilerServices;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using EnumerableAsyncProcessor.Extensions;
10+
using TUnit.Assertions;
11+
using TUnit.Core;
12+
13+
namespace EnumerableAsyncProcessor.UnitTests;
14+
15+
public class AsyncEnumerableParallelExtensionsTests
16+
{
17+
private static async IAsyncEnumerable<int> GenerateAsyncEnumerable(int count, [EnumeratorCancellation] CancellationToken cancellationToken = default)
18+
{
19+
for (int i = 1; i <= count; i++)
20+
{
21+
await Task.Yield();
22+
cancellationToken.ThrowIfCancellationRequested();
23+
yield return i;
24+
}
25+
}
26+
27+
private static async IAsyncEnumerable<int> GenerateDelayedAsyncEnumerable(int count, int delayMs, [EnumeratorCancellation] CancellationToken cancellationToken = default)
28+
{
29+
for (int i = 1; i <= count; i++)
30+
{
31+
await Task.Delay(delayMs, cancellationToken);
32+
yield return i;
33+
}
34+
}
35+
36+
[Test]
37+
public async Task ProcessInParallel_WithoutTransformation_ReturnsAllItems()
38+
{
39+
var asyncEnumerable = GenerateAsyncEnumerable(10);
40+
41+
var results = await asyncEnumerable.ProcessInParallel();
42+
43+
await Assert.That(results.Count()).IsEqualTo(10);
44+
await Assert.That(results.OrderBy(x => x)).IsEquivalentTo(Enumerable.Range(1, 10));
45+
}
46+
47+
[Test]
48+
public async Task ProcessInParallel_WithMaxConcurrency_ReturnsAllItems()
49+
{
50+
var asyncEnumerable = GenerateAsyncEnumerable(20);
51+
52+
var results = await asyncEnumerable.ProcessInParallel(5);
53+
54+
await Assert.That(results.Count()).IsEqualTo(20);
55+
await Assert.That(results.OrderBy(x => x)).IsEquivalentTo(Enumerable.Range(1, 20));
56+
}
57+
58+
[Test]
59+
public async Task ProcessInParallel_WithTransformation_ReturnsTransformedItems()
60+
{
61+
var asyncEnumerable = GenerateAsyncEnumerable(5);
62+
63+
var results = await asyncEnumerable.ProcessInParallel(
64+
async item =>
65+
{
66+
await Task.Delay(10);
67+
return item * 2;
68+
});
69+
70+
await Assert.That(results.Count()).IsEqualTo(5);
71+
await Assert.That(results.OrderBy(x => x)).IsEquivalentTo(new[] { 2, 4, 6, 8, 10 });
72+
}
73+
74+
[Test]
75+
public async Task ProcessInParallel_WithTransformationAndMaxConcurrency_ReturnsTransformedItems()
76+
{
77+
var asyncEnumerable = GenerateAsyncEnumerable(10);
78+
79+
var results = await asyncEnumerable.ProcessInParallel(
80+
async item =>
81+
{
82+
await Task.Delay(10);
83+
return item.ToString();
84+
},
85+
maxConcurrency: 3);
86+
87+
await Assert.That(results.Count()).IsEqualTo(10);
88+
await Assert.That(results.OrderBy(x => int.Parse(x))).IsEquivalentTo(
89+
Enumerable.Range(1, 10).Select(i => i.ToString()));
90+
}
91+
92+
[Test]
93+
public async Task ProcessInParallel_WithCancellation_ThrowsOperationCanceledException()
94+
{
95+
using var cts = new CancellationTokenSource();
96+
var asyncEnumerable = GenerateDelayedAsyncEnumerable(100, 50);
97+
98+
var task = asyncEnumerable.ProcessInParallel(cancellationToken: cts.Token);
99+
100+
// Cancel after a short delay
101+
cts.CancelAfter(100);
102+
103+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
104+
}
105+
106+
[Test]
107+
public async Task ProcessInParallel_ActuallyRunsInParallel()
108+
{
109+
var asyncEnumerable = GenerateAsyncEnumerable(5);
110+
var stopwatch = Stopwatch.StartNew();
111+
112+
var results = await asyncEnumerable.ProcessInParallel(
113+
async item =>
114+
{
115+
await Task.Delay(100); // Each item takes 100ms
116+
return item;
117+
});
118+
119+
stopwatch.Stop();
120+
121+
// If running sequentially, it would take ~500ms
122+
// In parallel, it should take ~100ms (plus some overhead)
123+
await Assert.That(stopwatch.ElapsedMilliseconds).IsLessThan(250);
124+
await Assert.That(results.Count()).IsEqualTo(5);
125+
}
126+
127+
[Test]
128+
public async Task ProcessInParallel_WithMaxConcurrency_LimitsConcurrency()
129+
{
130+
var asyncEnumerable = GenerateAsyncEnumerable(10);
131+
var maxConcurrentTasks = 0;
132+
var currentConcurrentTasks = 0;
133+
var lockObj = new object();
134+
135+
var results = await asyncEnumerable.ProcessInParallel(
136+
async item =>
137+
{
138+
lock (lockObj)
139+
{
140+
currentConcurrentTasks++;
141+
maxConcurrentTasks = Math.Max(maxConcurrentTasks, currentConcurrentTasks);
142+
}
143+
144+
await Task.Delay(50); // Simulate work
145+
146+
lock (lockObj)
147+
{
148+
currentConcurrentTasks--;
149+
}
150+
151+
return item;
152+
},
153+
maxConcurrency: 3);
154+
155+
await Assert.That(maxConcurrentTasks).IsLessThanOrEqualTo(3);
156+
await Assert.That(results.Count()).IsEqualTo(10);
157+
}
158+
159+
[Test]
160+
public async Task ProcessInParallel_EmptyEnumerable_ReturnsEmptyResult()
161+
{
162+
var asyncEnumerable = GenerateAsyncEnumerable(0);
163+
164+
var results = await asyncEnumerable.ProcessInParallel();
165+
166+
await Assert.That(results.Count()).IsEqualTo(0);
167+
}
168+
169+
[Test]
170+
public async Task ProcessInParallel_WithScheduleOnThreadPool_ProcessesAllItems()
171+
{
172+
var asyncEnumerable = GenerateAsyncEnumerable(10);
173+
174+
var results = await asyncEnumerable.ProcessInParallel(
175+
maxConcurrency: null,
176+
scheduleOnThreadPool: true);
177+
178+
await Assert.That(results.Count()).IsEqualTo(10);
179+
await Assert.That(results.OrderBy(x => x)).IsEquivalentTo(Enumerable.Range(1, 10));
180+
}
181+
182+
[Test]
183+
public async Task ProcessInParallel_WithTransformationScheduleOnThreadPool_ProcessesAllItems()
184+
{
185+
var asyncEnumerable = GenerateAsyncEnumerable(10);
186+
187+
var results = await asyncEnumerable.ProcessInParallel(
188+
async item =>
189+
{
190+
await Task.Delay(10);
191+
return item * 3;
192+
},
193+
maxConcurrency: null,
194+
scheduleOnThreadPool: true);
195+
196+
await Assert.That(results.Count()).IsEqualTo(10);
197+
await Assert.That(results.OrderBy(x => x)).IsEquivalentTo(
198+
Enumerable.Range(1, 10).Select(i => i * 3));
199+
}
200+
201+
[Test]
202+
public async Task ProcessInParallel_PreservesExceptionFromTransformation()
203+
{
204+
var asyncEnumerable = GenerateAsyncEnumerable(5);
205+
206+
var task = asyncEnumerable.ProcessInParallel(
207+
async item =>
208+
{
209+
await Task.Delay(10);
210+
if (item == 3)
211+
{
212+
throw new InvalidOperationException("Test exception");
213+
}
214+
return item;
215+
});
216+
217+
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () => await task);
218+
await Assert.That(exception.Message).IsEqualTo("Test exception");
219+
}
220+
}
221+
#endif

0 commit comments

Comments
 (0)