<PackageReference Include="System.Reactive" Version="6.0.1" />

Subject<T>

public sealed class Subject<T> : SubjectBase<T>
Represents an object that is both an observable sequence and an observer. Each notification is broadcast to all subscribed observers.
#nullable enable

using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// An object that is both an observable sequence and an observer. Each notification
    /// is forwarded to every observer present in the subscription list at the moment the
    /// notification is delivered.
    /// </summary>
    /// <remarks>
    /// The subscription list is stored as an array in <see cref="_observers"/> that is never
    /// mutated in place: subscribing, unsubscribing, terminating and disposing each publish a
    /// replacement array (compare-and-swap, or an unconditional exchange for disposal) without
    /// taking a lock. Two sentinel arrays, <see cref="Terminated"/> and <see cref="Disposed"/>,
    /// encode the terminal states and are recognised by reference identity.
    /// </remarks>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class Subject<T> : SubjectBase<T>
    {
        /// <summary>
        /// Subscription handle returned by <see cref="Subscribe"/>. Holds the observer and
        /// removes itself from the owning subject when disposed.
        /// </summary>
        private sealed class SubjectDisposable : IDisposable
        {
            private Subject<T> _subject;
            private volatile IObserver<T>? _observer;

            public SubjectDisposable(Subject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            /// <summary>
            /// The subscribed observer, or <c>null</c> once this handle has been disposed.
            /// </summary>
            public IObserver<T>? Observer => _observer;

            /// <summary>
            /// Detaches the observer from the subject. Only the first call does any work.
            /// </summary>
            public void Dispose()
            {
                // The atomic swap lets exactly one caller observe a non-null observer,
                // so Unsubscribe runs at most once per handle.
                if (Interlocked.Exchange(ref _observer, null) != null)
                {
                    _subject.Unsubscribe(this);

                    // Drop the back-reference to the subject.
                    _subject = null!;
                }
            }
        }

        // Current subscription snapshot, or one of the two sentinels below.
        private SubjectDisposable[] _observers;

        // Error recorded by OnError; replayed to observers that subscribe after termination.
        private Exception? _exception;

        // NOTE: these sentinels are compared by reference and must be two distinct array
        // instances that also differ from Array.Empty<SubjectDisposable>() (the "active, no
        // observers" state set by the constructor). Do not replace them with Array.Empty.
        private static readonly SubjectDisposable[] Terminated = new SubjectDisposable[0];
        private static readonly SubjectDisposable[] Disposed = new SubjectDisposable[0];

        /// <summary>
        /// Indicates whether at least one observer is currently subscribed. Both sentinel
        /// arrays are empty, so this is <c>false</c> after termination or disposal.
        /// </summary>
        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;

        /// <summary>
        /// Indicates whether <see cref="Dispose"/> has been called on this subject.
        /// </summary>
        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;

        /// <summary>
        /// Creates a subject with no observers.
        /// </summary>
        public Subject()
        {
            _observers = Array.Empty<SubjectDisposable>();
        }

        private static void ThrowDisposed()
        {
            throw new ObjectDisposedException(string.Empty);
        }

        /// <summary>
        /// Moves the subject to the terminated state and notifies the observers that were
        /// subscribed at that moment of completion. Does nothing if already terminated.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnCompleted()
        {
            while (true)
            {
                var snapshot = Volatile.Read(ref _observers);

                if (snapshot == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    return;
                }

                if (snapshot == Terminated)
                {
                    return;
                }

                // Only the caller whose swap succeeds delivers the notification; a failed
                // swap means the list changed underneath us, so re-read and retry.
                if (Interlocked.CompareExchange(ref _observers, Terminated, snapshot) == snapshot)
                {
                    foreach (var subscription in snapshot)
                    {
                        subscription.Observer?.OnCompleted();
                    }

                    return;
                }
            }
        }

        /// <summary>
        /// Moves the subject to the terminated state, records <paramref name="error"/> for
        /// later subscribers, and notifies the observers that were subscribed at that moment.
        /// Does nothing if already terminated.
        /// </summary>
        /// <param name="error">The exception to send to the observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnError(Exception error)
        {
            if (error is null)
            {
                throw new ArgumentNullException(nameof(error));
            }

            while (true)
            {
                var snapshot = Volatile.Read(ref _observers);

                if (snapshot == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    return;
                }

                if (snapshot == Terminated)
                {
                    return;
                }

                // Written before the swap so that the error is already in place when
                // Subscribe observes the Terminated sentinel and reads _exception.
                _exception = error;

                if (Interlocked.CompareExchange(ref _observers, Terminated, snapshot) == snapshot)
                {
                    foreach (var subscription in snapshot)
                    {
                        subscription.Observer?.OnError(error);
                    }

                    return;
                }
            }
        }

        /// <summary>
        /// Forwards <paramref name="value"/> to every observer in the current snapshot.
        /// After termination the snapshot is empty, so the call is a no-op.
        /// </summary>
        /// <param name="value">The value to send to the observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnNext(T value)
        {
            var snapshot = Volatile.Read(ref _observers);

            if (snapshot == Disposed)
            {
                _exception = null;
                ThrowDisposed();
                return;
            }

            foreach (var subscription in snapshot)
            {
                // Observer is null for handles disposed after the snapshot was taken.
                subscription.Observer?.OnNext(value);
            }
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the subject.
        /// </summary>
        /// <param name="observer">The observer to add.</param>
        /// <returns>
        /// A handle that removes the observer when disposed. If the subject has already
        /// terminated, the observer immediately receives the terminal notification
        /// (the recorded error, or completion) and <c>Disposable.Empty</c> is returned.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            // Allocated lazily and reused across retries of the swap loop.
            SubjectDisposable? subscription = null;

            while (true)
            {
                var snapshot = Volatile.Read(ref _observers);

                if (snapshot == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    break;
                }

                if (snapshot == Terminated)
                {
                    var recorded = _exception;
                    if (recorded != null)
                    {
                        observer.OnError(recorded);
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    break;
                }

                subscription ??= new SubjectDisposable(this, observer);

                // Build a new array with the subscription appended; the published
                // snapshot itself is never modified.
                var count = snapshot.Length;
                var grown = new SubjectDisposable[count + 1];
                Array.Copy(snapshot, 0, grown, 0, count);
                grown[count] = subscription;

                if (Interlocked.CompareExchange(ref _observers, grown, snapshot) == snapshot)
                {
                    return subscription;
                }
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Removes <paramref name="observer"/> from the subscription list, if present.
        /// </summary>
        /// <param name="observer">The subscription handle to remove.</param>
        private void Unsubscribe(SubjectDisposable observer)
        {
            while (true)
            {
                var snapshot = Volatile.Read(ref _observers);
                var count = snapshot.Length;

                // An empty snapshot also covers the Terminated and Disposed sentinels,
                // which must never be replaced by this method.
                if (count == 0)
                {
                    return;
                }

                var index = Array.IndexOf(snapshot, observer);
                if (index < 0)
                {
                    return;
                }

                SubjectDisposable[] shrunk;
                if (count == 1)
                {
                    shrunk = Array.Empty<SubjectDisposable>();
                }
                else
                {
                    // Copy everything except the element at 'index'.
                    shrunk = new SubjectDisposable[count - 1];
                    Array.Copy(snapshot, 0, shrunk, 0, index);
                    Array.Copy(snapshot, index + 1, shrunk, index, count - index - 1);
                }

                if (Interlocked.CompareExchange(ref _observers, shrunk, snapshot) == snapshot)
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Drops all observers and puts the subject in the disposed state, after which
        /// <see cref="OnNext"/>, <see cref="OnError"/>, <see cref="OnCompleted"/> and
        /// <see cref="Subscribe"/> throw <see cref="ObjectDisposedException"/>.
        /// </summary>
        public override void Dispose()
        {
            Interlocked.Exchange(ref _observers, Disposed);
            _exception = null;
        }
    }
}