AsyncSubject<T>
Represents the result of an asynchronous operation.
The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
public sealed class AsyncSubject<[System.Runtime.CompilerServices.Nullable(2)] T> : SubjectBase<T>, INotifyCompletion
{
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class AsyncSubjectDisposable : IDisposable
{
[System.Runtime.CompilerServices.Nullable(1)]
private AsyncSubject<T> _subject;
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
1
})]
private volatile IObserver<T> _observer;
[System.Runtime.CompilerServices.Nullable(new byte[] {
2,
1
})]
public IObserver<T> Observer {
[return: System.Runtime.CompilerServices.Nullable(new byte[] {
2,
1
})]
get {
return _observer;
}
}
[System.Runtime.CompilerServices.NullableContext(1)]
public AsyncSubjectDisposable(AsyncSubject<T> subject, IObserver<T> observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
if (Interlocked.Exchange<IObserver<T>>(ref _observer, (IObserver<T>)null) != null) {
_subject.Unsubscribe(this);
_subject = null;
}
}
}
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class AwaitObserver : IObserver<T>
{
[System.Runtime.CompilerServices.Nullable(2)]
private readonly SynchronizationContext _context;
private readonly Action _callback;
public AwaitObserver(Action callback)
{
_context = SynchronizationContext.Current;
_callback = callback;
}
public void OnCompleted()
{
InvokeOnOriginalContext();
}
public void OnError(Exception error)
{
InvokeOnOriginalContext();
}
public void OnNext(T value)
{
}
private void InvokeOnOriginalContext()
{
if (_context != null)
_context.Post(delegate(object c) {
((Action)c)();
}, _callback);
else
_callback();
}
}
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class BlockingObserver : IObserver<T>
{
private readonly ManualResetEventSlim _e;
public BlockingObserver(ManualResetEventSlim e)
{
_e = e;
}
public void OnCompleted()
{
Done();
}
public void OnError(Exception error)
{
Done();
}
public void OnNext(T value)
{
}
private void Done()
{
_e.Set();
}
}
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
1,
0
})]
private AsyncSubjectDisposable[] _observers;
[System.Runtime.CompilerServices.Nullable(2)]
private T _value;
private bool _hasValue;
[System.Runtime.CompilerServices.Nullable(2)]
private Exception _exception;
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
1,
0
})]
private static readonly AsyncSubjectDisposable[] Terminated = new AsyncSubjectDisposable[0];
[System.Runtime.CompilerServices.Nullable(new byte[] {
1,
1,
0
})]
private static readonly AsyncSubjectDisposable[] Disposed = new AsyncSubjectDisposable[0];
public override bool HasObservers => Volatile.Read<AsyncSubjectDisposable[]>(ref _observers).Length != 0;
public override bool IsDisposed => Volatile.Read<AsyncSubjectDisposable[]>(ref _observers) == Disposed;
public bool IsCompleted => Volatile.Read<AsyncSubjectDisposable[]>(ref _observers) == Terminated;
public AsyncSubject()
{
_observers = Array.Empty<AsyncSubjectDisposable>();
}
public override void OnCompleted()
{
while (true) {
AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
if (array == Disposed)
break;
if (array == Terminated)
return;
if (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, Terminated, array) == array) {
if (_hasValue) {
T value = _value;
AsyncSubjectDisposable[] array2 = array;
for (int i = 0; i < array2.Length; i++) {
IObserver<T> observer = array2[i].Observer;
if (observer != null) {
observer.OnNext(value);
observer.OnCompleted();
}
}
} else {
AsyncSubjectDisposable[] array2 = array;
for (int i = 0; i < array2.Length; i++) {
array2[i].Observer?.OnCompleted();
}
}
}
}
_exception = null;
ThrowDisposed();
}
public override void OnError(Exception error)
{
if (error == null)
throw new ArgumentNullException("error");
while (true) {
AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
if (array == Disposed)
break;
if (array == Terminated)
return;
_exception = error;
if (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, Terminated, array) == array) {
AsyncSubjectDisposable[] array2 = array;
for (int i = 0; i < array2.Length; i++) {
array2[i].Observer?.OnError(error);
}
}
}
_exception = null;
_value = default(T);
ThrowDisposed();
}
public override void OnNext(T value)
{
AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
if (array == Disposed) {
_value = default(T);
_exception = null;
ThrowDisposed();
} else if (array != Terminated) {
_value = value;
_hasValue = true;
}
}
public override IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
AsyncSubjectDisposable asyncSubjectDisposable = null;
while (true) {
AsyncSubjectDisposable[] array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
if (array == Disposed) {
_value = default(T);
_exception = null;
ThrowDisposed();
break;
}
if (array == Terminated) {
Exception exception = _exception;
if (exception != null)
observer.OnError(exception);
else {
if (_hasValue)
observer.OnNext(_value);
observer.OnCompleted();
}
break;
}
if (asyncSubjectDisposable == null)
asyncSubjectDisposable = new AsyncSubjectDisposable(this, observer);
int num = array.Length;
AsyncSubjectDisposable[] array2 = new AsyncSubjectDisposable[num + 1];
Array.Copy(array, 0, array2, 0, num);
array2[num] = asyncSubjectDisposable;
if (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, array2, array) == array)
return asyncSubjectDisposable;
}
return Disposable.Empty;
}
private void Unsubscribe([System.Runtime.CompilerServices.Nullable(new byte[] {
1,
0
})] AsyncSubjectDisposable observer)
{
AsyncSubjectDisposable[] array;
AsyncSubjectDisposable[] array2;
do {
array = Volatile.Read<AsyncSubjectDisposable[]>(ref _observers);
int num = array.Length;
if (num == 0)
break;
int num2 = Array.IndexOf<AsyncSubjectDisposable>(array, observer);
if (num2 < 0)
break;
if (num == 1)
array2 = Array.Empty<AsyncSubjectDisposable>();
else {
array2 = new AsyncSubjectDisposable[num - 1];
Array.Copy(array, 0, array2, 0, num2);
Array.Copy(array, num2 + 1, array2, num2, num - num2 - 1);
}
} while (Interlocked.CompareExchange<AsyncSubjectDisposable[]>(ref _observers, array2, array) != array);
}
private static void ThrowDisposed()
{
throw new ObjectDisposedException(string.Empty);
}
public override void Dispose()
{
if (Interlocked.Exchange<AsyncSubjectDisposable[]>(ref _observers, Disposed) != Disposed) {
_exception = null;
_value = default(T);
_hasValue = false;
}
}
public AsyncSubject<T> GetAwaiter()
{
return this;
}
public void OnCompleted(Action continuation)
{
if (continuation == null)
throw new ArgumentNullException("continuation");
Subscribe(new AwaitObserver(continuation));
}
public T GetResult()
{
if (Volatile.Read<AsyncSubjectDisposable[]>(ref _observers) != Terminated) {
using (ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim(false)) {
Subscribe(new BlockingObserver(manualResetEventSlim));
manualResetEventSlim.Wait();
}
}
Exception exception = _exception;
if (exception != null)
exception.Throw();
if (!_hasValue)
throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
return _value;
}
}
}