// BehaviorSubject<T>
// Represents a value that changes over time.
// Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
using System.Diagnostics.CodeAnalysis;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
#nullable enable
/// <summary>
/// Represents a value that changes over time. A subscribing observer is first sent the
/// current value (the constructor argument until <see cref="OnNext"/> replaces it) and
/// then every subsequent notification.
/// </summary>
/// <typeparam name="T">The type of the values held and published by the subject.</typeparam>
public sealed class BehaviorSubject<T> : SubjectBase<T>
{
    /// <summary>
    /// Handle returned from <see cref="BehaviorSubject{T}.Subscribe"/>; disposing it removes
    /// the observer from the owning subject. Only the first call to <see cref="Dispose"/> has an effect.
    /// </summary>
    private sealed class Subscription : IDisposable
    {
        private BehaviorSubject<T> _subject;
        private IObserver<T>? _observer;

        /// <summary>Creates a handle that ties <paramref name="observer"/> to <paramref name="subject"/>.</summary>
        /// <param name="subject">The subject the observer was registered with.</param>
        /// <param name="observer">The observer to remove when the handle is disposed.</param>
        public Subscription(BehaviorSubject<T> subject, IObserver<T> observer)
        {
            _subject = subject;
            _observer = observer;
        }

        /// <summary>Unsubscribes the observer from the subject, once.</summary>
        public void Dispose()
        {
            // The atomic swap hands the observer to exactly one caller, so concurrent or
            // repeated Dispose calls unsubscribe at most once.
            var observer = Interlocked.Exchange(ref _observer, null);
            if (observer == null) {
                return;
            }

            _subject.Unsubscribe(observer);
            // Drop the back-reference to the subject once the handle is spent.
            _subject = null!;
        }
    }

    // Guards every read and write of the mutable fields below (HasObservers is the one lock-free reader).
    private readonly object _gate = new object();
    private ImmutableList<IObserver<T>> _observers;
    private bool _isStopped;
    private T _value;
    private Exception? _exception;
    private bool _isDisposed;

    /// <summary>Gets whether at least one observer is currently registered.</summary>
    public override bool HasObservers {
        get {
            // Read without taking the gate; Dispose() sets the field to null, hence the null check.
            var observers = _observers;
            return observers != null && observers.Data.Length != 0;
        }
    }

    /// <summary>Gets whether <see cref="Dispose"/> has been called on this subject.</summary>
    public override bool IsDisposed {
        get {
            lock (_gate) {
                return _isDisposed;
            }
        }
    }

    /// <summary>
    /// Gets the current value. If the subject was terminated through <see cref="OnError"/>,
    /// the stored exception is raised instead (via the project's <c>Throw()</c> helper).
    /// </summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public T Value {
        get {
            lock (_gate) {
                CheckDisposed();
                _exception?.Throw();
                return _value;
            }
        }
    }

    /// <summary>Creates a subject whose current value starts out as <paramref name="value"/>.</summary>
    /// <param name="value">The value sent to observers that subscribe before any <see cref="OnNext"/> call.</param>
    public BehaviorSubject(T value)
    {
        _value = value;
        _observers = ImmutableList<IObserver<T>>.Empty;
    }

    /// <summary>
    /// Tries to read the current value without throwing when the subject has been disposed.
    /// If the subject was terminated through <see cref="OnError"/>, the stored exception is
    /// still raised (via the project's <c>Throw()</c> helper).
    /// </summary>
    /// <param name="value">The current value on success; the default of <typeparamref name="T"/> otherwise.</param>
    /// <returns><c>true</c> if a value was read; <c>false</c> if the subject has been disposed.</returns>
    public bool TryGetValue([MaybeNullWhen(false)] out T value)
    {
        lock (_gate) {
            if (_isDisposed) {
                value = default;
                return false;
            }

            _exception?.Throw();
            value = _value;
            return true;
        }
    }

    /// <summary>
    /// Marks the subject as stopped and signals completion to every registered observer.
    /// Does nothing if the subject is already stopped.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override void OnCompleted()
    {
        IObserver<T>[]? targets = null;
        lock (_gate) {
            CheckDisposed();
            if (!_isStopped) {
                targets = _observers.Data;
                _observers = ImmutableList<IObserver<T>>.Empty;
                _isStopped = true;
            }
        }

        // Observers are called after the gate is released, using the snapshot taken above.
        if (targets != null) {
            foreach (var target in targets) {
                target.OnCompleted();
            }
        }
    }

    /// <summary>
    /// Marks the subject as stopped, remembers <paramref name="error"/> for later subscribers
    /// and value reads, and signals it to every registered observer.
    /// Does nothing if the subject is already stopped.
    /// </summary>
    /// <param name="error">The exception to publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override void OnError(Exception error)
    {
        if (error == null) {
            throw new ArgumentNullException(nameof(error));
        }

        IObserver<T>[]? targets = null;
        lock (_gate) {
            CheckDisposed();
            if (!_isStopped) {
                targets = _observers.Data;
                _observers = ImmutableList<IObserver<T>>.Empty;
                _isStopped = true;
                _exception = error;
            }
        }

        // Observers are called after the gate is released, using the snapshot taken above.
        if (targets != null) {
            foreach (var target in targets) {
                target.OnError(error);
            }
        }
    }

    /// <summary>
    /// Stores <paramref name="value"/> as the current value and sends it to every registered
    /// observer. Does nothing if the subject is already stopped.
    /// </summary>
    /// <param name="value">The new current value.</param>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override void OnNext(T value)
    {
        IObserver<T>[]? targets = null;
        lock (_gate) {
            CheckDisposed();
            if (!_isStopped) {
                _value = value;
                targets = _observers.Data;
            }
        }

        // Observers are called after the gate is released, using the snapshot taken above.
        if (targets != null) {
            foreach (var target in targets) {
                target.OnNext(value);
            }
        }
    }

    /// <summary>
    /// Registers <paramref name="observer"/>. While the subject is running the observer is
    /// immediately sent the current value; once the subject has stopped it is sent the stored
    /// error, or completion if there is none, and is not registered.
    /// </summary>
    /// <param name="observer">The observer to register.</param>
    /// <returns>
    /// A handle that unsubscribes the observer when disposed, or <c>Disposable.Empty</c> if the
    /// subject had already stopped.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public override IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) {
            throw new ArgumentNullException(nameof(observer));
        }

        Exception? terminalError;
        lock (_gate) {
            CheckDisposed();
            if (!_isStopped) {
                _observers = _observers.Add(observer);
                // Sent while the gate is still held: OnNext needs the gate to change _value,
                // so the observer sees the current value before any later one.
                observer.OnNext(_value);
                return new Subscription(this, observer);
            }

            terminalError = _exception;
        }

        // Already stopped: replay the terminal notification outside the gate.
        if (terminalError != null) {
            observer.OnError(terminalError);
        } else {
            observer.OnCompleted();
        }

        return Disposable.Empty;
    }

    /// <summary>Removes <paramref name="observer"/> from the observer list unless the subject has been disposed.</summary>
    /// <param name="observer">The observer to remove.</param>
    private void Unsubscribe(IObserver<T> observer)
    {
        lock (_gate) {
            // After Dispose() the list reference is null, so it must not be touched.
            if (!_isDisposed) {
                _observers = _observers.Remove(observer);
            }
        }
    }

    /// <summary>
    /// Marks the subject as disposed and releases the observer list, the current value and any
    /// stored error. Observers are not notified.
    /// </summary>
    public override void Dispose()
    {
        lock (_gate) {
            _isDisposed = true;
            // Members that dereference _observers check _isDisposed first (CheckDisposed /
            // Unsubscribe), and HasObservers null-checks it, so null is never dereferenced.
            _observers = null!;
            _value = default!;
            _exception = null;
        }
    }

    /// <summary>Throws if the subject has been disposed. Callers in this class invoke it while holding the gate.</summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    private void CheckDisposed()
    {
        if (_isDisposed) {
            throw new ObjectDisposedException(string.Empty);
        }
    }
}
#nullable restore
}