ObserveOnObserverLongRunning<TSource>
Signals events on an ISchedulerLongRunning by blocking the emission thread while waiting
for them from the upstream.
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
/// <summary>
/// Observer that buffers upstream items in a concurrent queue and re-emits them
/// from a single drain loop started via <see cref="ISchedulerLongRunning"/>.
/// While no work is pending, the drain loop blocks on a monitor until a producer
/// call or <see cref="Dispose(bool)"/> pulses it.
/// </summary>
/// <remarks>
/// Signals are handed on through the base class's ForwardOnNext / ForwardOnError /
/// ForwardOnCompleted methods, which are defined outside this file.
/// </remarks>
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
internal sealed class ObserveOnObserverLongRunning<[System.Runtime.CompilerServices.Nullable(2)] TSource> : IdentitySink<TSource>
{
    // Scheduler used to start the drain loop.
    private readonly ISchedulerLongRunning _scheduler;

    // Items received by OnNext and not yet forwarded by the drain loop.
    private readonly ConcurrentQueue<TSource> _queue;

    // Monitor object the drain loop waits on; pulsed by Schedule and Dispose.
    private readonly object _suspendGuard;

    // Work counter: incremented by every Schedule call, decremented by the drain
    // loop once per forwarded item.
    private long _wip;

    // Set (volatile write) by OnCompleted and OnError.
    private bool _done;

    // Written by OnError before _done is published; read by the drain loop only
    // after it has observed _done and an empty queue.
    [System.Runtime.CompilerServices.Nullable(2)]
    private Exception _error;

    // Set (volatile write) by Dispose; makes the drain loop clear the queue and quit.
    private bool _disposed;

    // 0 until the first Schedule call wins the compare-exchange and starts the drain loop.
    private int _runDrainOnce;

    // Holds the disposable returned by ScheduleLongRunning for the drain loop.
    private SingleAssignmentDisposableValue _drainTask;

    // Cached delegate handed to the scheduler; it only invokes Drain on the state
    // object and ignores the cancelable argument.
    private static readonly Action<ObserveOnObserverLongRunning<TSource>, ICancelable> DrainLongRunning =
        (self, cancelable) => self.Drain();

    /// <summary>
    /// Creates the observer with an empty queue and a fresh monitor object.
    /// </summary>
    /// <param name="scheduler">Scheduler on which the drain loop is started.</param>
    /// <param name="observer">Observer passed to the base class constructor.</param>
    public ObserveOnObserverLongRunning(ISchedulerLongRunning scheduler, IObserver<TSource> observer)
        : base(observer)
    {
        _scheduler = scheduler;
        _queue = new ConcurrentQueue<TSource>();
        _suspendGuard = new object();
    }

    /// <summary>
    /// Marks the sequence as done and notifies the drain loop.
    /// </summary>
    public override void OnCompleted()
    {
        Volatile.Write(ref _done, true);
        Schedule();
    }

    /// <summary>
    /// Records the error, marks the sequence as done and notifies the drain loop.
    /// </summary>
    /// <param name="error">The error to forward once the queued items are drained.</param>
    public override void OnError(Exception error)
    {
        // The error must be stored before _done is published so the drain loop
        // sees it after observing _done.
        _error = error;
        Volatile.Write(ref _done, true);
        Schedule();
    }

    /// <summary>
    /// Enqueues the item and notifies the drain loop.
    /// </summary>
    /// <param name="value">The item to enqueue.</param>
    public override void OnNext(TSource value)
    {
        _queue.Enqueue(value);
        Schedule();
    }

    /// <summary>
    /// Starts the drain loop on the first call and signals that more work is available.
    /// </summary>
    private void Schedule()
    {
        // Cheap volatile read first; only the caller that flips 0 -> 1 schedules the drain loop.
        if (Volatile.Read(ref _runDrainOnce) == 0 && Interlocked.CompareExchange(ref _runDrainOnce, 1, 0) == 0)
        {
            _drainTask.Disposable = _scheduler.ScheduleLongRunning<ObserveOnObserverLongRunning<TSource>>(this, DrainLongRunning);
        }

        // Only the caller that takes the counter to exactly 1 pulses the monitor.
        if (Interlocked.Increment(ref _wip) == 1)
        {
            lock (_suspendGuard)
            {
                Monitor.Pulse(_suspendGuard);
            }
        }
    }

    /// <summary>
    /// Flags the observer as disposed, pulses the monitor so a waiting drain loop
    /// re-checks its state, disposes the drain task handle and then calls the base
    /// implementation.
    /// </summary>
    /// <param name="disposing">Passed through to the base implementation.</param>
    protected override void Dispose(bool disposing)
    {
        Volatile.Write(ref _disposed, true);
        lock (_suspendGuard)
        {
            Monitor.Pulse(_suspendGuard);
        }
        _drainTask.Dispose();
        base.Dispose(disposing);
    }

    /// <summary>
    /// The drain loop: forwards queued items one by one, emits the terminal signal
    /// once the sequence is done and the queue is empty, and blocks on the monitor
    /// while there is no pending work. Returns without a terminal signal if the
    /// observer is disposed.
    /// </summary>
    private void Drain()
    {
        ConcurrentQueue<TSource> queue = _queue;
        while (true)
        {
            // Disposed: drop everything still queued and quit without a terminal signal.
            if (Volatile.Read(ref _disposed))
            {
                TSource discarded;
                while (queue.TryDequeue(out discarded))
                {
                }
                return;
            }

            // Read _done BEFORE polling the queue: if it was already set and the
            // queue is empty afterwards, every item has been forwarded.
            bool isDone = Volatile.Read(ref _done);
            TSource item;
            bool hasItem = queue.TryDequeue(out item);
            if (isDone && !hasItem)
            {
                break;
            }

            if (hasItem)
            {
                ForwardOnNext(item);
                // Counter still non-zero: go round again without trying to sleep.
                if (Interlocked.Decrement(ref _wip) != 0)
                {
                    continue;
                }
            }

            if (Volatile.Read(ref _wip) == 0 && !Volatile.Read(ref _disposed))
            {
                object guard = _suspendGuard;
                // If the lock cannot be taken, another thread holds it (Schedule and
                // Dispose are the only other places that lock it), so skip the wait
                // and re-check the state on the next iteration.
                if (Monitor.TryEnter(guard))
                {
                    try
                    {
                        // Re-check under the lock so a pulse sent between the first
                        // check and TryEnter is not missed.
                        if (Volatile.Read(ref _wip) == 0 && !Volatile.Read(ref _disposed))
                        {
                            Monitor.Wait(guard);
                        }
                    }
                    finally
                    {
                        // Always release the lock, even if Wait throws (for example
                        // ThreadInterruptedException); otherwise Schedule and Dispose
                        // would block forever on the guard.
                        Monitor.Exit(guard);
                    }
                }
            }
        }

        // Done and drained: emit the terminal signal.
        Exception error = _error;
        if (error != null)
        {
            ForwardOnError(error);
        }
        else
        {
            ForwardOnCompleted();
        }
    }
}
}