<PackageReference Include="System.Reactive" Version="6.0.1" />

AsyncSubject<T>

public sealed class AsyncSubject<T> : SubjectBase<T>, INotifyCompletion
Represents the result of an asynchronous operation. The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
#nullable enable

using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents the result of an asynchronous operation.
    /// The last value before the OnCompleted notification, or the error received through OnError,
    /// is sent to all subscribed observers.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    /// <remarks>
    /// The subscriber list is an immutable array that is swapped atomically with
    /// <see cref="Interlocked.CompareExchange{T}(ref T, T, T)"/>. Two dedicated empty arrays act as
    /// reference-compared sentinels for the "terminated" and "disposed" states.
    /// </remarks>
    public sealed class AsyncSubject<T> : SubjectBase<T>, INotifyCompletion
    {
        /// <summary>
        /// Subscription handle returned from <see cref="Subscribe"/>; disposing it removes the
        /// observer from the owning subject.
        /// </summary>
        private sealed class AsyncSubjectDisposable : IDisposable
        {
            private AsyncSubject<T> _subject;
            private volatile IObserver<T>? _observer;

            public AsyncSubjectDisposable(AsyncSubject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            /// <summary>Gets the wrapped observer, or <c>null</c> once this handle has been disposed.</summary>
            public IObserver<T>? Observer => _observer;

            /// <summary>Detaches the observer from the subject. Only the first call has any effect.</summary>
            public void Dispose()
            {
                // The exchange guarantees that exactly one caller observes the non-null observer.
                if (Interlocked.Exchange(ref _observer, null) != null)
                {
                    _subject.Unsubscribe(this);
                    _subject = null!; // drop the back-reference to the subject
                }
            }
        }

        /// <summary>
        /// Observer used by the awaiter support: runs a continuation when the subject terminates,
        /// posting it to the <see cref="SynchronizationContext"/> captured at construction if there was one.
        /// </summary>
        private sealed class AwaitObserver : IObserver<T>
        {
            private readonly SynchronizationContext? _context;
            private readonly Action _callback;

            public AwaitObserver(Action callback)
            {
                _context = SynchronizationContext.Current;
                _callback = callback;
            }

            public void OnCompleted() => InvokeOnOriginalContext();

            public void OnError(Exception error) => InvokeOnOriginalContext();

            public void OnNext(T value)
            {
                // Values are ignored; the awaiter reads the result from the subject itself.
            }

            private void InvokeOnOriginalContext()
            {
                if (_context != null)
                {
                    // Pass the callback as state so the lambda does not capture anything.
                    _context.Post(state => ((Action)state!)(), _callback);
                }
                else
                {
                    _callback();
                }
            }
        }

        /// <summary>Observer that sets an event when the subject terminates; used by <see cref="GetResult"/>.</summary>
        private sealed class BlockingObserver : IObserver<T>
        {
            private readonly ManualResetEventSlim _e;

            public BlockingObserver(ManualResetEventSlim e)
            {
                _e = e;
            }

            public void OnCompleted() => Done();

            public void OnError(Exception error) => Done();

            public void OnNext(T value)
            {
            }

            private void Done() => _e.Set();
        }

        // Sentinel states. These are compared by reference, so they must be two distinct
        // array instances; do not replace them with Array.Empty<T>(), which returns a shared instance.
        private static readonly AsyncSubjectDisposable[] Terminated = new AsyncSubjectDisposable[0];
        private static readonly AsyncSubjectDisposable[] Disposed = new AsyncSubjectDisposable[0];

        private AsyncSubjectDisposable[] _observers;
        private T? _value;
        private bool _hasValue;
        private Exception? _exception;

        /// <summary>Creates a subject with no observers.</summary>
        public AsyncSubject()
        {
            _observers = Array.Empty<AsyncSubjectDisposable>();
        }

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// Always <c>false</c> once terminated or disposed, because both sentinel arrays are empty.
        /// </summary>
        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;

        /// <summary>Indicates whether the subject has been disposed.</summary>
        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;

        /// <summary>
        /// Gets whether the subject has terminated through <see cref="OnCompleted()"/> or
        /// <see cref="OnError"/>. Returns <c>false</c> after <see cref="Dispose"/>.
        /// </summary>
        public bool IsCompleted => Volatile.Read(ref _observers) == Terminated;

        /// <summary>
        /// Terminates the subject and notifies the observers subscribed at that moment: each receives
        /// the last value (if any was recorded) followed by the completion notification.
        /// Has no effect if the subject already terminated.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject was disposed before this call.</exception>
        public override void OnCompleted()
        {
            while (true)
            {
                var snapshot = Volatile.Read(ref _observers);

                if (snapshot == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    return;
                }

                if (snapshot == Terminated)
                {
                    return;
                }

                if (Interlocked.CompareExchange(ref _observers, Terminated, snapshot) != snapshot)
                {
                    // Lost a race with a concurrent subscribe/unsubscribe/terminate; re-evaluate.
                    continue;
                }

                if (_hasValue)
                {
                    var last = _value;

                    foreach (var subscription in snapshot)
                    {
                        // Read once: the handle may be disposed concurrently.
                        var target = subscription.Observer;

                        if (target != null)
                        {
                            target.OnNext(last!);
                            target.OnCompleted();
                        }
                    }
                }
                else
                {
                    foreach (var subscription in snapshot)
                    {
                        subscription.Observer?.OnCompleted();
                    }
                }

                // This call performed the transition, so it is done. Returning here (rather than
                // looping to re-read the state) avoids throwing ObjectDisposedException when an
                // observer disposes the subject from inside its terminal callback.
                return;
            }
        }

        /// <summary>
        /// Terminates the subject with <paramref name="error"/> and notifies the observers subscribed
        /// at that moment. Has no effect if the subject already terminated.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject was disposed before this call.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException(nameof(error));
            }

            while (true)
            {
                var snapshot = Volatile.Read(ref _observers);

                if (snapshot == Disposed)
                {
                    _exception = null;
                    _value = default;
                    ThrowDisposed();
                    return;
                }

                if (snapshot == Terminated)
                {
                    return;
                }

                // Store the error before publishing the Terminated state so that anyone who
                // observes Terminated afterwards also sees the exception.
                _exception = error;

                if (Interlocked.CompareExchange(ref _observers, Terminated, snapshot) == snapshot)
                {
                    foreach (var subscription in snapshot)
                    {
                        subscription.Observer?.OnError(error);
                    }

                    // Done; see the matching comment in OnCompleted() for why we do not loop again.
                    return;
                }
            }
        }

        /// <summary>
        /// Records <paramref name="value"/> as the latest value. Nothing is forwarded to observers
        /// until the subject completes. Ignored if the subject already terminated.
        /// </summary>
        /// <param name="value">The value to remember.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnNext(T value)
        {
            var snapshot = Volatile.Read(ref _observers);

            if (snapshot == Disposed)
            {
                _value = default;
                _exception = null;
                ThrowDisposed();
                return;
            }

            if (snapshot == Terminated)
            {
                return;
            }

            _value = value;
            _hasValue = true;
        }

        /// <summary>
        /// Subscribes an observer. If the subject already terminated, the observer is immediately sent
        /// the stored error, or the stored value (if any) followed by completion.
        /// </summary>
        /// <param name="observer">The observer to subscribe.</param>
        /// <returns>
        /// A handle that unsubscribes the observer when disposed, or <c>Disposable.Empty</c> when the
        /// subject had already terminated.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            AsyncSubjectDisposable? handle = null;

            while (true)
            {
                var current = Volatile.Read(ref _observers);

                if (current == Disposed)
                {
                    _value = default;
                    _exception = null;
                    ThrowDisposed();
                    break;
                }

                if (current == Terminated)
                {
                    var error = _exception;

                    if (error != null)
                    {
                        observer.OnError(error);
                    }
                    else
                    {
                        if (_hasValue)
                        {
                            observer.OnNext(_value!);
                        }

                        observer.OnCompleted();
                    }

                    break;
                }

                // Allocate the handle once and reuse it across CAS retries.
                handle ??= new AsyncSubjectDisposable(this, observer);

                var count = current.Length;
                var expanded = new AsyncSubjectDisposable[count + 1];
                Array.Copy(current, 0, expanded, 0, count);
                expanded[count] = handle;

                if (Interlocked.CompareExchange(ref _observers, expanded, current) == current)
                {
                    return handle;
                }
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Removes <paramref name="observer"/> from the subscriber array. Does nothing if it is not
        /// present, which includes the terminated and disposed states (both are empty arrays).
        /// </summary>
        private void Unsubscribe(AsyncSubjectDisposable observer)
        {
            while (true)
            {
                var current = Volatile.Read(ref _observers);
                var count = current.Length;

                if (count == 0)
                {
                    return;
                }

                var index = Array.IndexOf(current, observer);

                if (index < 0)
                {
                    return;
                }

                AsyncSubjectDisposable[] remaining;

                if (count == 1)
                {
                    remaining = Array.Empty<AsyncSubjectDisposable>();
                }
                else
                {
                    remaining = new AsyncSubjectDisposable[count - 1];
                    Array.Copy(current, 0, remaining, 0, index);
                    Array.Copy(current, index + 1, remaining, index, count - index - 1);
                }

                if (Interlocked.CompareExchange(ref _observers, remaining, current) == current)
                {
                    return;
                }
            }
        }

        private static void ThrowDisposed()
        {
            throw new ObjectDisposedException(string.Empty);
        }

        /// <summary>
        /// Drops all observers without notifying them and clears the stored value and error.
        /// Subsequent calls are no-ops.
        /// </summary>
        public override void Dispose()
        {
            if (Interlocked.Exchange(ref _observers, Disposed) != Disposed)
            {
                _exception = null;
                _value = default;
                _hasValue = false;
            }
        }

        /// <summary>Returns this subject as its own awaiter, enabling <c>await</c> on the subject.</summary>
        public AsyncSubject<T> GetAwaiter()
        {
            return this;
        }

        /// <summary>
        /// Schedules <paramref name="continuation"/> to run when the subject terminates, on the
        /// <see cref="SynchronizationContext"/> that is current at the time of this call, if any.
        /// </summary>
        /// <param name="continuation">The action to invoke on termination.</param>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is <c>null</c>.</exception>
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }

            Subscribe(new AwaitObserver(continuation));
        }

        /// <summary>
        /// Returns the last value of the subject, blocking the calling thread until the subject
        /// terminates if it has not done so yet.
        /// </summary>
        /// <remarks>
        /// <see cref="Dispose"/> does not notify observers, so a caller already blocked here is not
        /// released by disposing the subject.
        /// </remarks>
        /// <returns>The last value received before completion.</returns>
        /// <exception cref="InvalidOperationException">The subject completed without producing a value.</exception>
        /// <exception cref="ObjectDisposedException">The subject was disposed before waiting began.</exception>
        public T GetResult()
        {
            if (Volatile.Read(ref _observers) != Terminated)
            {
                using (var gate = new ManualResetEventSlim(false))
                {
                    // If termination happens between the check above and this call, Subscribe
                    // replays the terminal notification synchronously and the wait returns at once.
                    Subscribe(new BlockingObserver(gate));
                    gate.Wait();
                }
            }

            var error = _exception;

            if (error != null)
            {
                error.Throw();
            }

            if (!_hasValue)
            {
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            }

            return _value!;
        }
    }
}