<PackageReference Include="System.Reactive" Version="4.3.2" />

AsyncSubject<T>

public sealed class AsyncSubject<T> : SubjectBase<T>, INotifyCompletion
Represents the result of an asynchronous operation. The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents the result of an asynchronous operation.
    /// The last value before the OnCompleted notification, or the error received through OnError,
    /// is sent to all subscribed observers.
    /// </summary>
    /// <remarks>
    /// The observer list is kept in a single array reference that is swapped with
    /// compare-and-exchange operations. Two sentinel arrays (<c>Terminated</c> and <c>Disposed</c>)
    /// stored in that same reference encode the lifecycle state of the subject.
    /// </remarks>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class AsyncSubject<T> : SubjectBase<T>, INotifyCompletion
    {
        /// <summary>
        /// Subscription handle returned from <see cref="Subscribe"/>; pairs a downstream observer
        /// with the subject it is registered on.
        /// </summary>
        private sealed class AsyncSubjectDisposable : IDisposable
        {
            internal readonly IObserver<T> Downstream;

            // Non-null while the subscription is active; swapped to null on Dispose.
            private AsyncSubject<T> _parent;

            public AsyncSubjectDisposable(AsyncSubject<T> parent, IObserver<T> downstream)
            {
                _parent = parent;
                Downstream = downstream;
            }

            /// <summary>
            /// Unregisters this subscription from its subject. The atomic exchange ensures
            /// <c>Remove</c> is invoked at most once, however many times Dispose is called.
            /// </summary>
            public void Dispose()
            {
                Interlocked.Exchange(ref _parent, null)?.Remove(this);
            }

            /// <summary>Returns true once <see cref="Dispose"/> has cleared the parent reference.</summary>
            internal bool IsDisposed()
            {
                return Volatile.Read(ref _parent) == null;
            }
        }

        /// <summary>
        /// Observer used by the awaiter support: runs a continuation when the subject terminates,
        /// either successfully or with an error. Values and the error itself are ignored here;
        /// <see cref="GetResult"/> reads them from the subject afterwards.
        /// </summary>
        private sealed class AwaitObserver : IObserver<T>
        {
            // Captured at construction time when requested; null means "invoke inline".
            private readonly SynchronizationContext _context;
            private readonly Action _callback;

            public AwaitObserver(Action callback, bool originalContext)
            {
                if (originalContext)
                {
                    _context = SynchronizationContext.Current;
                }

                _callback = callback;
            }

            public void OnCompleted()
            {
                InvokeOnOriginalContext();
            }

            public void OnError(Exception error)
            {
                InvokeOnOriginalContext();
            }

            public void OnNext(T value)
            {
            }

            private void InvokeOnOriginalContext()
            {
                if (_context != null)
                {
                    // The callback is passed as the state argument so the delegate captures nothing.
                    _context.Post(delegate(object c) { ((Action)c)(); }, _callback);
                }
                else
                {
                    _callback();
                }
            }
        }

        // Current observers, or one of the sentinel arrays below.
        private AsyncSubjectDisposable[] _observers;
        private T _value;
        private bool _hasValue;
        private Exception _exception;

        // These sentinels are compared by reference, so they must be two distinct array
        // instances. Do not replace them with Array.Empty<T>(): that is the value used for
        // "live subject with no observers".
        private static readonly AsyncSubjectDisposable[] Terminated = new AsyncSubjectDisposable[0];
        private static readonly AsyncSubjectDisposable[] Disposed = new AsyncSubjectDisposable[0];

        /// <summary>Indicates whether the subject has observers subscribed to it.</summary>
        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;

        /// <summary>Indicates whether the subject has been disposed.</summary>
        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;

        /// <summary>
        /// Indicates whether the subject has terminated (through OnCompleted or OnError).
        /// Part of the awaiter pattern.
        /// </summary>
        public bool IsCompleted => Volatile.Read(ref _observers) == Terminated;

        /// <summary>Creates a subject with no observers and no value.</summary>
        public AsyncSubject()
        {
            _observers = Array.Empty<AsyncSubjectDisposable>();
        }

        /// <summary>
        /// Terminates the subject. Each observer that is still subscribed receives the last
        /// value (if any) followed by a completion notification.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject was already disposed.</exception>
        public override void OnCompleted()
        {
            while (true)
            {
                var observers = Volatile.Read(ref _observers);

                if (observers == Disposed)
                {
                    break;
                }

                if (observers == Terminated)
                {
                    return;
                }

                if (Interlocked.CompareExchange(ref _observers, Terminated, observers) != observers)
                {
                    // The observer list changed underneath us; retry with the fresh state.
                    continue;
                }

                var hasValue = _hasValue;
                var value = _value;

                foreach (var observer in observers)
                {
                    if (observer.IsDisposed())
                    {
                        continue;
                    }

                    if (hasValue)
                    {
                        observer.Downstream.OnNext(value);
                    }

                    observer.Downstream.OnCompleted();
                }

                // Done. Returning here (rather than looping again) avoids a spurious
                // ObjectDisposedException when an observer disposed the subject from
                // within its callback.
                return;
            }

            _exception = null;
            ThrowDisposed();
        }

        /// <summary>
        /// Terminates the subject with an error, which is forwarded to every observer that is
        /// still subscribed.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject was already disposed.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException(nameof(error));
            }

            while (true)
            {
                var observers = Volatile.Read(ref _observers);

                if (observers == Disposed)
                {
                    break;
                }

                if (observers == Terminated)
                {
                    return;
                }

                // Stored before the state switch so that a subscriber that observes
                // Terminated afterwards reads the error.
                _exception = error;

                if (Interlocked.CompareExchange(ref _observers, Terminated, observers) != observers)
                {
                    // The observer list changed underneath us; retry with the fresh state.
                    continue;
                }

                foreach (var observer in observers)
                {
                    if (!observer.IsDisposed())
                    {
                        observer.Downstream.OnError(error);
                    }
                }

                // Done. Returning here (rather than looping again) avoids a spurious
                // ObjectDisposedException when an observer disposed the subject from
                // within its callback.
                return;
            }

            _exception = null;
            _value = default(T);
            ThrowDisposed();
        }

        /// <summary>
        /// Records <paramref name="value"/> as the latest value. Nothing is forwarded to
        /// observers until the subject completes. Ignored once the subject has terminated.
        /// </summary>
        /// <param name="value">The value to store in the subject.</param>
        /// <exception cref="ObjectDisposedException">The subject was already disposed.</exception>
        public override void OnNext(T value)
        {
            var observers = Volatile.Read(ref _observers);

            if (observers == Disposed)
            {
                _value = default(T);
                _exception = null;
                ThrowDisposed();
            }
            else if (observers != Terminated)
            {
                _value = value;
                _hasValue = true;
            }
        }

        /// <summary>
        /// Subscribes an observer to the subject. If the subject has already terminated, the
        /// observer is notified immediately (error, or last value plus completion) and an empty
        /// disposable is returned.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject was already disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            var subscription = new AsyncSubjectDisposable(this, observer);

            if (Add(subscription))
            {
                return subscription;
            }

            // Add returned false: the subject already terminated, so replay the terminal state.
            var exception = _exception;

            if (exception != null)
            {
                observer.OnError(exception);
            }
            else
            {
                if (_hasValue)
                {
                    observer.OnNext(_value);
                }

                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Atomically appends <paramref name="inner"/> to the observer array.
        /// </summary>
        /// <returns>True if the subscription was added; false if the subject has terminated.</returns>
        /// <exception cref="ObjectDisposedException">The subject was already disposed.</exception>
        private bool Add(AsyncSubjectDisposable inner)
        {
            while (true)
            {
                var current = Volatile.Read(ref _observers);

                if (current == Disposed)
                {
                    _value = default(T);
                    _exception = null;
                    ThrowDisposed();
                    return true; // Unreachable: ThrowDisposed always throws.
                }

                if (current == Terminated)
                {
                    return false;
                }

                var count = current.Length;
                var updated = new AsyncSubjectDisposable[count + 1];
                Array.Copy(current, 0, updated, 0, count);
                updated[count] = inner;

                if (Interlocked.CompareExchange(ref _observers, updated, current) == current)
                {
                    return true;
                }
            }
        }

        /// <summary>
        /// Atomically removes <paramref name="inner"/> from the observer array. Does nothing if
        /// the array is empty (which includes both sentinel states) or does not contain it.
        /// </summary>
        private void Remove(AsyncSubjectDisposable inner)
        {
            while (true)
            {
                var current = Volatile.Read(ref _observers);
                var count = current.Length;

                if (count == 0)
                {
                    return;
                }

                var index = -1;

                for (var i = 0; i < count; i++)
                {
                    if (current[i] == inner)
                    {
                        index = i;
                        break;
                    }
                }

                if (index < 0)
                {
                    return;
                }

                AsyncSubjectDisposable[] updated;

                if (count == 1)
                {
                    updated = Array.Empty<AsyncSubjectDisposable>();
                }
                else
                {
                    updated = new AsyncSubjectDisposable[count - 1];
                    Array.Copy(current, 0, updated, 0, index);
                    Array.Copy(current, index + 1, updated, index, count - index - 1);
                }

                if (Interlocked.CompareExchange(ref _observers, updated, current) == current)
                {
                    return;
                }
            }
        }

        private void ThrowDisposed()
        {
            throw new ObjectDisposedException(string.Empty);
        }

        /// <summary>
        /// Switches the subject to the disposed state, dropping all observers without notifying
        /// them, and clears the stored value and error.
        /// </summary>
        public override void Dispose()
        {
            if (Interlocked.Exchange(ref _observers, Disposed) != Disposed)
            {
                _exception = null;
                _value = default(T);
                _hasValue = false;
            }
        }

        /// <summary>
        /// Gets an awaitable object for the current AsyncSubject; the subject is its own awaiter.
        /// </summary>
        /// <returns>This instance.</returns>
        public AsyncSubject<T> GetAwaiter()
        {
            return this;
        }

        /// <summary>
        /// Registers a continuation to run when the subject terminates. The continuation is
        /// posted to the <see cref="SynchronizationContext"/> current at the time of this call,
        /// if there is one.
        /// </summary>
        /// <param name="continuation">Action to invoke when the subject terminates.</param>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }

            OnCompleted(continuation, true);
        }

        private void OnCompleted(Action continuation, bool originalContext)
        {
            // The subscription handle is not kept; the awaiting observer stays subscribed
            // until the subject terminates or is disposed.
            Subscribe(new AwaitObserver(continuation, originalContext));
        }

        /// <summary>
        /// Gets the last element of the subject, blocking the calling thread until the subject
        /// has terminated.
        /// </summary>
        /// <returns>The last value observed before completion.</returns>
        /// <exception cref="InvalidOperationException">The subject completed without any value.</exception>
        public T GetResult()
        {
            if (Volatile.Read(ref _observers) != Terminated)
            {
                // NOTE(review): this event is never disposed. Disposing it right after WaitOne
                // returns would need care, because Set may still be executing on the
                // signalling thread.
                var completed = new ManualResetEvent(false);
                OnCompleted(delegate { completed.Set(); }, false);
                completed.WaitOne();
            }

            // Presumably rethrows the stored error when there is one - confirm against the
            // ThrowIfNotNull helper, which is defined outside this file.
            _exception.ThrowIfNotNull();

            if (!_hasValue)
            {
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            }

            return _value;
        }
    }
}