// ReplaySubject<T>
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Subjects
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
public sealed class ReplaySubject<[System.Runtime.CompilerServices.Nullable(2)] T> : SubjectBase<T>
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
private abstract class ReplayBase : SubjectBase<T>
{
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class Subscription : IDisposable
{
    // Subject that owns the observer; cleared once this subscription has been disposed.
    [System.Runtime.CompilerServices.Nullable(new byte[] {
        1,
        0
    })]
    private ReplayBase _subject;

    // Observer to release on disposal; swapped to null by the first Dispose call.
    [System.Runtime.CompilerServices.Nullable(new byte[] {
        2,
        1
    })]
    private System.Reactive.IScheduledObserver<T> _observer;

    /// <summary>Pairs a subscribed observer with the subject it must be removed from.</summary>
    /// <param name="subject">Subject whose observer list contains <paramref name="observer"/>.</param>
    /// <param name="observer">Observer to dispose and unsubscribe on <see cref="Dispose"/>.</param>
    [System.Runtime.CompilerServices.NullableContext(1)]
    public Subscription([System.Runtime.CompilerServices.Nullable(new byte[] {
        1,
        0
    })] ReplayBase subject, System.Reactive.IScheduledObserver<T> observer)
    {
        _subject = subject;
        _observer = observer;
    }

    /// <summary>
    /// Disposes the observer and removes it from the subject. Only the first call does any work.
    /// </summary>
    public void Dispose()
    {
        // Atomically take the observer so later (or concurrent) calls see null and bail out.
        var claimed = Interlocked.Exchange<System.Reactive.IScheduledObserver<T>>(ref _observer, null);
        if (claimed == null)
        {
            return;
        }

        claimed.Dispose();
        _subject.Unsubscribe(claimed);
        _subject = null;
    }
}
// Lock object taken around the state changes in OnNext/OnError/OnCompleted/Subscribe/Dispose/Unsubscribe.
private readonly object _gate = new object();
// Currently subscribed observers; reassigned on add/remove and set to null by Dispose.
private System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>> _observers;
// Set to true by OnError/OnCompleted; once set, further notifications are ignored.
private bool _isStopped;
// Error recorded by OnError; Subscribe forwards it to new observers when it is non-null.
[System.Runtime.CompilerServices.Nullable(2)]
private Exception _error;
// Set to true by Dispose; read by CheckDisposed, IsDisposed and Unsubscribe.
private bool _isDisposed;
/// <summary>Gets whether the current observer list contains at least one observer.</summary>
public override bool HasObservers
{
    get
    {
        // Read the field once, without the lock; it is null after Dispose.
        var snapshot = _observers;
        return snapshot != null && snapshot.Data.Length != 0;
    }
}
/// <summary>Gets whether <see cref="Dispose"/> has been called; the flag is read under the gate.</summary>
public override bool IsDisposed
{
    get
    {
        bool disposed;
        lock (_gate)
        {
            disposed = _isDisposed;
        }

        return disposed;
    }
}
/// <summary>Starts the subject with no observers, not stopped and without a recorded error.</summary>
protected ReplayBase()
{
    _error = null;
    _isStopped = false;
    _observers = System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>>.Empty;
}
/// <summary>
/// Records <paramref name="value"/> in the replay buffer and forwards it to every subscribed observer.
/// Does nothing if the subject has already been stopped.
/// </summary>
/// <param name="value">The value to buffer and forward.</param>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override void OnNext(T value)
{
    System.Reactive.IScheduledObserver<T>[] notified = null;

    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            Next(value);
            Trim();

            notified = _observers.Data;
            foreach (var target in notified)
            {
                target.OnNext(value);
            }
        }
    }

    if (notified == null)
    {
        return;
    }

    // EnsureActive is called only after the gate has been released.
    foreach (var target in notified)
    {
        target.EnsureActive();
    }
}
/// <summary>
/// Stops the subject with <paramref name="error"/>, forwards the error to every subscribed observer
/// and then clears the observer list. Does nothing if the subject has already been stopped.
/// </summary>
/// <param name="error">The error to record and forward.</param>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override void OnError(Exception error)
{
    System.Reactive.IScheduledObserver<T>[] notified = null;

    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            _isStopped = true;
            _error = error;
            Trim();

            notified = _observers.Data;
            foreach (var target in notified)
            {
                target.OnError(error);
            }

            _observers = System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>>.Empty;
        }
    }

    if (notified == null)
    {
        return;
    }

    // EnsureActive is called only after the gate has been released.
    foreach (var target in notified)
    {
        target.EnsureActive();
    }
}
/// <summary>
/// Stops the subject, forwards completion to every subscribed observer and then clears the
/// observer list. Does nothing if the subject has already been stopped.
/// </summary>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override void OnCompleted()
{
    System.Reactive.IScheduledObserver<T>[] notified = null;

    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            _isStopped = true;
            Trim();

            notified = _observers.Data;
            foreach (var target in notified)
            {
                target.OnCompleted();
            }

            _observers = System.Reactive.ImmutableList<System.Reactive.IScheduledObserver<T>>.Empty;
        }
    }

    if (notified == null)
    {
        return;
    }

    // EnsureActive is called only after the gate has been released.
    foreach (var target in notified)
    {
        target.EnsureActive();
    }
}
/// <summary>
/// Replays the buffered values (and a recorded terminal notification, if any) to
/// <paramref name="observer"/> and, while the subject is not stopped, registers it for future notifications.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
/// <returns>
/// A handle that removes the observer when disposed, or <c>Disposable.Empty</c> when the subject
/// was already stopped and the observer was therefore not registered.
/// </returns>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override IDisposable Subscribe(IObserver<T> observer)
{
    var scheduled = CreateScheduledObserver(observer);
    IDisposable subscription = Disposable.Empty;
    int pending = 0;

    lock (_gate)
    {
        CheckDisposed();
        Trim();

        // Number of notifications handed to the observer: replayed values plus one terminal message.
        pending = Replay(scheduled);

        if (_error != null)
        {
            pending++;
            scheduled.OnError(_error);
        }
        else if (_isStopped)
        {
            pending++;
            scheduled.OnCompleted();
        }

        if (!_isStopped)
        {
            subscription = new Subscription(this, scheduled);
            _observers = _observers.Add(scheduled);
        }
    }

    // EnsureActive is called only after the gate has been released.
    scheduled.EnsureActive(pending);
    return subscription;
}
/// <summary>
/// Marks the subject as disposed, drops the observer list and lets the derived class
/// release its buffer via <see cref="DisposeCore"/>, all under the gate.
/// </summary>
public override void Dispose()
{
    lock (_gate)
    {
        _isDisposed = true;

        // Null (not Empty): HasObservers and Unsubscribe both account for this.
        _observers = null;

        DisposeCore();
    }
}
// Releases buffer state; invoked by Dispose while the gate is held.
protected abstract void DisposeCore();
// Stores a value in the replay buffer; invoked by OnNext under the gate, immediately before Trim.
protected abstract void Next(T value);
// Pushes the buffered values to the observer and returns a count that Subscribe
// passes on to EnsureActive (plus one when a terminal notification is also sent).
protected abstract int Replay(IObserver<T> observer);
// Applies the buffer's retention policy; invoked under the gate by OnNext, OnError, OnCompleted and Subscribe.
protected abstract void Trim();
// Wraps a raw observer in the scheduled-observer type used by this implementation;
// invoked by Subscribe before the gate is taken.
protected abstract System.Reactive.IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer);
/// <summary>Throws if the subject has been disposed. Callers in this class invoke it while holding the gate.</summary>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
private void CheckDisposed()
{
    if (!_isDisposed)
    {
        return;
    }

    throw new ObjectDisposedException(string.Empty);
}
/// <summary>Removes <paramref name="observer"/> from the observer list unless the subject has been disposed.</summary>
/// <param name="observer">The observer to remove.</param>
private void Unsubscribe(System.Reactive.IScheduledObserver<T> observer)
{
    lock (_gate)
    {
        if (_isDisposed)
        {
            // Dispose has set _observers to null, so there is nothing to remove from.
            return;
        }

        _observers = _observers.Remove(observer);
    }
}
}
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class ReplayByTime : ReplayBase
{
    // Buffer size used by the overloads that do not take an explicit size.
    private const int InfiniteBufferSize = int.MaxValue;

    // Maximum number of buffered items kept by Trim.
    private readonly int _bufferSize;

    // Maximum age of a buffered item kept by Trim.
    private readonly TimeSpan _window;

    // Scheduler used for the stopwatch and for the scheduled observers.
    private readonly IScheduler _scheduler;

    // Time source for stamping values and measuring their age.
    private readonly IStopwatch _stopwatch;

    // Buffered values, each paired with the stopwatch reading taken when it arrived.
    [System.Runtime.CompilerServices.Nullable(new byte[] {
        1,
        0,
        1
    })]
    private readonly Queue<TimeInterval<T>> _queue;

    /// <summary>Creates a buffer limited by both item count and item age.</summary>
    /// <param name="bufferSize">Maximum number of items to keep; must not be negative.</param>
    /// <param name="window">Maximum age of items to keep; must not be negative.</param>
    /// <param name="scheduler">Scheduler providing the stopwatch and running the observers.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="bufferSize"/> or <paramref name="window"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
    {
        if (bufferSize < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufferSize));
        }

        if (window < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(window));
        }

        _bufferSize = bufferSize;
        _window = window;
        _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
        _stopwatch = _scheduler.StartStopwatch();
        _queue = new Queue<TimeInterval<T>>();
    }

    /// <summary>Count- and age-limited buffer on <c>SchedulerDefaults.Iteration</c>.</summary>
    public ReplayByTime(int bufferSize, TimeSpan window)
        : this(bufferSize, window, System.Reactive.Concurrency.SchedulerDefaults.Iteration)
    {
    }

    /// <summary>Buffer with the maximum size and maximum window on the given scheduler.</summary>
    public ReplayByTime(IScheduler scheduler)
        : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
    {
    }

    /// <summary>Count-limited buffer with the maximum window on the given scheduler.</summary>
    public ReplayByTime(int bufferSize, IScheduler scheduler)
        : this(bufferSize, TimeSpan.MaxValue, scheduler)
    {
    }

    /// <summary>Age-limited buffer with the maximum size on the given scheduler.</summary>
    public ReplayByTime(TimeSpan window, IScheduler scheduler)
        : this(InfiniteBufferSize, window, scheduler)
    {
    }

    /// <summary>Age-limited buffer with the maximum size on <c>SchedulerDefaults.Iteration</c>.</summary>
    public ReplayByTime(TimeSpan window)
        : this(InfiniteBufferSize, window, System.Reactive.Concurrency.SchedulerDefaults.Iteration)
    {
    }

    /// <summary>Wraps the observer in a <c>ScheduledObserver</c> bound to this buffer's scheduler.</summary>
    protected override System.Reactive.IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
        => new System.Reactive.ScheduledObserver<T>(_scheduler, observer);

    /// <summary>Drops every buffered value.</summary>
    protected override void DisposeCore() => _queue.Clear();

    /// <summary>Appends the value together with the current stopwatch reading.</summary>
    protected override void Next(T value)
    {
        var stamp = _stopwatch.Elapsed;
        _queue.Enqueue(new TimeInterval<T>(value, stamp));
    }

    /// <summary>Sends every buffered value to the observer, oldest first.</summary>
    /// <returns>The number of buffered values at the time of the call.</returns>
    protected override int Replay(IObserver<T> observer)
    {
        var buffered = _queue.Count;

        foreach (var entry in _queue)
        {
            observer.OnNext(entry.Value);
        }

        return buffered;
    }

    /// <summary>Evicts the oldest entries until both the size limit and the age limit hold.</summary>
    protected override void Trim()
    {
        var now = _stopwatch.Elapsed;

        // Size limit first.
        while (_queue.Count > _bufferSize)
        {
            _queue.Dequeue();
        }

        // Then the age limit: drop entries whose age strictly exceeds the window.
        while (_queue.Count > 0 && now - _queue.Peek().Interval > _window)
        {
            _queue.Dequeue();
        }
    }
}
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class ReplayOne : ReplayBufferBase
{
    // True once at least one value has been stored.
    private bool _hasValue;

    // Most recently stored value; only meaningful while _hasValue is true.
    [System.Runtime.CompilerServices.Nullable(2)]
    private T _value;

    /// <summary>Nothing to trim: the single slot is simply overwritten by <see cref="Next"/>.</summary>
    protected override void Trim()
    {
    }

    /// <summary>Overwrites the stored value with <paramref name="value"/>.</summary>
    protected override void Next(T value)
    {
        _value = value;
        _hasValue = true;
    }

    /// <summary>Sends the stored value, if there is one, to the observer.</summary>
    /// <returns>1 if a value was sent; otherwise 0.</returns>
    protected override int Replay(IObserver<T> observer)
    {
        if (!_hasValue)
        {
            return 0;
        }

        observer.OnNext(_value);
        return 1;
    }

    /// <summary>Resets the stored value to its default (the has-value flag is left as is).</summary>
    protected override void DisposeCore()
    {
        _value = default(T);
    }
}
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class ReplayMany : ReplayManyBase
{
    // Maximum number of values kept in the queue.
    private readonly int _bufferSize;

    /// <summary>Creates a buffer that keeps at most <paramref name="bufferSize"/> values.</summary>
    /// <param name="bufferSize">Maximum number of values to keep; also passed to the base as the queue-size hint.</param>
    public ReplayMany(int bufferSize)
        : base(bufferSize)
    {
        _bufferSize = bufferSize;
    }

    /// <summary>Drops the oldest values until the queue holds no more than the buffer size.</summary>
    protected override void Trim()
    {
        while (_queue.Count > _bufferSize)
        {
            _queue.Dequeue();
        }
    }
}
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class ReplayAll : ReplayManyBase
{
    /// <summary>Creates a buffer that never evicts; 0 is passed to the base as the queue-size hint.</summary>
    public ReplayAll()
        : base(0)
    {
    }

    /// <summary>Intentionally empty: every value is retained.</summary>
    protected override void Trim()
    {
    }
}
[System.Runtime.CompilerServices.NullableContext(0)]
private abstract class ReplayBufferBase : ReplayBase
{
    /// <summary>Wraps the observer in a <c>FastImmediateObserver</c>; no scheduler is involved.</summary>
    [System.Runtime.CompilerServices.NullableContext(1)]
    protected override System.Reactive.IScheduledObserver<T> CreateScheduledObserver(IObserver<T> observer)
        => new System.Reactive.Subjects.FastImmediateObserver<T>(observer);

    /// <summary>No buffer state at this level; derived classes override this when they hold some.</summary>
    protected override void DisposeCore()
    {
    }
}
[System.Runtime.CompilerServices.Nullable(0)]
private abstract class ReplayManyBase : ReplayBufferBase
{
    // Buffered values, oldest first; derived classes trim it according to their policy.
    protected readonly Queue<T> _queue;

    /// <summary>Allocates the queue with an initial capacity of <paramref name="queueSize"/>, capped at 64.</summary>
    /// <param name="queueSize">Requested initial capacity.</param>
    protected ReplayManyBase(int queueSize)
    {
        var initialCapacity = Math.Min(queueSize, 64);
        _queue = new Queue<T>(initialCapacity);
    }

    /// <summary>Appends the value to the queue.</summary>
    protected override void Next(T value) => _queue.Enqueue(value);

    /// <summary>Sends every queued value to the observer, oldest first.</summary>
    /// <returns>The number of queued values at the time of the call.</returns>
    protected override int Replay(IObserver<T> observer)
    {
        var buffered = _queue.Count;

        foreach (var queued in _queue)
        {
            observer.OnNext(queued);
        }

        return buffered;
    }

    /// <summary>Drops every queued value.</summary>
    protected override void DisposeCore() => _queue.Clear();
}
// Concrete replay strategy chosen by the constructor; every public member forwards to it.
private readonly SubjectBase<T> _implementation;
/// <summary>Gets whether the underlying implementation reports any subscribed observers.</summary>
public override bool HasObservers
{
    get { return _implementation.HasObservers; }
}
/// <summary>Gets whether the underlying implementation reports itself as disposed.</summary>
public override bool IsDisposed
{
    get { return _implementation.IsDisposed; }
}
/// <summary>Creates a subject with the maximum buffer size (<see cref="int.MaxValue"/>).</summary>
public ReplaySubject()
    : this(int.MaxValue)
{
}
/// <summary>Creates a subject backed by the time-based implementation on the given scheduler.</summary>
/// <param name="scheduler">Scheduler handed to the underlying <c>ReplayByTime</c> buffer.</param>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public ReplaySubject(IScheduler scheduler)
{
    var byTime = new ReplayByTime(scheduler);
    _implementation = byTime;
}
/// <summary>
/// Creates a subject that replays at most <paramref name="bufferSize"/> values, choosing the
/// cheapest implementation for the requested size.
/// </summary>
/// <param name="bufferSize">Maximum number of values to replay; must not be negative.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative.</exception>
public ReplaySubject(int bufferSize)
{
    // Validate here so the exception names "bufferSize", like the time-based constructors do.
    // Previously a negative size only failed inside Queue<T>'s constructor, which reports "capacity".
    if (bufferSize < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(bufferSize));
    }

    switch (bufferSize)
    {
        case 1:
            // Single-slot buffer: no queue needed.
            _implementation = new ReplayOne();
            break;
        case int.MaxValue:
            // Unbounded buffer: never trims.
            _implementation = new ReplayAll();
            break;
        default:
            _implementation = new ReplayMany(bufferSize);
            break;
    }
}
/// <summary>Creates a count-limited subject backed by the time-based implementation on the given scheduler.</summary>
/// <param name="bufferSize">Maximum number of values to replay; must not be negative.</param>
/// <param name="scheduler">Scheduler handed to the underlying <c>ReplayByTime</c> buffer.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative.</exception>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public ReplaySubject(int bufferSize, IScheduler scheduler)
{
    var byTime = new ReplayByTime(bufferSize, scheduler);
    _implementation = byTime;
}
/// <summary>Creates an age-limited subject backed by the time-based implementation.</summary>
/// <param name="window">Maximum age of values to replay; must not be negative.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is negative.</exception>
public ReplaySubject(TimeSpan window)
{
    var byTime = new ReplayByTime(window);
    _implementation = byTime;
}
/// <summary>Creates an age-limited subject backed by the time-based implementation on the given scheduler.</summary>
/// <param name="window">Maximum age of values to replay; must not be negative.</param>
/// <param name="scheduler">Scheduler handed to the underlying <c>ReplayByTime</c> buffer.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is negative.</exception>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public ReplaySubject(TimeSpan window, IScheduler scheduler)
{
    var byTime = new ReplayByTime(window, scheduler);
    _implementation = byTime;
}
/// <summary>Creates a subject limited by both count and age, backed by the time-based implementation.</summary>
/// <param name="bufferSize">Maximum number of values to replay; must not be negative.</param>
/// <param name="window">Maximum age of values to replay; must not be negative.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="bufferSize"/> or <paramref name="window"/> is negative.
/// </exception>
public ReplaySubject(int bufferSize, TimeSpan window)
{
    var byTime = new ReplayByTime(bufferSize, window);
    _implementation = byTime;
}
/// <summary>
/// Creates a subject limited by both count and age, backed by the time-based implementation
/// on the given scheduler.
/// </summary>
/// <param name="bufferSize">Maximum number of values to replay; must not be negative.</param>
/// <param name="window">Maximum age of values to replay; must not be negative.</param>
/// <param name="scheduler">Scheduler handed to the underlying <c>ReplayByTime</c> buffer.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="bufferSize"/> or <paramref name="window"/> is negative.
/// </exception>
/// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
    var byTime = new ReplayByTime(bufferSize, window, scheduler);
    _implementation = byTime;
}
/// <summary>Forwards <paramref name="value"/> to the underlying implementation.</summary>
/// <param name="value">The value to publish.</param>
public override void OnNext(T value) => _implementation.OnNext(value);
/// <summary>Validates <paramref name="error"/> and forwards it to the underlying implementation.</summary>
/// <param name="error">The error to publish.</param>
/// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
public override void OnError(Exception error)
{
    if (error == null)
    {
        throw new ArgumentNullException(nameof(error));
    }

    _implementation.OnError(error);
}
/// <summary>Forwards the completion signal to the underlying implementation.</summary>
public override void OnCompleted() => _implementation.OnCompleted();
/// <summary>Validates <paramref name="observer"/> and subscribes it to the underlying implementation.</summary>
/// <param name="observer">The observer to subscribe.</param>
/// <returns>The disposable returned by the underlying implementation's <c>Subscribe</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public override IDisposable Subscribe(IObserver<T> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    var subscription = _implementation.Subscribe(observer);
    return subscription;
}
/// <summary>Disposes the underlying implementation.</summary>
public override void Dispose() => _implementation.Dispose();
}
}