AsyncSubject<T>
Represents the result of an asynchronous operation.
            The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
            
                using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(new byte[] {
        0,
        1
    })]
    public sealed class AsyncSubject<[System.Runtime.CompilerServices.Nullable(2)] T> : SubjectBase<T>, INotifyCompletion
    {
        [System.Runtime.CompilerServices.NullableContext(0)]
        private sealed class AsyncSubjectDisposable : IDisposable
        {
            [System.Runtime.CompilerServices.Nullable(1)]
            private AsyncSubject<T> _subject;
            [System.Runtime.CompilerServices.Nullable(new byte[] {
                2,
                1
            })]
            private volatile IObserver<T> _observer;
            [System.Runtime.CompilerServices.Nullable(new byte[] {
                2,
                1
            })]
            public IObserver<T> Observer {
                [return: System.Runtime.CompilerServices.Nullable(new byte[] {
                    2,
                    1
                })]
                get {
                    return _observer;
                }
            }
            [System.Runtime.CompilerServices.NullableContext(1)]
            public AsyncSubjectDisposable(AsyncSubject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }
            public void Dispose()
            {
                if (Interlocked.Exchange<IObserver<T>>(ref _observer, (IObserver<T>)null) != null) {
                    _subject.Unsubscribe(this);
                    _subject = null;
                }
            }
        }
        [System.Runtime.CompilerServices.Nullable(0)]
        private sealed class AwaitObserver : IObserver<T>
        {
            [System.Runtime.CompilerServices.Nullable(2)]
            private readonly SynchronizationContext _context;
            private readonly Action _callback;
            public AwaitObserver(Action callback)
            {
                _context = SynchronizationContext.Current;
                _callback = callback;
            }
            public void OnCompleted()
            {
                InvokeOnOriginalContext();
            }
            public void OnError(Exception error)
            {
                InvokeOnOriginalContext();
            }
            public void OnNext(T value)
            {
            }
            private void InvokeOnOriginalContext()
            {
                if (_context != null)
                    _context.Post(delegate(object c) {
                        ((Action)c)();
                    }, _callback);
                else
                    _callback();
            }
        }
        [System.Runtime.CompilerServices.Nullable(0)]
        private sealed class BlockingObserver : IObserver<T>
        {
            private readonly ManualResetEventSlim _e;
            public BlockingObserver(ManualResetEventSlim e)
            {
                _e = e;
            }
            public void OnCompleted()
            {
                Done();
            }
            public void OnError(Exception error)
            {
                Done();
            }
            public void OnNext(T value)
            {
            }
            private void Done()
            {
                _e.Set();
            }
        }
        [System.Runtime.CompilerServices.Nullable(new byte[] {
            1,
            1,
            0
        })]
        private AsyncSubjectDisposable[] _observers;
        [System.Runtime.CompilerServices.Nullable(2)]
        private T _value;
        private bool _hasValue;
        [System.Runtime.CompilerServices.Nullable(2)]
        private Exception _exception;
        [System.Runtime.CompilerServices.Nullable(new byte[] {
            1,
            1,
            0
        })]
        private static readonly AsyncSubjectDisposable[] Terminated = new AsyncSubjectDisposable[0];
        [System.Runtime.CompilerServices.Nullable(new byte[] {
            1,
            1,
            0
        })]
        private static readonly AsyncSubjectDisposable[] Disposed = new AsyncSubjectDisposable[0];
        public override bool HasObservers => Volatile.Read<AsyncSubjectDisposable[]>(ref _observers).Length != 0;
        public override bool IsDisposed => Volatile.Read<AsyncSubjectDisposable[]>(ref _observers) == Disposed;
        public bool IsCompleted => Volatile.Read<AsyncSubjectDisposable[]>(ref _observers) == Terminated;
        public AsyncSubject()
        {
            _observers = Array.Empty<AsyncSubjectDisposable>();
        }
        public override void OnCompleted()
        {
            while (true) {
                AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
                if (array == Disposed)
                    break;
                if (array == Terminated)
                    return;
                if (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, Terminated, array) == array) {
                    if (_hasValue) {
                        T value = _value;
                        AsyncSubjectDisposable[] array2 = array;
                        for (int i = 0; i < array2.Length; i++) {
                            IObserver<T> observer = array2[i].Observer;
                            if (observer != null) {
                                observer.OnNext(value);
                                observer.OnCompleted();
                            }
                        }
                    } else {
                        AsyncSubjectDisposable[] array2 = array;
                        for (int i = 0; i < array2.Length; i++) {
                            array2[i].Observer?.OnCompleted();
                        }
                    }
                }
            }
            _exception = null;
            ThrowDisposed();
        }
        public override void OnError(Exception error)
        {
            if (error == null)
                throw new ArgumentNullException("error");
            while (true) {
                AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
                if (array == Disposed)
                    break;
                if (array == Terminated)
                    return;
                _exception = error;
                if (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, Terminated, array) == array) {
                    AsyncSubjectDisposable[] array2 = array;
                    for (int i = 0; i < array2.Length; i++) {
                        array2[i].Observer?.OnError(error);
                    }
                }
            }
            _exception = null;
            _value = default(T);
            ThrowDisposed();
        }
        public override void OnNext(T value)
        {
            AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
            if (array == Disposed) {
                _value = default(T);
                _exception = null;
                ThrowDisposed();
            } else if (array != Terminated) {
                _value = value;
                _hasValue = true;
            }
        }
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
                throw new ArgumentNullException("observer");
            AsyncSubjectDisposable asyncSubjectDisposable = null;
            while (true) {
                AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
                if (array == Disposed) {
                    _value = default(T);
                    _exception = null;
                    ThrowDisposed();
                    break;
                }
                if (array == Terminated) {
                    Exception exception = _exception;
                    if (exception != null)
                        observer.OnError(exception);
                    else {
                        if (_hasValue)
                            observer.OnNext(_value);
                        observer.OnCompleted();
                    }
                    break;
                }
                if (asyncSubjectDisposable == null)
                    asyncSubjectDisposable = new AsyncSubjectDisposable(this, observer);
                int num = array.Length;
                AsyncSubjectDisposable[] array2 = new AsyncSubjectDisposable[num + 1];
                Array.Copy(array, 0, array2, 0, num);
                array2[num] = asyncSubjectDisposable;
                if (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, array2, array) == array)
                    return asyncSubjectDisposable;
            }
            return Disposable.Empty;
        }
        private void Unsubscribe([System.Runtime.CompilerServices.Nullable(new byte[] {
            1,
            0
        })] AsyncSubjectDisposable observer)
        {
            AsyncSubjectDisposable[] array;
            AsyncSubjectDisposable[] array2;
            do {
                array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
                int num = array.Length;
                if (num == 0)
                    break;
                int num2 = Array.IndexOf<AsyncSubjectDisposable>(array, observer);
                if (num2 < 0)
                    break;
                if (num == 1)
                    array2 = Array.Empty<AsyncSubjectDisposable>();
                else {
                    array2 = new AsyncSubjectDisposable[num - 1];
                    Array.Copy(array, 0, array2, 0, num2);
                    Array.Copy(array, num2 + 1, array2, num2, num - num2 - 1);
                }
            } while (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, array2, array) != array);
        }
        private static void ThrowDisposed()
        {
            throw new ObjectDisposedException(string.Empty);
        }
        public override void Dispose()
        {
            if (Interlocked.Exchange<AsyncSubjectDisposable[]>(ref _observers, Disposed) != Disposed) {
                _exception = null;
                _value = default(T);
                _hasValue = false;
            }
        }
        public AsyncSubject<T> GetAwaiter()
        {
            return this;
        }
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
                throw new ArgumentNullException("continuation");
            Subscribe(new AwaitObserver(continuation));
        }
        public T GetResult()
        {
            if (Volatile.Read<AsyncSubjectDisposable[]>(ref _observers) != Terminated) {
                using (ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim(false)) {
                    Subscribe(new BlockingObserver(manualResetEventSlim));
                    manualResetEventSlim.Wait();
                }
            }
            Exception exception = _exception;
            if (exception != null)
                exception.Throw();
            if (!_hasValue)
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            return _value;
        }
    }
}