// ScheduledObserver<T>
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive
{
    #nullable enable
    /// <summary>
    /// Observer that buffers incoming notifications and replays them to a wrapped observer
    /// from work scheduled on the supplied scheduler.
    /// </summary>
    /// <remarks>
    /// <see cref="OnNextCore"/>, <see cref="OnErrorCore"/> and <see cref="OnCompletedCore"/> only record
    /// the notification; nothing is delivered until <see cref="EnsureActive()"/> or
    /// <see cref="EnsureActive(int)"/> is called. Two delivery strategies exist:
    /// when <c>scheduler.AsLongRunning()</c> returns a scheduler, a single long-running dispatcher job
    /// woken through a semaphore drains the queue (<see cref="Dispatch"/>); otherwise a recursive
    /// scheduler action guarded by the <c>_state</c> field does so (<see cref="Run"/>).
    /// </remarks>
    /// <typeparam name="T">Type of the elements forwarded to the wrapped observer.</typeparam>
    internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>, IObserver<T>, IDisposable
    {
        /// <summary>
        /// Disposable that releases the captured semaphore once, on the first call to
        /// <see cref="Dispose"/>; later calls do nothing.
        /// </summary>
        /// <remarks>
        /// NOTE(review): it is composed with the dispatcher job in <c>EnsureDispatcher</c>, presumably so that
        /// a dispatcher blocked in <c>Wait()</c> wakes up and observes the cancellation.
        /// </remarks>
        private sealed class SemaphoreSlimRelease : IDisposable
        {
            private volatile SemaphoreSlim? _dispatcherEvent;

            /// <summary>Captures the semaphore to release on disposal.</summary>
            /// <param name="dispatcherEvent">Semaphore that will receive one <c>Release()</c>.</param>
            public SemaphoreSlimRelease(SemaphoreSlim dispatcherEvent)
            {
                _dispatcherEvent = dispatcherEvent;
            }

            /// <summary>Releases the semaphore if this is the first disposal.</summary>
            public void Dispose()
            {
                // Swapping in null atomically guarantees only one caller ever sees the semaphore.
                Interlocked.Exchange(ref _dispatcherEvent, null)?.Release();
            }
        }

        // State of the scheduler-driven drain loop (only used when no long-running scheduler is available).
        private int _state;

        // No drain action is scheduled.
        private const int Stopped = 0;

        // A drain action is scheduled or executing.
        private const int Running = 1;

        // A producer asked for activity while the drain action was Running.
        private const int Pending = 2;

        // The wrapped observer's OnNext threw; EnsureActiveSlow no longer schedules work.
        private const int Faulted = 9;

        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

        private bool _failed;

        private Exception? _error;

        private bool _completed;

        private readonly IObserver<T> _observer;

        private readonly IScheduler _scheduler;

        // Non-null only when the scheduler supports long-running work; selects the delivery strategy.
        private readonly ISchedulerLongRunning? _longRunning;

        // Holds whatever work is currently scheduled; intentionally not readonly because it is mutated in place.
        private SerialDisposableValue _disposable;

        private readonly object _dispatcherInitGate = new object();

        // The three fields below are only populated/used on the long-running path.
        private readonly SemaphoreSlim? _dispatcherEvent;

        private readonly IDisposable? _dispatcherEventRelease;

        private IDisposable? _dispatcherJob;

        /// <summary>
        /// Creates an observer that forwards notifications to <paramref name="observer"/> using
        /// <paramref name="scheduler"/>.
        /// </summary>
        /// <param name="scheduler">Scheduler on which notifications are delivered.</param>
        /// <param name="observer">Observer that receives the buffered notifications.</param>
        public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
        {
            _scheduler = scheduler;
            _observer = observer;
            _longRunning = _scheduler.AsLongRunning();

            if (_longRunning != null) {
                // Starts with zero permits: the dispatcher blocks until EnsureActive releases some.
                _dispatcherEvent = new SemaphoreSlim(0);
                _dispatcherEventRelease = new SemaphoreSlimRelease(_dispatcherEvent);
            }
        }

        /// <summary>
        /// Starts the long-running dispatcher job if it has not been started yet.
        /// Only called when <c>_longRunning</c> is non-null.
        /// </summary>
        private void EnsureDispatcher()
        {
            if (_dispatcherJob != null) {
                return;
            }

            lock (_dispatcherInitGate) {
                // Re-check under the lock: another thread may have started the job in the meantime.
                if (_dispatcherJob != null) {
                    return;
                }

                _dispatcherJob = _longRunning!.ScheduleLongRunning(Dispatch);
                _disposable.Disposable = StableCompositeDisposable.Create(_dispatcherJob, _dispatcherEventRelease!);
            }
        }

        /// <summary>
        /// Body of the long-running dispatcher: waits on the semaphore, then drains the queue into the
        /// wrapped observer, and finally delivers a recorded error or completion.
        /// </summary>
        /// <param name="cancel">Cancellation handle; the loop exits once it reports disposed.</param>
        private void Dispatch(ICancelable cancel)
        {
            while (true) {
                _dispatcherEvent!.Wait();

                if (cancel.IsDisposed) {
                    return;
                }

                while (_queue.TryDequeue(out var next)) {
                    try {
                        _observer.OnNext(next);
                    } catch {
                        // Downstream failed: discard everything still queued, then propagate.
                        while (_queue.TryDequeue(out _)) {
                        }

                        throw;
                    }

                    // One semaphore permit is consumed before each further dequeue attempt.
                    _dispatcherEvent.Wait();

                    if (cancel.IsDisposed) {
                        return;
                    }
                }

                if (_failed) {
                    _observer.OnError(_error!);
                    Dispose();
                    return;
                }

                if (_completed) {
                    _observer.OnCompleted();
                    Dispose();
                    return;
                }
            }
        }

        /// <summary>Requests delivery of buffered notifications; equivalent to <c>EnsureActive(1)</c>.</summary>
        public void EnsureActive()
        {
            EnsureActive(1);
        }

        /// <summary>
        /// Requests delivery of buffered notifications.
        /// </summary>
        /// <param name="n">
        /// Number of permits released to the dispatcher on the long-running path (ignored when not positive,
        /// and unused on the scheduler-driven path). NOTE(review): presumably the number of notifications
        /// the caller has just recorded - confirm against callers.
        /// </param>
        public void EnsureActive(int n)
        {
            if (_longRunning == null) {
                EnsureActiveSlow();
                return;
            }

            if (n > 0) {
                _dispatcherEvent!.Release(n);
            }

            EnsureDispatcher();
        }

        /// <summary>
        /// Scheduler-driven path: makes sure a drain action (<see cref="Run"/>) is scheduled, or, when one is
        /// already running, flags it as <c>Pending</c> so that it re-checks the queue before stopping.
        /// </summary>
        private void EnsureActiveSlow()
        {
            while (true) {
                var previous = Interlocked.CompareExchange(ref _state, Running, Stopped);

                if (previous == Stopped) {
                    // We performed the Stopped -> Running transition, so we own scheduling the drain action.
                    _disposable.Disposable = _scheduler.Schedule<object?>(null, Run);
                    return;
                }

                if (previous == Faulted || previous == Pending) {
                    // Faulted: nothing more is scheduled. Pending: the running drain is already flagged.
                    return;
                }

                if (previous == Running && Interlocked.CompareExchange(ref _state, Pending, Running) == Running) {
                    // Running -> Pending succeeded; Run will see it instead of transitioning to Stopped.
                    return;
                }

                // The state changed between the two compare-exchanges; start over.
            }
        }

        /// <summary>
        /// Recursive scheduler action: delivers one queued element per invocation and then calls
        /// <paramref name="recurse"/>; when the queue is empty it delivers a recorded error/completion
        /// or stops.
        /// </summary>
        /// <param name="state">Unused state value, passed through to <paramref name="recurse"/>.</param>
        /// <param name="recurse">Continuation supplied by the scheduler; invoked after each delivered element.</param>
        private void Run(object? state, Action<object?> recurse)
        {
            T next;

            while (!_queue.TryDequeue(out next)) {
                if (_failed) {
                    // Elements may have been enqueued between the failed dequeue and this check.
                    if (!_queue.IsEmpty) {
                        continue;
                    }

                    Interlocked.Exchange(ref _state, Stopped);
                    _observer.OnError(_error!);
                    Dispose();
                    return;
                }

                if (_completed) {
                    if (!_queue.IsEmpty) {
                        continue;
                    }

                    Interlocked.Exchange(ref _state, Stopped);
                    _observer.OnCompleted();
                    Dispose();
                    return;
                }

                var previous = Interlocked.CompareExchange(ref _state, Stopped, Running);

                if (previous == Running || previous == Faulted) {
                    // Either we just stopped cleanly, or delivery has faulted: nothing more to do.
                    return;
                }

                // Any other value (Pending, set by EnsureActiveSlow) means a producer asked for another pass:
                // go back to Running and re-check the queue.
                _state = Running;
            }

            Interlocked.Exchange(ref _state, Running);

            try {
                _observer.OnNext(next);
            } catch {
                Interlocked.Exchange(ref _state, Faulted);

                // Discard everything still queued, then propagate the failure.
                while (_queue.TryDequeue(out _)) {
                }

                throw;
            }

            recurse(state);
        }

        /// <summary>Buffers <paramref name="value"/>; it is delivered once the observer is activated.</summary>
        /// <param name="value">Element to enqueue.</param>
        protected override void OnNextCore(T value)
        {
            _queue.Enqueue(value);
        }

        /// <summary>Records the error; it is delivered by the drain loop after the queue is empty.</summary>
        /// <param name="exception">Error to forward to the wrapped observer.</param>
        protected override void OnErrorCore(Exception exception)
        {
            // Store the exception before raising the flag that the drain loops test.
            _error = exception;
            _failed = true;
        }

        /// <summary>Records completion; it is delivered by the drain loop after the queue is empty.</summary>
        protected override void OnCompletedCore()
        {
            _completed = true;
        }

        /// <summary>Disposes the base observer and, when <paramref name="disposing"/>, the scheduled work.</summary>
        /// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>.</param>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing) {
                _disposable.Dispose();
            }
        }
    }
    #nullable restore
}