FastImmediateObserver<T>
Specialized scheduled observer similar to a scheduled observer for the immediate scheduler.
            
                using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
    // Nullability is expressed with real annotations below. The compiler-reserved
    // Nullable/NullableContext attributes are emitted by the compiler itself and
    // cannot be applied explicitly in source.
    #nullable enable
    /// <summary>
    /// Observer that buffers incoming notifications under a lock and forwards them to the
    /// wrapped observer only while <c>EnsureActive</c> is running, on the thread that called it.
    /// </summary>
    /// <remarks>
    /// <para>
    /// <c>OnNext</c>, <c>OnError</c> and <c>OnCompleted</c> only record state; nothing reaches the
    /// wrapped observer until a drain is started through <c>EnsureActive</c>.
    /// </para>
    /// <para>
    /// At most one caller drains at a time (tracked by <c>_busy</c>); other callers of
    /// <c>EnsureActive</c> return immediately while a drain is in progress.
    /// </para>
    /// </remarks>
    /// <typeparam name="T">Type of the buffered elements.</typeparam>
    internal sealed class FastImmediateObserver<T> : IScheduledObserver<T>, IObserver<T>, IDisposable
    {
        // Guards every field below except _observer.
        private readonly object _gate = new object();

        // Current delivery target; replaced atomically by Done().
        private volatile IObserver<T> _observer;

        // Queue that OnNext appends to. Swapped out wholesale by the draining thread.
        private Queue<T> _queue = new Queue<T>();

        // Spare, already-drained queue kept for reuse on the next swap; null while it is in use.
        private Queue<T>? _queue2;

        // Pending error recorded by OnError, or null when none was signalled.
        private Exception? _error;

        // Set by OnCompleted.
        private bool _done;

        // True while some caller of EnsureActive owns the drain loop.
        private bool _busy;

        // Set when delivery to the wrapped observer threw; further notifications are dropped.
        private bool _hasFaulted;

        /// <summary>Creates an observer that forwards drained notifications to <paramref name="observer"/>.</summary>
        /// <param name="observer">Observer that receives the buffered notifications.</param>
        public FastImmediateObserver(IObserver<T> observer)
        {
            _observer = observer;
        }

        /// <summary>
        /// Swaps the wrapped observer for <c>NopObserver&lt;T&gt;.Instance</c>, so notifications
        /// drained afterwards no longer reach the original observer.
        /// </summary>
        public void Dispose()
        {
            Done();
        }

        /// <summary>Starts a drain of the buffered notifications; equivalent to <c>EnsureActive(1)</c>.</summary>
        public void EnsureActive()
        {
            EnsureActive(1);
        }

        /// <summary>
        /// Drains buffered notifications to the wrapped observer on the calling thread, unless
        /// another caller is already draining or a previous delivery has faulted.
        /// </summary>
        /// <param name="count">Not read by this implementation; the drain always runs until the buffer is empty.</param>
        /// <remarks>
        /// Items taken in a pass are delivered before a pending error or completion. A pending
        /// error takes precedence over completion. If the wrapped observer throws, the observer is
        /// marked as faulted, the pending queue is cleared, and the exception is rethrown.
        /// </remarks>
        public void EnsureActive(int count)
        {
            var isOwner = false;

            lock (_gate) {
                if (!_hasFaulted && !_busy) {
                    isOwner = true;
                    _busy = true;
                }
            }

            if (!isOwner) {
                return;
            }

            while (true) {
                Queue<T>? queue = null;
                Exception? error = null;
                var done = false;

                lock (_gate) {
                    if (_queue.Count > 0) {
                        // Hand the filled queue to this thread and give producers an empty one,
                        // reusing the spare queue when available.
                        if (_queue2 == null) {
                            _queue2 = new Queue<T>();
                        }

                        queue = _queue;
                        _queue = _queue2;
                        _queue2 = null;
                    }

                    if (_error != null) {
                        error = _error;
                    } else if (_done) {
                        done = true;
                    } else if (queue == null) {
                        // Nothing buffered and no terminal signal: release ownership.
                        _busy = false;
                        return;
                    }
                }

                try {
                    if (queue != null) {
                        // Delivery happens outside the lock so producers are not blocked by the observer.
                        while (queue.Count > 0) {
                            _observer.OnNext(queue.Dequeue());
                        }

                        lock (_gate) {
                            // Keep the now-empty queue as the spare for the next swap.
                            _queue2 = queue;
                        }
                    }

                    // Terminal paths return without clearing _busy, so no later drain starts.
                    if (error != null) {
                        Done().OnError(error);
                        return;
                    }

                    if (done) {
                        Done().OnCompleted();
                        return;
                    }
                } catch {
                    lock (_gate) {
                        _hasFaulted = true;
                        _queue.Clear();
                    }

                    throw;
                }
            }
        }

        /// <summary>Records completion for delivery by the next drain; ignored once faulted.</summary>
        public void OnCompleted()
        {
            lock (_gate) {
                if (!_hasFaulted) {
                    _done = true;
                }
            }
        }

        /// <summary>Records <paramref name="error"/> for delivery by the next drain; ignored once faulted.</summary>
        /// <param name="error">Error to forward to the wrapped observer.</param>
        public void OnError(Exception error)
        {
            lock (_gate) {
                if (!_hasFaulted) {
                    _error = error;
                }
            }
        }

        /// <summary>Buffers <paramref name="value"/> for delivery by the next drain; ignored once faulted.</summary>
        /// <param name="value">Element to buffer.</param>
        public void OnNext(T value)
        {
            lock (_gate) {
                if (!_hasFaulted) {
                    _queue.Enqueue(value);
                }
            }
        }

        /// <summary>
        /// Atomically replaces the wrapped observer with <c>NopObserver&lt;T&gt;.Instance</c>.
        /// </summary>
        /// <returns>The observer that was installed before the swap.</returns>
        private IObserver<T> Done()
        {
            return Interlocked.Exchange<IObserver<T>>(ref _observer, NopObserver<T>.Instance);
        }
    }
    #nullable restore
}