AsyncSubject<T>
Represents the result of an asynchronous operation.
The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents the result of an asynchronous operation.
    /// The last value before the OnCompleted notification, or the error received through OnError,
    /// is sent to all subscribed observers.
    /// </summary>
    /// <remarks>
    /// The subject keeps its whole lifecycle in the <c>_observers</c> array reference, which is only
    /// replaced through interlocked operations. Two dedicated sentinel arrays mark the
    /// terminated and disposed states and are recognized by reference comparison.
    /// The type also acts as its own awaiter (see <see cref="GetAwaiter"/>).
    /// </remarks>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class AsyncSubject<T> : SubjectBase<T>, INotifyCompletion
    {
        /// <summary>
        /// Subscription handle returned from <see cref="Subscribe"/>; pairs a downstream observer
        /// with the subject it is registered on.
        /// </summary>
        private sealed class AsyncSubjectDisposable : IDisposable
        {
            /// <summary>The observer that receives the subject's terminal notifications.</summary>
            internal readonly IObserver<T> Downstream;

            // Set to null once the subscription has been disposed; doubles as the "disposed" flag.
            private AsyncSubject<T> _parent;

            public AsyncSubjectDisposable(AsyncSubject<T> parent, IObserver<T> downstream)
            {
                _parent = parent;
                Downstream = downstream;
            }

            /// <summary>
            /// Detaches this subscription from its subject. The atomic exchange guarantees that
            /// <c>Remove</c> is invoked at most once, no matter how often Dispose is called.
            /// </summary>
            public void Dispose()
            {
                Interlocked.Exchange(ref _parent, null)?.Remove(this);
            }

            /// <summary>Returns true once <see cref="Dispose"/> has cleared the parent reference.</summary>
            internal bool IsDisposed()
            {
                return Volatile.Read(ref _parent) == null;
            }
        }

        /// <summary>
        /// Observer used by the awaiter support: it ignores values and runs a callback when the
        /// subject terminates (either normally or with an error).
        /// </summary>
        private sealed class AwaitObserver : IObserver<T>
        {
            // Null when the callback should run inline on the notifying thread.
            private readonly SynchronizationContext _context;
            private readonly Action _callback;

            /// <param name="callback">The action to run on termination.</param>
            /// <param name="originalContext">
            /// When true, <see cref="SynchronizationContext.Current"/> is captured at construction time
            /// and the callback is posted to it (if one was present).
            /// </param>
            public AwaitObserver(Action callback, bool originalContext)
            {
                if (originalContext)
                {
                    _context = SynchronizationContext.Current;
                }

                _callback = callback;
            }

            public void OnCompleted()
            {
                InvokeOnOriginalContext();
            }

            public void OnError(Exception error)
            {
                InvokeOnOriginalContext();
            }

            public void OnNext(T value)
            {
                // Values are not needed here; the awaiter reads the result from the subject itself.
            }

            private void InvokeOnOriginalContext()
            {
                if (_context == null)
                {
                    _callback();
                    return;
                }

                // The callback travels as the state argument so the posted delegate captures nothing.
                _context.Post(state => ((Action)state)(), _callback);
            }
        }

        // Current subscriber set, or one of the two sentinel arrays below.
        private AsyncSubjectDisposable[] _observers;
        private T _value;
        private bool _hasValue;
        private Exception _exception;

        // The sentinels are compared by reference, so each must be its own array instance
        // (distinct from each other and from Array.Empty, which represents "no subscribers yet").
        private static readonly AsyncSubjectDisposable[] Terminated = new AsyncSubjectDisposable[0];
        private static readonly AsyncSubjectDisposable[] Disposed = new AsyncSubjectDisposable[0];

        /// <summary>
        /// Indicates whether the subject currently has observers subscribed to it.
        /// Both sentinel arrays are empty, so this is false after termination or disposal.
        /// </summary>
        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;

        /// <summary>Indicates whether the subject has been disposed.</summary>
        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;

        /// <summary>
        /// Indicates whether the subject has terminated (through OnCompleted or OnError).
        /// Part of the awaiter surface exposed by <see cref="GetAwaiter"/>.
        /// </summary>
        public bool IsCompleted => Volatile.Read(ref _observers) == Terminated;

        /// <summary>Creates a subject with no subscribers and no value.</summary>
        public AsyncSubject()
        {
            _observers = Array.Empty<AsyncSubjectDisposable>();
        }

        /// <summary>
        /// Terminates the subject. Every live subscriber receives the last value (if any was
        /// supplied through <see cref="OnNext"/>) followed by a completion notification.
        /// Has no effect if the subject already terminated.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnCompleted()
        {
            for (;;)
            {
                var current = Volatile.Read(ref _observers);
                if (current == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    return;
                }

                if (current == Terminated)
                {
                    return;
                }

                if (Interlocked.CompareExchange(ref _observers, Terminated, current) != current)
                {
                    // Lost a race against a concurrent subscribe/unsubscribe/terminate; retry.
                    continue;
                }

                var hasValue = _hasValue;
                var value = hasValue ? _value : default(T);

                foreach (var subscription in current)
                {
                    if (subscription.IsDisposed())
                    {
                        continue;
                    }

                    if (hasValue)
                    {
                        subscription.Downstream.OnNext(value);
                    }

                    subscription.Downstream.OnCompleted();
                }

                // Deliberately fall through to another iteration: the state is re-read and the
                // method leaves via the Terminated branch (or throws if disposed in the meantime).
            }
        }

        /// <summary>
        /// Terminates the subject with an error, which is forwarded to every live subscriber.
        /// Has no effect if the subject already terminated.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException(nameof(error));
            }

            for (;;)
            {
                var current = Volatile.Read(ref _observers);
                if (current == Disposed)
                {
                    _exception = null;
                    _value = default(T);
                    ThrowDisposed();
                    return;
                }

                if (current == Terminated)
                {
                    return;
                }

                // Stored before the state switch so that the error is already in place when a
                // late subscriber observes the Terminated state.
                _exception = error;

                if (Interlocked.CompareExchange(ref _observers, Terminated, current) != current)
                {
                    continue;
                }

                foreach (var subscription in current)
                {
                    if (!subscription.IsDisposed())
                    {
                        subscription.Downstream.OnError(error);
                    }
                }

                // As in OnCompleted, the next iteration re-reads the state and exits.
            }
        }

        /// <summary>
        /// Records <paramref name="value"/> as the most recent value. Nothing is sent to observers
        /// until the subject completes. Ignored after termination.
        /// </summary>
        /// <param name="value">The value to remember.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnNext(T value)
        {
            var current = Volatile.Read(ref _observers);
            if (current == Disposed)
            {
                _value = default(T);
                _exception = null;
                ThrowDisposed();
                return;
            }

            if (current == Terminated)
            {
                return;
            }

            _value = value;
            _hasValue = true;
        }

        /// <summary>
        /// Subscribes an observer. If the subject already terminated, the stored outcome
        /// (error, or optional last value plus completion) is replayed synchronously and an
        /// empty disposable is returned.
        /// </summary>
        /// <param name="observer">The observer to subscribe.</param>
        /// <returns>A handle that removes the observer from the subject when disposed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            var subscription = new AsyncSubjectDisposable(this, observer);
            if (Add(subscription))
            {
                return subscription;
            }

            // Add returned false: the subject is in its terminal state, replay the outcome.
            var error = _exception;
            if (error != null)
            {
                observer.OnError(error);
            }
            else
            {
                if (_hasValue)
                {
                    observer.OnNext(_value);
                }

                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Atomically appends <paramref name="inner"/> to the subscriber array.
        /// </summary>
        /// <returns>True if added; false if the subject has already terminated.</returns>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        private bool Add(AsyncSubjectDisposable inner)
        {
            for (;;)
            {
                var current = Volatile.Read(ref _observers);
                if (current == Disposed)
                {
                    _value = default(T);
                    _exception = null;
                    ThrowDisposed();
                    return true; // Not reached: ThrowDisposed always throws.
                }

                if (current == Terminated)
                {
                    return false;
                }

                // Copy-and-append, then publish the new array only if nobody changed it meanwhile.
                var count = current.Length;
                var updated = new AsyncSubjectDisposable[count + 1];
                Array.Copy(current, 0, updated, 0, count);
                updated[count] = inner;

                if (Interlocked.CompareExchange(ref _observers, updated, current) == current)
                {
                    return true;
                }
            }
        }

        /// <summary>
        /// Atomically removes <paramref name="inner"/> from the subscriber array, if present.
        /// The sentinel arrays are empty, so a terminated or disposed subject is left untouched.
        /// </summary>
        private void Remove(AsyncSubjectDisposable inner)
        {
            for (;;)
            {
                var current = Volatile.Read(ref _observers);
                var count = current.Length;
                if (count == 0)
                {
                    return;
                }

                var index = Array.IndexOf(current, inner);
                if (index < 0)
                {
                    return;
                }

                AsyncSubjectDisposable[] updated;
                if (count == 1)
                {
                    updated = Array.Empty<AsyncSubjectDisposable>();
                }
                else
                {
                    updated = new AsyncSubjectDisposable[count - 1];
                    Array.Copy(current, 0, updated, 0, index);
                    Array.Copy(current, index + 1, updated, index, count - index - 1);
                }

                if (Interlocked.CompareExchange(ref _observers, updated, current) == current)
                {
                    return;
                }
            }
        }

        /// <summary>Always throws <see cref="ObjectDisposedException"/>.</summary>
        private void ThrowDisposed()
        {
            throw new ObjectDisposedException(string.Empty);
        }

        /// <summary>
        /// Switches the subject to the disposed state and, on the first call, clears the stored
        /// value and error. Current subscribers are dropped without being notified.
        /// </summary>
        public override void Dispose()
        {
            if (Interlocked.Exchange(ref _observers, Disposed) != Disposed)
            {
                _exception = null;
                _value = default(T);
                _hasValue = false;
            }
        }

        /// <summary>Returns the subject itself, which implements the awaiter members.</summary>
        public AsyncSubject<T> GetAwaiter()
        {
            return this;
        }

        /// <summary>
        /// Schedules <paramref name="continuation"/> to run when the subject terminates. The
        /// synchronization context current at the time of this call is used, if there is one.
        /// </summary>
        /// <param name="continuation">The action to invoke on termination.</param>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }

            OnCompleted(continuation, true);
        }

        // Subscribes an AwaitObserver; the returned subscription handle is intentionally not kept.
        private void OnCompleted(Action continuation, bool originalContext)
        {
            Subscribe(new AwaitObserver(continuation, originalContext));
        }

        /// <summary>
        /// Returns the last value of the subject, blocking the calling thread until the subject
        /// terminates if it has not done so yet.
        /// </summary>
        /// <returns>The last value supplied before completion.</returns>
        /// <exception cref="InvalidOperationException">The subject completed without any value.</exception>
        /// <remarks>If the subject terminated with an error, that error is raised via <c>ThrowIfNotNull</c>.</remarks>
        public T GetResult()
        {
            if (Volatile.Read(ref _observers) != Terminated)
            {
                // Dispose the wait handle deterministically, also when subscribing throws
                // because the subject has been disposed.
                using (var signal = new ManualResetEvent(false))
                {
                    // originalContext: false -> the signal is set inline by the terminating thread,
                    // never posted to a synchronization context this thread might be blocking.
                    OnCompleted(() => signal.Set(), false);
                    signal.WaitOne();
                }
            }

            _exception.ThrowIfNotNull();

            if (!_hasValue)
            {
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            }

            return _value;
        }
    }
}