Skip to content

A focused collection of high–value Reactive Extensions (Rx) operators that do not ship with System.Reactive but are commonly needed when building reactive .NET applications.

License

Notifications You must be signed in to change notification settings

reactiveui/Extensions

Repository files navigation

Build codecov

ReactiveUI.Extensions

A focused collection of high–value Reactive Extensions (Rx) operators that do not ship with System.Reactive but are commonly needed when building reactive .NET applications.

The goal of this library is to:

  • Reduce boilerplate for frequent reactive patterns (timers, buffering, throttling, heartbeats, etc.)
  • Provide pragmatic, allocation?aware helpers for performance sensitive scenarios
  • Avoid additional dependencies – only System.Reactive is required

Supported Target Frameworks: .NET Standard 2.0, .NET 8, .NET 9, .NET 10.


Table of Contents

  1. Installation
  2. Quick Start
  3. API Catalog
  4. Operator Categories & Examples
  5. Performance Notes
  6. Thread Safety
  7. License

Installation

# Package coming soon (example)
dotnet add package ReactiveUI.Extensions

Reference the project directly while developing locally.


Quick Start

using System;
using System.Reactive.Linq;
using ReactiveUI.Extensions;

var source = Observable.Interval(TimeSpan.FromMilliseconds(120))
                       .Take(10)
                       .Select(i => (long?) (i % 3 == 0 ? null : i));

// 1. Filter nulls + convert to a Unit signal.
var signal = source.WhereIsNotNull().AsSignal();

// 2. Add a heartbeat if the upstream goes quiet for 500ms.
var withHeartbeat = source.WhereIsNotNull()
                          .Heartbeat(TimeSpan.FromMilliseconds(500), Scheduler.Default);

// 3. Retry with exponential backoff up to 5 times.
var resilient = Observable.Defer(() =>
        Observable.Throw<long>(new InvalidOperationException("Boom")))
    .RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));

// 4. Conflate bursty updates.
var conflated = source.Conflate(TimeSpan.FromMilliseconds(300), Scheduler.Default);

using (conflated.Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

API Catalog

Below is the full list of extension methods (grouped logically).
Some overloads omitted for brevity.

Category Operators
Null & Signal WhereIsNotNull, AsSignal
Timing & Scheduling SyncTimer, Schedule (overloads), ScheduleSafe, ThrottleFirst, DebounceImmediate
Inactivity / Liveness Heartbeat, DetectStale, BufferUntilInactive
Error Handling CatchIgnore, CatchAndReturn, OnErrorRetry (overloads), RetryWithBackoff
Combining & Aggregation CombineLatestValuesAreAllTrue, CombineLatestValuesAreAllFalse, GetMax, GetMin, Partition
Logical / Boolean Not, WhereTrue, WhereFalse
Async / Task SelectAsyncSequential, SelectLatestAsync, SelectAsyncConcurrent, SubscribeAsync (overloads), SynchronizeSynchronous, SynchronizeAsync, SubscribeSynchronous (overloads)
Backpressure Conflate
Filtering / Conditional Filter (Regex), TakeUntil (predicate), WaitUntil
Buffering BufferUntil, BufferUntilInactive
Transformation & Utility Shuffle, ForEach, FromArray, Using, While, Start, OnNext (params helper), DoOnSubscribe, DoOnDispose

Operator Categories & Examples

Null / Signal Helpers

IObservable<string?> raw = GetPossiblyNullStream();
IObservable<string> cleaned = raw.WhereIsNotNull();
IObservable<Unit> signal = cleaned.AsSignal();

Timing, Scheduling & Flow Control

// Shared timer for a given period (one underlying timer per distinct TimeSpan)
var sharedTimer = ReactiveExtensions.SyncTimer(TimeSpan.FromSeconds(1));

// Delay emission of a single value
42.Schedule(TimeSpan.FromMilliseconds(250), Scheduler.Default)
  .Subscribe(v => Console.WriteLine($"Delayed: {v}"));

// Safe scheduling when a scheduler may be null
IScheduler? maybeScheduler = null;
maybeScheduler.ScheduleSafe(() => Console.WriteLine("Ran inline"));

// ThrottleFirst: allow first item per window, ignore rest
var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50))
                          .ThrottleFirst(TimeSpan.FromMilliseconds(200));

// DebounceImmediate: emit first immediately then debounce rest
var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40))
                          .DebounceImmediate(TimeSpan.FromMilliseconds(250));

Inactivity / Liveness

// Heartbeat emits IHeartbeat<T> where IsHeartbeat == true during quiet periods
var heartbeats = Observable.Interval(TimeSpan.FromMilliseconds(400))
                           .Take(5)
                           .Heartbeat(TimeSpan.FromMilliseconds(300), Scheduler.Default);

// DetectStale emits IStale<T>: one stale marker after inactivity, or fresh update wrappers
var staleAware = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
                           .Take(3)
                           .DetectStale(TimeSpan.FromMilliseconds(300), Scheduler.Default);

// BufferUntilInactive groups events separated by inactivity
var bursts = Observable.Interval(TimeSpan.FromMilliseconds(60)).Take(20);
var groups = bursts.BufferUntilInactive(TimeSpan.FromMilliseconds(200));

Error Handling & Resilience

var flaky = Observable.Create<int>(o =>
{
    o.OnNext(1);
    o.OnError(new InvalidOperationException("Fail"));
    return () => { };
});

// Ignore all errors and complete silently
a flakySafe = flaky.CatchIgnore();

// Replace error with a fallback value
var withFallback = flaky.CatchAndReturn(-1);

// Retry only specific exception type with logging
var retried = flaky.OnErrorRetry<int, InvalidOperationException>(ex => Console.WriteLine(ex.Message), retryCount: 3);

// Retry with exponential backoff
var backoff = flaky.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));

Combining, Partitioning & Logical Helpers

var a = Observable.Interval(TimeSpan.FromMilliseconds(150)).Select(i => i % 2 == 0);
var b = Observable.Interval(TimeSpan.FromMilliseconds(170)).Select(i => i % 3 == 0);

var allTrue = new[] { a, b }.CombineLatestValuesAreAllTrue();
var allFalse = new[] { a, b }.CombineLatestValuesAreAllFalse();

var numbers = Observable.Range(1, 10);
var (even, odd) = numbers.Partition(n => n % 2 == 0); // Partition stream

var toggles = a.Not(); // Negate booleans

Async / Task Integration

IObservable<int> inputs = Observable.Range(1, 5);

// Sequential (preserves order)
var seq = inputs.SelectAsyncSequential(async i => { await Task.Delay(50); return i * 2; });

// Latest only (cancels previous)
var latest = inputs.SelectLatestAsync(async i => { await Task.Delay(100); return i; });

// Limited parallelism
var concurrent = inputs.SelectAsyncConcurrent(async i => { await Task.Delay(100); return i; }, maxConcurrency: 2);

// Asynchronous subscription (serializing tasks)
inputs.SubscribeAsync(async i => await Task.Delay(10));

// Synchronous gate: ensures per-item async completion before next is emitted
a inputs.SubscribeSynchronous(async i => await Task.Delay(25));

Backpressure / Conflation

// Conflate: enforce minimum spacing between emissions while always outputting the most recent value
a var noisy = Observable.Interval(TimeSpan.FromMilliseconds(20)).Take(30);
var conflated = noisy.Conflate(TimeSpan.FromMilliseconds(200), Scheduler.Default);

Selective & Conditional Emission

// TakeUntil predicate (inclusive)
var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5);

// WaitUntil first match then complete
var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0);

Buffering & Transformation

// BufferUntil - collect chars between delimiters
var chars = "<a><bc><d>".ToCharArray().ToObservable();
var frames = chars.BufferUntil('<', '>'); // emits "<a>", "<bc>", "<d>"

// Shuffle arrays in-place
var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 });
var shuffled = arrays.Shuffle();

Subscription & Side Effects

var stream = Observable.Range(1, 3)
    .DoOnSubscribe(() => Console.WriteLine("Subscribed"))
    .DoOnDispose(() => Console.WriteLine("Disposed"));

using (stream.Subscribe(Console.WriteLine))
{
    // auto dispose at using end
}

Utility & Miscellaneous

// Emit list contents quickly with low allocations
var listSource = Observable.Return<IEnumerable<int>>(new List<int> { 1, 2, 3 });
listSource.ForEach().Subscribe(Console.WriteLine);

// Using helper for deterministic disposal
var value = new MemoryStream().Using(ms => ms.Length);

// While loop (reactive)
var counter = 0;
ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter))
                   .Subscribe();

// Batch push with OnNext params
var subj = new Subject<int>();
subj.OnNext(1, 2, 3, 4);

Performance Notes

  • FastForEach path avoids iterator allocations for List<T>, IList<T>, and arrays.
  • SyncTimer ensures only one shared timer per period reducing timer overhead.
  • Conflate helps tame high–frequency producers without dropping the final value of a burst.
  • Heartbeat and DetectStale use lightweight scheduling primitives.
  • Most operators avoid capturing lambdas in hot loops where practical.

Thread Safety

  • All operators are pure functional transformations unless documented otherwise.
  • SyncTimer uses a ConcurrentDictionary and returns a hot IConnectableObservable that connects once per unique TimeSpan.
  • Methods returning shared observables (SyncTimer, Partition result sequences) are safe for multi-subscriber usage unless the upstream is inherently side-effecting.

License

MIT – see LICENSE file.


Contributing

Issues / PRs welcome. Please keep additions dependency–free and focused on broadly useful reactive patterns.


Change Log (Excerpt)

(Keep this section updated as the library evolves.)

  • Added async task projection helpers (SelectAsyncSequential, SelectLatestAsync, SelectAsyncConcurrent).
  • Added liveness operators (Heartbeat, DetectStale, BufferUntilInactive).
  • Added resilience (RetryWithBackoff, expanded OnErrorRetry overloads).
  • Added flow control (Conflate, ThrottleFirst, DebounceImmediate).
  • Removed DisposeWith extension use System.Reactive.Disposables.Fluent from System.Reactive.

Happy reactive coding!

About

A focused collection of high–value Reactive Extensions (Rx) operators that do not ship with System.Reactive but are commonly needed when building reactive .NET applications.

Topics

Resources

License

Stars

Watchers

Forks

Sponsor this project

 

Contributors 6

Languages