<PackageReference Include="System.Reactive" Version="6.0.2" />

ObserveOnObserverLongRunning<TSource>

sealed class ObserveOnObserverLongRunning<TSource> : IdentitySink<TSource>
Signals events on an ISchedulerLongRunning by blocking the emission thread while waiting for them from the upstream.
#nullable enable
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive
{
    /// <summary>
    /// Signals events on an <see cref="ISchedulerLongRunning"/> by blocking the emission
    /// thread while waiting for them from the upstream.
    /// </summary>
    /// <typeparam name="TSource">The element type of the sequence.</typeparam>
    internal sealed class ObserveOnObserverLongRunning<TSource> : IdentitySink<TSource>
    {
        /// <summary>
        /// Static delegate that invokes <see cref="Drain"/> on the state object handed to the scheduler.
        /// The <see cref="ICancelable"/> argument is not used; the loop watches <see cref="_disposed"/> instead.
        /// </summary>
        private static readonly Action<ObserveOnObserverLongRunning<TSource>, ICancelable> DrainLongRunning =
            (self, cancelable) => self.Drain();

        /// <summary>The long-running scheduler on which the drain loop is started.</summary>
        private readonly ISchedulerLongRunning _scheduler;

        /// <summary>Holds the OnNext values until the drain loop forwards them; terminal signals use their own fields.</summary>
        private readonly ConcurrentQueue<TSource> _queue;

        /// <summary>Monitor object the drain loop waits on and that <see cref="Schedule"/> and <see cref="Dispose"/> pulse.</summary>
        private readonly object _suspendGuard;

        /// <summary>
        /// Work-in-progress counter: incremented once per upstream signal, decremented once per forwarded item.
        /// A 0 to 1 transition pulses the guard; the drain loop waits when it reads 0.
        /// </summary>
        private long _wip;

        /// <summary>Set to true when the upstream has called OnCompleted or OnError.</summary>
        private bool _done;

        /// <summary>The exception passed to OnError, or null if none was recorded.</summary>
        private Exception? _error;

        /// <summary>Set to true by <see cref="Dispose"/>; makes the drain loop discard queued items and return.</summary>
        private bool _disposed;

        /// <summary>0 until the drain loop has been scheduled, 1 afterwards; ensures it is scheduled only once.</summary>
        private int _runDrainOnce;

        /// <summary>Holds the disposable returned by the scheduler for the drain loop.</summary>
        private SingleAssignmentDisposableValue _drainTask;

        /// <summary>
        /// Creates the observer.
        /// </summary>
        /// <param name="scheduler">The long-running scheduler that will host the drain loop.</param>
        /// <param name="observer">The downstream observer, passed to the base sink.</param>
        public ObserveOnObserverLongRunning(ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            : base(observer)
        {
            _scheduler = scheduler;
            _queue = new ConcurrentQueue<TSource>();
            _suspendGuard = new object();
        }

        /// <summary>Marks the upstream as terminated and notifies the drain loop.</summary>
        public override void OnCompleted()
        {
            Volatile.Write(ref _done, true);
            Schedule();
        }

        /// <summary>Records the error, marks the upstream as terminated and notifies the drain loop.</summary>
        /// <param name="error">The exception reported by the upstream.</param>
        public override void OnError(Exception error)
        {
            // The error must be stored before _done is published: the drain loop
            // reads _error only after it has observed _done == true.
            _error = error;
            Volatile.Write(ref _done, true);
            Schedule();
        }

        /// <summary>Queues the value and notifies the drain loop.</summary>
        /// <param name="value">The value reported by the upstream.</param>
        public override void OnNext(TSource value)
        {
            _queue.Enqueue(value);
            Schedule();
        }

        /// <summary>
        /// Starts the drain loop on the first call and signals that there is more work to do.
        /// </summary>
        private void Schedule()
        {
            // Cheap read first; the compare-exchange decides the single winner that schedules the loop.
            if (Volatile.Read(ref _runDrainOnce) == 0
                && Interlocked.CompareExchange(ref _runDrainOnce, 1, 0) == 0)
            {
                _drainTask.Disposable = _scheduler.ScheduleLongRunning(this, DrainLongRunning);
            }

            // Only the 0 -> 1 transition needs to wake the loop; otherwise it is already busy.
            if (Interlocked.Increment(ref _wip) == 1)
            {
                lock (_suspendGuard)
                {
                    Monitor.Pulse(_suspendGuard);
                }
            }
        }

        /// <summary>
        /// Flags the drain loop to quit, wakes it if it is waiting, and disposes the drain task handle.
        /// </summary>
        /// <param name="disposing">Forwarded unchanged to the base implementation.</param>
        protected override void Dispose(bool disposing)
        {
            Volatile.Write(ref _disposed, true);

            // Wake the drain loop in case it is waiting on the guard so it can observe _disposed.
            lock (_suspendGuard)
            {
                Monitor.Pulse(_suspendGuard);
            }

            _drainTask.Dispose();
            base.Dispose(disposing);
        }

        /// <summary>
        /// The drain loop: forwards queued items downstream, waits on the guard when there is no work,
        /// and ends after a terminal signal has been forwarded or after disposal.
        /// </summary>
        private void Drain()
        {
            var queue = _queue;

            while (true)
            {
                // Disposed: discard whatever is still queued and quit without any terminal signal.
                if (Volatile.Read(ref _disposed))
                {
                    while (queue.TryDequeue(out _))
                    {
                    }

                    return;
                }

                // Read _done BEFORE polling the queue: if the upstream had terminated by then,
                // an empty queue means every item has already been forwarded.
                var isDone = Volatile.Read(ref _done);
                var hasValue = queue.TryDequeue(out var item);

                if (isDone && !hasValue)
                {
                    break;
                }

                if (hasValue)
                {
                    ForwardOnNext(item!);

                    // More signals are pending: go around again without trying to wait.
                    if (Interlocked.Decrement(ref _wip) != 0)
                    {
                        continue;
                    }
                }

                if (Volatile.Read(ref _wip) == 0 && !Volatile.Read(ref _disposed))
                {
                    var guard = _suspendGuard;

                    // If the guard cannot be entered, another thread holds it (Schedule and Dispose
                    // take it in order to pulse), so loop around instead of waiting.
                    if (Monitor.TryEnter(guard))
                    {
                        try
                        {
                            // Re-check under the lock so a pulse sent between the first check
                            // and TryEnter is not missed.
                            if (Volatile.Read(ref _wip) == 0 && !Volatile.Read(ref _disposed))
                            {
                                Monitor.Wait(guard);
                            }
                        }
                        finally
                        {
                            // Always release the guard, even if Wait throws (e.g. ThreadInterruptedException),
                            // so a failed wait cannot leave it locked against Schedule and Dispose.
                            Monitor.Exit(guard);
                        }
                    }
                }
            }

            // Terminated and fully drained: emit the terminal signal.
            var error = _error;
            if (error != null)
            {
                ForwardOnError(error);
            }
            else
            {
                ForwardOnCompleted();
            }
        }
    }
}