ObserveOnObserverLongRunning<TSource>
Signals events on a ISchedulerLongRunning by blocking the emission thread while waiting
            for them from the upstream.
            
                using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(new byte[] {
        0,
        1
    })]
    internal sealed class ObserveOnObserverLongRunning<[System.Runtime.CompilerServices.Nullable(2)] TSource> : IdentitySink<TSource>
    {
        private readonly ISchedulerLongRunning _scheduler;
        private readonly ConcurrentQueue<TSource> _queue;
        private readonly object _suspendGuard;
        private long _wip;
        private bool _done;
        [System.Runtime.CompilerServices.Nullable(2)]
        private Exception _error;
        private bool _disposed;
        private int _runDrainOnce;
        private SingleAssignmentDisposableValue _drainTask;
        private static readonly Action<ObserveOnObserverLongRunning<TSource>, ICancelable> DrainLongRunning = delegate(ObserveOnObserverLongRunning<TSource> self, ICancelable cancelable) {
            self.Drain();
        };
        public ObserveOnObserverLongRunning(ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            : base(observer)
        {
            _scheduler = scheduler;
            _queue = new ConcurrentQueue<TSource>();
            _suspendGuard = new object();
        }
        public override void OnCompleted()
        {
            Volatile.Write(ref _done, true);
            Schedule();
        }
        public override void OnError(Exception error)
        {
            _error = error;
            Volatile.Write(ref _done, true);
            Schedule();
        }
        public override void OnNext(TSource value)
        {
            _queue.Enqueue(value);
            Schedule();
        }
        private void Schedule()
        {
            if (Volatile.Read(ref _runDrainOnce) == 0 && Interlocked.CompareExchange(ref _runDrainOnce, 1, 0) == 0)
                _drainTask.Disposable = _scheduler.ScheduleLongRunning<ObserveOnObserverLongRunning<TSource>>(this, DrainLongRunning);
            if (Interlocked.Increment(ref _wip) == 1) {
                lock (_suspendGuard) {
                    Monitor.Pulse(_suspendGuard);
                }
            }
        }
        protected override void Dispose(bool disposing)
        {
            Volatile.Write(ref _disposed, true);
            lock (_suspendGuard) {
                Monitor.Pulse(_suspendGuard);
            }
            _drainTask.Dispose();
            base.Dispose(disposing);
        }
        private void Drain()
        {
            ConcurrentQueue<TSource> queue = _queue;
            while (true) {
                if (Volatile.Read(ref _disposed)) {
                    TSource result;
                    while (queue.TryDequeue(out result)) {
                    }
                    return;
                }
                bool num = Volatile.Read(ref _done);
                TSource result2;
                bool flag = queue.TryDequeue(out result2);
                if (num && !flag)
                    break;
                if (flag) {
                    ForwardOnNext(result2);
                    if (Interlocked.Decrement(ref _wip) != 0)
                        continue;
                }
                if (Volatile.Read(ref _wip) == 0 && !Volatile.Read(ref _disposed)) {
                    object suspendGuard = _suspendGuard;
                    if (Monitor.TryEnter(suspendGuard)) {
                        if (Volatile.Read(ref _wip) == 0 && !Volatile.Read(ref _disposed))
                            Monitor.Wait(suspendGuard);
                        Monitor.Exit(suspendGuard);
                    }
                }
            }
            Exception error = _error;
            if (error != null)
                ForwardOnError(error);
            else
                ForwardOnCompleted();
        }
    }
}