Skip to content

Commit 2709981

Browse files
committed
Added a new overload for .Synchronize() utilizing System.Threading.Lock from .NET 9.
1 parent 98adf33 commit 2709981

File tree

8 files changed

+410
-94
lines changed

8 files changed

+410
-94
lines changed

Rx.NET/Source/Directory.Build.targets

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
<PropertyGroup Condition="$(TargetFramework.StartsWith('net6.0')) or $(TargetFramework.StartsWith('net7.0')) or $(TargetFramework.StartsWith('net8.0'))">
1919
<DefineConstants>$(DefineConstants);HAS_TRIMMABILITY_ATTRIBUTES</DefineConstants>
2020
</PropertyGroup>
21-
<PropertyGroup Condition="$(TargetFramework.StartsWith('net6.0-windows')) or $(TargetFramework.StartsWith('net8.0-windows')) or $(TargetFramework.StartsWith('net9.0-windows'))">
21+
<PropertyGroup Condition="$(TargetFramework.StartsWith('net6.0-windows'))">
2222
<DefineConstants>$(DefineConstants);HAS_WINRT;HAS_WINFORMS;HAS_WPF;HAS_DISPATCHER;DESKTOPCLR;WINDOWS;CSWINRT</DefineConstants>
2323
</PropertyGroup>
24+
<PropertyGroup Condition="$(TargetFramework.StartsWith('net9.0'))">
25+
<DefineConstants>$(DefineConstants);HAS_SYSTEM_THREADING_LOCK</DefineConstants>
26+
</PropertyGroup>
2427

2528
<ItemGroup Condition="('$(TargetFramework)' == 'net472' or '$(TargetFramework)' == 'uap10.0.18362' or '$(TargetFramework)' == 'netstandard2.0') and $(IsPackable)">
2629
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.Synchronize.cs

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
4+
using System.Threading;
45

56
namespace System.Reactive.Concurrency
67
{
7-
internal sealed class Synchronize<TSource> : Producer<TSource, Synchronize<TSource>._>
8+
internal sealed class SynchronizeWithObject<TSource> : Producer<TSource, SynchronizeWithObject<TSource>._>
89
{
910
private readonly IObservable<TSource> _source;
1011
private readonly object? _gate;
1112

12-
public Synchronize(IObservable<TSource> source, object gate)
13+
public SynchronizeWithObject(IObservable<TSource> source, object gate)
1314
{
1415
_source = source;
1516
_gate = gate;
1617
}
1718

18-
public Synchronize(IObservable<TSource> source)
19+
public SynchronizeWithObject(IObservable<TSource> source)
1920
{
2021
_source = source;
2122
}
@@ -28,7 +29,7 @@ internal sealed class _ : IdentitySink<TSource>
2829
{
2930
private readonly object _gate;
3031

31-
public _(Synchronize<TSource> parent, IObserver<TSource> observer)
32+
public _(SynchronizeWithObject<TSource> parent, IObserver<TSource> observer)
3233
: base(observer)
3334
{
3435
_gate = parent._gate ?? new object();
@@ -59,4 +60,57 @@ public override void OnCompleted()
5960
}
6061
}
6162
}
63+
64+
#if HAS_SYSTEM_THREADING_LOCK
65+
internal sealed class SynchronizeWithLock<TSource> : Producer<TSource, SynchronizeWithLock<TSource>._>
66+
{
67+
private readonly IObservable<TSource> _source;
68+
private readonly Lock _gate;
69+
70+
public SynchronizeWithLock(IObservable<TSource> source, Lock gate)
71+
{
72+
_source = source;
73+
_gate = gate;
74+
}
75+
76+
protected override _ CreateSink(IObserver<TSource> observer) => new(this, observer);
77+
78+
protected override void Run(_ sink) => sink.Run(_source);
79+
80+
internal sealed class _ : IdentitySink<TSource>
81+
{
82+
private readonly Lock _gate;
83+
84+
public _(SynchronizeWithLock<TSource> parent, IObserver<TSource> observer)
85+
: base(observer)
86+
{
87+
_gate = parent._gate;
88+
}
89+
90+
public override void OnNext(TSource value)
91+
{
92+
lock (_gate)
93+
{
94+
ForwardOnNext(value);
95+
}
96+
}
97+
98+
public override void OnError(Exception error)
99+
{
100+
lock (_gate)
101+
{
102+
ForwardOnError(error);
103+
}
104+
}
105+
106+
public override void OnCompleted()
107+
{
108+
lock (_gate)
109+
{
110+
ForwardOnCompleted();
111+
}
112+
}
113+
}
114+
}
115+
#endif
62116
}

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
using System.ComponentModel;
66
using System.Reactive.Disposables;
@@ -229,7 +229,7 @@ public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> sou
229229
throw new ArgumentNullException(nameof(source));
230230
}
231231

232-
return new Synchronize<TSource>(source);
232+
return new SynchronizeWithObject<TSource>(source);
233233
}
234234

235235
/// <summary>
@@ -252,9 +252,34 @@ public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> sou
252252
throw new ArgumentNullException(nameof(gate));
253253
}
254254

255-
return new Synchronize<TSource>(source, gate);
255+
return new SynchronizeWithObject<TSource>(source, gate);
256256
}
257257

258+
#if HAS_SYSTEM_THREADING_LOCK
259+
/// <summary>
260+
/// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
261+
/// </summary>
262+
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
263+
/// <param name="source">Source sequence.</param>
264+
/// <param name="gate">Gate object to synchronize each observer call on.</param>
265+
/// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
266+
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is <c>null</c>.</exception>
267+
public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, Lock gate)
268+
{
269+
if (source == null)
270+
{
271+
throw new ArgumentNullException(nameof(source));
272+
}
273+
274+
if (gate == null)
275+
{
276+
throw new ArgumentNullException(nameof(gate));
277+
}
278+
279+
return new SynchronizeWithLock<TSource>(source, gate);
280+
}
281+
#endif
282+
258283
#endregion
259284
}
260285
}

Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
using System.Collections.Generic;
66
using System.Diagnostics.CodeAnalysis;
@@ -369,6 +369,9 @@ internal partial interface IQueryLanguage
369369

370370
IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source);
371371
IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate);
372+
#if HAS_SYSTEM_THREADING_LOCK
373+
IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, Lock gate);
374+
#endif
372375

373376
#endregion
374377

Rx.NET/Source/src/System.Reactive/Linq/Observable.Concurrency.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
using System.Reactive.Concurrency;
66
using System.Threading;
@@ -175,6 +175,32 @@ public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource
175175
return s_impl.Synchronize(source, gate);
176176
}
177177

178+
#if HAS_SYSTEM_THREADING_LOCK
179+
/// <summary>
180+
/// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently, using the specified gate object.
181+
/// This overload is useful when writing n-ary query operators, in order to prevent concurrent callbacks from different sources by synchronizing on a common gate object.
182+
/// </summary>
183+
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
184+
/// <param name="source">Source sequence.</param>
185+
/// <param name="gate">Gate object to synchronize each observer call on.</param>
186+
/// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
187+
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
188+
public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source, Lock gate)
189+
{
190+
if (source == null)
191+
{
192+
throw new ArgumentNullException(nameof(source));
193+
}
194+
195+
if (gate == null)
196+
{
197+
throw new ArgumentNullException(nameof(gate));
198+
}
199+
200+
return s_impl.Synchronize(source, gate);
201+
}
202+
#endif
203+
178204
#endregion
179205
}
180206
}

Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Concurrency.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
using System.Reactive.Concurrency;
66
using System.Threading;
@@ -49,6 +49,13 @@ public virtual IObservable<TSource> Synchronize<TSource>(IObservable<TSource> so
4949
return Synchronization.Synchronize(source, gate);
5050
}
5151

52+
#if HAS_SYSTEM_THREADING_LOCK
53+
public virtual IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, Lock gate)
54+
{
55+
return Synchronization.Synchronize(source, gate);
56+
}
57+
#endif
58+
5259
#endregion
5360
}
5461
}

Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/SynchronizationTest.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT License.
3-
// See the LICENSE file in the project root for more information.
3+
// See the LICENSE file in the project root for more information.
44

55
using System;
66
using System.Reactive.Concurrency;
@@ -37,7 +37,10 @@ public void Synchronization_Synchronize_ArgumentChecking()
3737
{
3838
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(default(IObservable<int>)));
3939
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(default(IObservable<int>), new object()));
40-
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, null));
40+
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, (null as object)!));
41+
#if HAS_SYSTEM_THREADING_LOCK
42+
ReactiveAssert.Throws<ArgumentNullException>(() => Synchronization.Synchronize(DummyObservable<int>.Instance, (null as Lock)!));
43+
#endif
4144
}
4245

4346
private class MySyncCtx : SynchronizationContext

0 commit comments

Comments
 (0)