Subject<T>
Represents an object that is both an observable sequence and an observer.
Each notification is broadcast to all subscribed observers.
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
#nullable enable
    /// <summary>
    /// Represents an object that is both an observable sequence and an observer.
    /// Each notification is forwarded to every observer held in the subscription
    /// array that is current when the notification is delivered.
    /// </summary>
    /// <remarks>
    /// All state lives in <see cref="_observers"/>. The array itself is never mutated
    /// once published: subscribing and unsubscribing build a new array and install it
    /// with <see cref="Interlocked.CompareExchange{T}(ref T, T, T)"/>, retrying when
    /// another change won the race. Two dedicated empty arrays,
    /// <see cref="Terminated"/> and <see cref="Disposed"/>, mark the terminal states
    /// and are recognised by reference comparison.
    /// </remarks>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class Subject<T> : SubjectBase<T>
    {
        // Sentinels compared by reference. Each must be its own allocation rather than
        // Array.Empty<>(), otherwise it would be indistinguishable from the ordinary
        // "no subscribers yet" state (or from the other sentinel).
        private static readonly SubjectDisposable[] Terminated = new SubjectDisposable[0];
        private static readonly SubjectDisposable[] Disposed = new SubjectDisposable[0];

        // Current subscriptions, or one of the two sentinels above.
        private SubjectDisposable[] _observers;

        // Error passed to OnError, replayed to observers that subscribe after
        // termination; null when the subject completed normally or was disposed.
        private Exception? _exception;

        /// <summary>
        /// Creates a subject with no subscribed observers.
        /// </summary>
        public Subject()
        {
            _observers = Array.Empty<SubjectDisposable>();
        }

        /// <summary>
        /// Gets whether the current subscription array is non-empty. Both terminal
        /// sentinels are empty arrays, so this is <c>false</c> after termination or disposal.
        /// </summary>
        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;

        /// <summary>
        /// Gets whether <see cref="Dispose"/> has been called on this subject.
        /// </summary>
        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;

        /// <summary>
        /// Throws an <see cref="ObjectDisposedException"/> with an empty object name.
        /// </summary>
        private static void ThrowDisposed()
        {
            throw new ObjectDisposedException(string.Empty);
        }

        /// <summary>
        /// Moves the subject to the terminated state and calls <c>OnCompleted</c> on
        /// every observer that was subscribed at that moment. Does nothing if the
        /// subject is already terminated.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnCompleted()
        {
            for (; ; )
            {
                var current = Volatile.Read(ref _observers);

                if (current == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    return;
                }

                if (current == Terminated)
                {
                    return;
                }

                // Only the caller whose swap succeeds notifies; on failure the array
                // changed underneath us, so re-read and try again.
                if (Interlocked.CompareExchange(ref _observers, Terminated, current) == current)
                {
                    foreach (var subscription in current)
                    {
                        subscription.Observer?.OnCompleted();
                    }

                    return;
                }
            }
        }

        /// <summary>
        /// Moves the subject to the terminated state, remembers <paramref name="error"/>
        /// for later subscribers, and calls <c>OnError</c> on every observer that was
        /// subscribed at that moment. Does nothing if the subject is already terminated.
        /// </summary>
        /// <param name="error">The exception to send to the observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException(nameof(error));
            }

            for (; ; )
            {
                var current = Volatile.Read(ref _observers);

                if (current == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    return;
                }

                if (current == Terminated)
                {
                    return;
                }

                // Stored before the swap so that a Subscribe call which observes
                // Terminated also finds the error to replay.
                _exception = error;

                if (Interlocked.CompareExchange(ref _observers, Terminated, current) == current)
                {
                    foreach (var subscription in current)
                    {
                        subscription.Observer?.OnError(error);
                    }

                    return;
                }
            }
        }

        /// <summary>
        /// Calls <c>OnNext</c> with <paramref name="value"/> on every observer in the
        /// current subscription array. After termination that array is empty, so the
        /// call has no effect.
        /// </summary>
        /// <param name="value">The value to send to the observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnNext(T value)
        {
            var current = Volatile.Read(ref _observers);

            if (current == Disposed)
            {
                _exception = null;
                ThrowDisposed();
                return;
            }

            foreach (var subscription in current)
            {
                // Observer is null once that subscription has been disposed.
                subscription.Observer?.OnNext(value);
            }
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the subject. If the subject has
        /// already terminated, the observer immediately receives the stored error or
        /// a completion notification instead of being added.
        /// </summary>
        /// <param name="observer">The observer to subscribe.</param>
        /// <returns>
        /// A disposable that removes the observer when disposed, or
        /// <c>Disposable.Empty</c> when the subject had already terminated.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            // Allocated at most once and reused across retries of the loop below.
            SubjectDisposable? subscription = null;

            for (; ; )
            {
                var current = Volatile.Read(ref _observers);

                if (current == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    break;
                }

                if (current == Terminated)
                {
                    var error = _exception;
                    if (error != null)
                    {
                        observer.OnError(error);
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    break;
                }

                subscription ??= new SubjectDisposable(this, observer);

                // Build a new array with the subscription appended; the published
                // array is left untouched for anyone currently iterating it.
                var count = current.Length;
                var grown = new SubjectDisposable[count + 1];
                Array.Copy(current, 0, grown, 0, count);
                grown[count] = subscription;

                if (Interlocked.CompareExchange(ref _observers, grown, current) == current)
                {
                    return subscription;
                }
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Removes <paramref name="observer"/> from the subscription array, if present.
        /// Does nothing when the array is empty (which includes both terminal states)
        /// or does not contain the subscription.
        /// </summary>
        /// <param name="observer">The subscription to remove.</param>
        private void Unsubscribe(SubjectDisposable observer)
        {
            for (; ; )
            {
                var current = Volatile.Read(ref _observers);
                var count = current.Length;
                if (count == 0)
                {
                    return;
                }

                var index = Array.IndexOf(current, observer);
                if (index < 0)
                {
                    return;
                }

                SubjectDisposable[] shrunk;
                if (count == 1)
                {
                    shrunk = Array.Empty<SubjectDisposable>();
                }
                else
                {
                    // Copy everything except the element at 'index'.
                    shrunk = new SubjectDisposable[count - 1];
                    Array.Copy(current, 0, shrunk, 0, index);
                    Array.Copy(current, index + 1, shrunk, index, count - index - 1);
                }

                if (Interlocked.CompareExchange(ref _observers, shrunk, current) == current)
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Puts the subject into the disposed state, dropping all subscriptions and
        /// any stored error. No notification is sent to the observers.
        /// </summary>
        public override void Dispose()
        {
            Interlocked.Exchange(ref _observers, Disposed);
            _exception = null;
        }

        /// <summary>
        /// The subscription handle returned by <see cref="Subscribe"/>: pairs an
        /// observer with its subject and removes it from the subject when disposed.
        /// </summary>
        private sealed class SubjectDisposable : IDisposable
        {
            private Subject<T> _subject;

            // Set to null by Dispose; notification loops read it through Observer and
            // skip the entry once it is null.
            private volatile IObserver<T>? _observer;

            /// <summary>
            /// Creates a subscription of <paramref name="observer"/> to <paramref name="subject"/>.
            /// </summary>
            public SubjectDisposable(Subject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            /// <summary>
            /// Gets the subscribed observer, or <c>null</c> once this subscription has been disposed.
            /// </summary>
            public IObserver<T>? Observer => _observer;

            /// <summary>
            /// Clears the observer and removes this subscription from the subject.
            /// Only the first call has an effect.
            /// </summary>
            public void Dispose()
            {
                // The exchange returns the previous value, so exactly one caller sees
                // a non-null observer and performs the unsubscribe.
                if (Interlocked.Exchange(ref _observer, null) != null)
                {
                    _subject.Unsubscribe(this);

                    // Release the reference to the subject; the field is never read
                    // again because the guard above cannot pass a second time.
                    _subject = null!;
                }
            }
        }
    }
#nullable restore
}